Dataframe ist eine neue API, die in Spark 1.3.0 eingeführt wurde und es Spark ermöglicht, strukturierte Daten in großem Maßstab zu verarbeiten. Es ist einfacher zu verwenden als die ursprüngliche RDD-Konvertierungsmethode und seine Rechenleistung soll doppelt so schnell sein. Spark kann RDD in der Offline-Batchverarbeitung oder im Echtzeit-Computing in DataFrame konvertieren und dann die Daten über einfache SQL-Befehle bearbeiten. Für Personen, die mit SQL vertraut sind, ist der Konvertierungs- und Filterprozess sehr praktisch und kann sogar Anwendungen auf höherer Ebene enthalten. Beispielsweise werden in Echtzeit der Themenname und die SQL-Anweisung von Kafka übergeben, und der Hintergrund liest die konfigurierten Inhaltsfelder, spiegelt sie in einer Klasse wider und verwendet das Eingabe- und Ausgabe-SQL, um die Echtzeitdaten zu berechnen. In diesem Fall können auch Personen, die Spark Streaming nicht kennen, problemlos die Vorteile des Echtzeit-Computings nutzen. Das folgende Beispiel zeigt den Vorgang, eine lokale Datei in ein RDD zu lesen, sie implizit in einen DataFrame zu konvertieren, um die Daten abzufragen und sie schließlich in Form eines Anhängens in eine MySQL-Tabelle zu schreiben. Das Scala-Codebeispiel lautet wie folgt importiere java.sql.Timestamp importiere org.apache.spark.sql.{SaveMode, SQLContext} importiere org.apache.spark.{SparkContext, SparkConf} Objekt DataFrameSql { Fallklasse Memberbase (data_date:Long,memberid:String,createtime:Timestamp,sp:Int)erweitert Serializable{ überschreibe def toString: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp) } def main(args:Array[String]): Einheit = { val conf = neue SparkConf() conf.setMaster("lokal[2]") // ---------------------- //Der Parameter spark.sql.autoBroadcastJoinThreshold legt fest, ob eine Tabelle gesendet werden soll. Der Standardwert ist 10 M. Wird auf -1 gesetzt, um zu deaktivieren. //spark.sql.codegen gibt an, ob SQL in Java-Bytecode vorkompiliert werden soll. Langes oder häufiges SQL hat einen Optimierungseffekt. // spark.sql.inMemoryColumnarStorage.batchSize: Die Anzahl der gleichzeitig verarbeiteten Zeilen. Achten Sie auf OOM. //spark.sql.inMemoryColumnarStorage.compressed legt fest, ob der Spaltenspeicher im Speicher komprimiert werden muss// ---------------------- conf.set("spark.sql.shuffle.partitions","20") //Die Standardpartition ist 200 conf.setAppName("dataframe test") val sc = neuer SparkContext(conf) val sqc = neuer SQLContext(sc) val ac = sc.accumulator(0,"Fehlernums") val Datei = sc.textFile("src\\main\\resources\\000000_0") val log = file.map(Zeilen => Zeilen.split(" ")).filter(Zeile => if (line.length != 4) { //Führe einen einfachen Filter aus ac.add(1) FALSCH } sonst wahr) .map(Zeile => Mitgliederbasis(Zeile(0).toLong, Zeile(1),Timestamp.valueOf(Zeile(2)), Zeile(3).toInt)) // Methode 1: Implizite Konvertierung verwenden import sqc.implicits._ val dftemp = log.toDF() // Konvertierung/* Methode 2: Verwenden Sie die Methode createDataFrame, um Felder und ihre Typen mithilfe der internen Reflexion abzurufen: val dftemp = sqc.createDataFrame(log) */ val df = dftemp.registerTempTable("Mitgliedsdatenbankinfo") /*val sqlcommand = "Wählen Sie date_format(createtime,'yyyy-MM')als mm,Anzahl(1) als Nums" + "aus der Memberbaseinfo-Gruppe nach date_format(createtime,'yyyy-MM')" + "Sortieren nach Zahlen absteigend, mm aufsteigend"*/ val sqlcommand="Wählen Sie * aus Memberbaseinfo" val sel = sqc.sql(SQL-Befehl) val prop = neue java.util.Properties prop.setProperty("Benutzer","etl") prop.setProperty("Passwort","xxx") //Rufen Sie DataFrameWriter auf, um Daten in MySQL zu schreiben val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // Die Tabelle existiert möglicherweise nicht println(ac.name.get+" "+ac.value) sc.stop() } } Die Beispieldaten in der Textdatei im obigen Code lauten wie folgt. Die Daten stammen aus Hive. Die Feldinformationen sind Partitionsnummer, Benutzer-ID, Registrierungszeit und Drittanbieternummer. 20160309 45386477 2012-06-12 20:13:15 901438 20160309 45390977 2012-06-12 22:38:06 901036 20160309 45446677 2012-06-14 21:57:39 901438 20160309 45464977 2012-06-15 13:42:55 901438 20160309 45572377 2012-06-18 14:55:03 902606 20160309 45620577 2012-06-20 00:21:09 902606 20160309 45628377 2012-06-20 10:48:05 901181 20160309 45628877 2012-06-20 11:10:15 902606 20160309 45667777 2012-06-21 18:58:34 902524 20160309 45680177 2012-06-22 01:49:55 20160309 45687077 2012-06-22 11:23:22 902607 Beachten Sie hier die Feldtypzuordnung, also die Zuordnung von Fallklasse zu Datenrahmen, wie im Screenshot von der offiziellen Website gezeigt: Weitere Einzelheiten finden Sie im offiziellen Dokument Spark SQL and DataFrame Guide. Das obige Beispiel zur Konvertierung von Spark RDD in einen Dataframe und zum Schreiben in MySQL ist alles, was ich mit Ihnen teilen möchte. Ich hoffe, es kann Ihnen als Referenz dienen. Ich hoffe auch, dass Sie 123WORDPRESS.COM unterstützen werden. Das könnte Sie auch interessieren:
|
<<: So führen Sie eine Spring Boot-Anwendung in Docker aus
>>: So verwenden Sie den Vue-Filter
will-change teilt dem Browser mit, welche Änderun...
In diesem Artikel wird der spezifische Code für J...
Vorwort Was ist Staat Wir alle sagen, dass React ...
Dieser Artikel fasst hauptsächlich einige häufig ...
Inhaltsverzeichnis Die Groß-/Kleinschreibung von ...
Klicken Sie hier, um zum Abschnitt „HTML-Tutorial“...
1. Regulärer Ausdruck für den Standort Schauen wi...
Das Betrachten einer Website ist eigentlich wie di...
(Wenn eine Webseite geladen wird, gibt es manchma...
Basierend auf täglichen Entwicklungserfahrungen u...
Nachdem Sie den folgenden Artikel gelesen haben, ...
1. Einleitung Nginx ist ein kostenloser, quelloff...
Jeder hat schon Flipper und Ziegelsteinzertrümmer...
Zahlungs-Countdown, um zur Startseite zurückzukeh...
Das Hauptsymptom des Konflikts besteht darin, dass...