Beispiel für die Konvertierung von Spark RDD in einen Dataframe und das Schreiben in MySQL

Beispiel für die Konvertierung von Spark RDD in einen Dataframe und das Schreiben in MySQL

Dataframe ist eine neue API, die in Spark 1.3.0 eingeführt wurde und es Spark ermöglicht, strukturierte Daten in großem Maßstab zu verarbeiten. Es ist einfacher zu verwenden als die ursprüngliche RDD-Konvertierungsmethode und seine Rechenleistung soll doppelt so schnell sein. Spark kann RDD in der Offline-Batchverarbeitung oder im Echtzeit-Computing in DataFrame konvertieren und dann die Daten über einfache SQL-Befehle bearbeiten. Für Personen, die mit SQL vertraut sind, ist der Konvertierungs- und Filterprozess sehr praktisch und kann sogar Anwendungen auf höherer Ebene enthalten. Beispielsweise werden in Echtzeit der Themenname und die SQL-Anweisung von Kafka übergeben, und der Hintergrund liest die konfigurierten Inhaltsfelder, spiegelt sie in einer Klasse wider und verwendet das Eingabe- und Ausgabe-SQL, um die Echtzeitdaten zu berechnen. In diesem Fall können auch Personen, die Spark Streaming nicht kennen, problemlos die Vorteile des Echtzeit-Computings nutzen.

Das folgende Beispiel zeigt den Vorgang, eine lokale Datei in ein RDD zu lesen, sie implizit in einen DataFrame zu konvertieren, um die Daten abzufragen und sie schließlich in Form eines Anhängens in eine MySQL-Tabelle zu schreiben. Das Scala-Codebeispiel lautet wie folgt

importiere java.sql.Timestamp
importiere org.apache.spark.sql.{SaveMode, SQLContext}
importiere org.apache.spark.{SparkContext, SparkConf}
Objekt DataFrameSql {
 Fallklasse Memberbase (data_date:Long,memberid:String,createtime:Timestamp,sp:Int)erweitert Serializable{
 überschreibe def toString: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp)
 }
 def main(args:Array[String]): Einheit = {
 val conf = neue SparkConf()
 conf.setMaster("lokal[2]")
// ----------------------
 //Der Parameter spark.sql.autoBroadcastJoinThreshold legt fest, ob eine Tabelle gesendet werden soll. Der Standardwert ist 10 M. Wird auf -1 gesetzt, um zu deaktivieren. //spark.sql.codegen gibt an, ob SQL in Java-Bytecode vorkompiliert werden soll. Langes oder häufiges SQL hat einen Optimierungseffekt. // spark.sql.inMemoryColumnarStorage.batchSize: Die Anzahl der gleichzeitig verarbeiteten Zeilen. Achten Sie auf OOM.
 //spark.sql.inMemoryColumnarStorage.compressed legt fest, ob der Spaltenspeicher im Speicher komprimiert werden muss// ----------------------
 conf.set("spark.sql.shuffle.partitions","20") //Die Standardpartition ist 200 conf.setAppName("dataframe test")
 val sc = neuer SparkContext(conf)
 val sqc = neuer SQLContext(sc)
 val ac = sc.accumulator(0,"Fehlernums")
 val Datei = sc.textFile("src\\main\\resources\\000000_0")
 val log = file.map(Zeilen => Zeilen.split(" ")).filter(Zeile =>
  if (line.length != 4) { //Führe einen einfachen Filter aus ac.add(1)
  FALSCH
  } sonst wahr)
  .map(Zeile => Mitgliederbasis(Zeile(0).toLong, Zeile(1),Timestamp.valueOf(Zeile(2)), Zeile(3).toInt))
 // Methode 1: Implizite Konvertierung verwenden import sqc.implicits._
 val dftemp = log.toDF() // Konvertierung/*
  Methode 2: Verwenden Sie die Methode createDataFrame, um Felder und ihre Typen mithilfe der internen Reflexion abzurufen: val dftemp = sqc.createDataFrame(log)
  */
 val df = dftemp.registerTempTable("Mitgliedsdatenbankinfo")
 /*val sqlcommand = "Wählen Sie date_format(createtime,'yyyy-MM')als mm,Anzahl(1) als Nums" +
  "aus der Memberbaseinfo-Gruppe nach date_format(createtime,'yyyy-MM')" +
  "Sortieren nach Zahlen absteigend, mm aufsteigend"*/
 val sqlcommand="Wählen Sie * aus Memberbaseinfo"
 val sel = sqc.sql(SQL-Befehl)
 val prop = neue java.util.Properties
 prop.setProperty("Benutzer","etl")
 prop.setProperty("Passwort","xxx")
 //Rufen Sie DataFrameWriter auf, um Daten in MySQL zu schreiben
 val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // Die Tabelle existiert möglicherweise nicht println(ac.name.get+" "+ac.value)
 sc.stop()
 }
}

Die Beispieldaten in der Textdatei im obigen Code lauten wie folgt. Die Daten stammen aus Hive. Die Feldinformationen sind Partitionsnummer, Benutzer-ID, Registrierungszeit und Drittanbieternummer.

20160309 45386477 2012-06-12 20:13:15 901438
20160309 45390977 2012-06-12 22:38:06 901036
20160309 45446677 2012-06-14 21:57:39 901438
20160309 45464977 2012-06-15 13:42:55 901438
20160309 45572377 2012-06-18 14:55:03 902606
20160309 45620577 2012-06-20 00:21:09 902606
20160309 45628377 2012-06-20 10:48:05 901181
20160309 45628877 2012-06-20 11:10:15 902606
20160309 45667777 2012-06-21 18:58:34 902524
20160309 45680177 2012-06-22 01:49:55 
20160309 45687077 2012-06-22 11:23:22 902607

Beachten Sie hier die Feldtypzuordnung, also die Zuordnung von Fallklasse zu Datenrahmen, wie im Screenshot von der offiziellen Website gezeigt:

Weitere Einzelheiten finden Sie im offiziellen Dokument Spark SQL and DataFrame Guide.

Das obige Beispiel zur Konvertierung von Spark RDD in einen Dataframe und zum Schreiben in MySQL ist alles, was ich mit Ihnen teilen möchte. Ich hoffe, es kann Ihnen als Referenz dienen. Ich hoffe auch, dass Sie 123WORDPRESS.COM unterstützen werden.

Das könnte Sie auch interessieren:
  • SparkSQL verwendet IDEA für den schnellen Einstieg in DataFrame und DataSet
  • DataFrame: So konvertieren Sie eine Scala-Klasse über SparkSql in einen DataFrame
  • Konvertierungsbeispiele zwischen pyspark.sql.DataFrame und pandas.DataFrame
  • Eine kurze Diskussion über das Missverständnis von DataFrame- und SparkSql-Werten
  • Spark SQL 2.4.8 Zwei Möglichkeiten zum Bedienen von Dataframe

<<:  So führen Sie eine Spring Boot-Anwendung in Docker aus

>>:  So verwenden Sie den Vue-Filter

Artikel empfehlen

CSS-Leistungsoptimierung - detaillierte Erklärung der Will-Change-Verwendung

will-change teilt dem Browser mit, welche Änderun...

JavaScript-Canvas zum Erzielen von Meteoreffekten

In diesem Artikel wird der spezifische Code für J...

Detaillierte Erklärung der Abkürzung von State in React

Vorwort Was ist Staat Wir alle sagen, dass React ...

Hinweise zur Groß-/Kleinschreibung bei MySQL

Inhaltsverzeichnis Die Groß-/Kleinschreibung von ...

Auszeichnungssprache - Titel

Klicken Sie hier, um zum Abschnitt „HTML-Tutorial“...

Gutes Website-Copywriting und gute Benutzererfahrung

Das Betrachten einer Website ist eigentlich wie di...

Beispiel für die Installation und Bereitstellung von Docker unter Linux

Nachdem Sie den folgenden Artikel gelesen haben, ...

Der Prozess der Installation und Konfiguration von Nginx in Win10

1. Einleitung Nginx ist ein kostenloser, quelloff...

js, um den Zahlungs-Countdown zu realisieren und zur Startseite zurückzukehren

Zahlungs-Countdown, um zur Startseite zurückzukeh...