Apache Spark 2.0-Jobs brauchen lange, bis sie abgeschlossen sind

Apache Spark 2.0-Jobs brauchen lange, bis sie abgeschlossen sind

Phänomen

Bei der Verwendung von Apache Spark 2.x kann folgendes Phänomen auftreten: Obwohl unsere Spark-Jobs alle abgeschlossen sind, wird unser Programm immer noch ausgeführt. Beispielsweise verwenden wir Spark SQL, um SQL auszuführen, was am Ende eine große Anzahl von Dateien generiert. Dann können wir sehen, dass alle Spark-Jobs dieses SQL tatsächlich vollständig ausgeführt wurden, diese Abfrageanweisung jedoch immer noch ausgeführt wird. Aus dem Protokoll können wir ersehen, dass der Treiberknoten die von den Aufgaben generierten Dateien nacheinander in das Verzeichnis der endgültigen Tabelle verschiebt. Dieses Phänomen tritt leicht auf, wenn unser Job viele Dateien generiert. Dieser Artikel stellt eine Methode zur Lösung dieses Problems vor.

Warum tritt dieses Phänomen auf?

Spark 2.x verwendet Hadoop 2.x. Wenn es die generierte Datei in HDFS speichert, ruft es schließlich saveAsHadoopFile auf, das FileOutputCommitter verwendet, und zwar wie folgt:

Das Problem liegt in der FileOutputCommitter-Implementierung von Hadoop 2.x. In FileOutputCommitter gibt es zwei bemerkenswerte Methoden: commitTask und commitJob. In der FileOutputCommitter-Implementierung von Hadoop 2.x steuert der Parameter mapreduce.fileoutputcommitter.algorithm.version, wie commitTask und commitJob funktionieren. Der spezifische Code lautet wie folgt (der Einfachheit halber habe ich irrelevante Anweisungen entfernt. Den vollständigen Code finden Sie in FileOutputCommitter.java):

Wie Sie sehen können, gibt es in der Methode commitTask eine bedingte Beurteilung algorithmVersion == 1, die dem Wert des Parameters mapreduce.fileoutputcommitter.algorithm.version entspricht, der standardmäßig 1 ist. Wenn dieser Parameter 1 ist, werden die von der Aufgabe vorübergehend generierten Daten nach Abschluss der Aufgabe in das entsprechende Verzeichnis der Aufgabe verschoben und dann beim Aufruf von commitJob in das endgültige Jobausgabeverzeichnis verschoben. Der Standardwert dieses Parameters in Hadoop 2.x ist 1! Aus diesem Grund sehen wir, dass der Job abgeschlossen ist, das Programm jedoch noch Daten verschiebt, was dazu führt, dass der gesamte Job nicht abgeschlossen wird. Am Ende wird die CommitJob-Funktion vom Spark-Treiber ausgeführt, sodass es einen Grund für die langsame Ausführung gibt.

Und wir können sehen, dass, wenn wir den Wert des Parameters mapreduce.fileoutputcommitter.algorithm.version auf 2 setzen, bei der Ausführung von commitTask die Methode mergePaths aufgerufen wird, um die vom Task generierten Daten direkt aus dem temporären Verzeichnis des Tasks in das schließlich vom Programm generierte Verzeichnis zu verschieben. Beim Ausführen von commitJob müssen Daten nicht direkt verschoben werden, daher ist es natürlich viel schneller als der Standardwert.

Beachten Sie, dass wir dies in Versionen vor Hadoop 2.7.0 erreichen können, indem wir den Parameter mapreduce.fileoutputcommitter.algorithm.version auf einen anderen Wert als 1 setzen, da das Programm diesen Wert nicht auf 2 begrenzt. Ab Hadoop 2.7.0 muss der Wert des Parameters mapreduce.fileoutputcommitter.algorithm.version jedoch 1 oder 2 sein. Einzelheiten finden Sie unter MAPREDUCE-4815.

So legen Sie diesen Parameter in Spark fest

Das Problem wurde gefunden und wir können es im Programm lösen. Es gibt mehrere Möglichkeiten:

  • Setzen Sie spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 direkt in conf/spark-defaults.conf . Dies hat globale Auswirkungen.
  • Legen Sie es direkt im Spark-Programm fest, spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2"), dies erfolgt auf Jobebene.
  • Wenn Sie die Dataset-API zum Schreiben von Daten in HDFS verwenden, können Sie dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2") festlegen.

Wenn Ihre Hadoop-Version jedoch 3.x ist, ist der Standardwert des Parameters mapreduce.fileoutputcommitter.algorithm.version bereits auf 2 eingestellt. Einzelheiten finden Sie unter MAPREDUCE-6336 und MAPREDUCE-6406.

Da dieser Parameter einen gewissen Einfluss auf die Leistung hat, wurde er in Spark 2.2.0 im Spark-Konfigurationsdokument configuration.html aufgezeichnet. Weitere Informationen finden Sie unter SPARK-20107.

Zusammenfassen

Oben habe ich Ihnen Apache Spark 2.0 vorgestellt. Ich hoffe, es wird Ihnen helfen!

Das könnte Sie auch interessieren:
  • So verwenden Sie Spark und Scala zum Analysieren von Apache-Zugriffsprotokollen
  • Was sind die neuen Funktionen von Apache Spark 2.4, das 2018 veröffentlicht wird?

<<:  Perfekte Lösung für das Problem, dass Daten abgeschnitten werden, wenn die Funktion „group concat“ in Mysql5.7 verwendet wird

>>:  JS beherrscht schnell die Verwendung von ES6-Klassen

Artikel empfehlen

Das Prinzip und die Implementierung des JS-Drag-Effekts

Die Drag-Funktion wird hauptsächlich verwendet, u...

So verwenden Sie Axios-Anfragen im Vue-Projekt

Inhaltsverzeichnis 1. Installation 2. Es gibt kei...

Beispiel für die Verwendung des href-Attributs und des onclick-Ereignisses eines Tags

Das „a“-Tag wird hauptsächlich verwendet, um Seit...

Detaillierte Erklärung des Unterschieds zwischen Vue-Lebenszyklus

Lebenszyklusklassifizierung Jede Komponente von V...

Was bedeutet das „a“ in rgba? CSS RGBA-Farbleitfaden

RGBA ist eine CSS-Farbe, mit der Farbwert und Tra...

Grundlegendes Tutorial zum WeChat-Miniprogramm: Verwendung von Echart

Vorwort Schauen wir uns zunächst den Endeffekt an...

Linux-Installation Redis-Implementierungsprozess und Fehlerlösung

Ich habe heute Redis installiert und es sind eini...

Reine JS-Methode zum Exportieren von Tabellen nach Excel

html <div > <button type="button&qu...