Vorwort Ein Klassenkamerad untersucht die Streaming-Unterstützung von MLSQL Stack. Dann sagte ich, dass die Flow-Debugging-Funktion eigentlich ziemlich schwierig ist. Durch Übung hoffen wir, die folgenden drei Punkte zu erreichen:
Nachdem ich diese drei Punkte umgesetzt hatte, stellte ich fest, dass das Debuggen viel einfacher wurde. Verfahren Zuerst habe ich eine kaf_write.mlsql-Datei erstellt, um das Schreiben von Daten in Kafka zu erleichtern: setze abc=''' { "x": 100, "y": 200, "z": 200 ,"dataType":"A-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} '''; lade jsonStr.`abc` als Tabelle1; Wählen Sie to_json(struct(*)) als Wert aus Tabelle1 als Tabelle2; Speichern Sie „Anhängen von Tabelle2“ als „kafka.`wow`“, wobei kafka.bootstrap.servers="127.0.0.1:9092"; Auf diese Weise können die Daten bei jedem Ausführen in Kafka geschrieben werden. Dann muss ich nach dem Schreiben sehen, ob die Daten wirklich eingetragen sind und wie sie aussehen:
Dieser Satz bedeutet, dass ich 10 Kafka-Daten von Kafka abtasten möchte. Die Adresse von Kafka ist 127.0.0.1:9092 und das Thema ist wow. Die laufenden Ergebnisse sind wie folgt: Es gibt kein Problem. Dann habe ich ein sehr einfaches Streaming-Programm geschrieben: -- der Streamname sollte eindeutig sein. setze streamName="streamExample"; -- verwenden Sie kafkaTool, um das Schema aus Kafka abzuleiten !kafkaTool registerSchema 2 Datensätze von „127.0.0.1:9092“ wow; Lade die Optionen von kafka.`wow` kafka.bootstrap.servers="127.0.0.1:9092" als newkafkatable1; Wählen Sie * aus newkafkatable1 als Tabelle21; -- Drucken in der Webkonsole statt in der Terminalkonsole. speichern anhängen table21 als Webkonsole.`` Optionsmodus="Anhängen" und Dauer="15" und checkpointLocation="/tmp/s-cpl4"; Die Ergebnisse sind wie folgt: Wir können den Echtzeiteffekt auch im Terminal sehen. Auffüllen Natürlich bietet MLSQL Stack noch zwei weitere großartige Features für das Streaming. Erstens können Sie HTTP-Protokoll-Callbacks für Streaming-Ereignisse festlegen und Batch-SQL verwenden, um die Streaming-Ergebnisse zu verarbeiten und sie schließlich in der Datenbank zu speichern. Siehe das folgende Skript: -- der Streamname sollte eindeutig sein. setze streamName="streamExample"; - einige Daten verspotten. Daten festlegen=''' {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 0, "Zeitstempel": "2008-01-24 18:01:01.001", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 1, "Zeitstempel": "2008-01-24 18:01:01.002", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 2, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 3, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 4, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 5, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} '''; -- Daten als Tabelle laden lade jsonStr.`data` als Datenquelle; --convert Tabelle als Streamquelle Optionen für mockStream.`datasource` laden Schrittgrößenbereich="0-3" als newkafkatable1; -- Aggregation Wählen Sie Cast (Wert als Zeichenfolge) als k aus newkafkatable1 als Tabelle21; !callback-Post „http://127.0.0.1:9002/api_v1/test“, wenn „gestartet, Fortschritt, beendet“; -- geben Sie das Ergebnis auf der Konsole aus. speichern anhängen table21 als Benutzerdefiniert.`` Optionenmodus="Anhängen" und Dauer="15" und sourceTable="jack" und Code=''' wähle count(*) als c von Jack als Newjack; Speichern Sie „Newjack“ als Parquet anhängen. `/tmp/jack`; ''' und checkpointLocation="/tmp/cpl15"; Zusammenfassen Das Obige ist der vollständige Inhalt dieses Artikels. Ich hoffe, dass der Inhalt dieses Artikels einen gewissen Lernwert für Ihr Studium oder Ihre Arbeit hat. Vielen Dank für Ihre Unterstützung von 123WORDPRESS.COM. Das könnte Sie auch interessieren:
|
<<: Abrufen der Erstellungszeit einer Datei unter Linux und ein praktisches Tutorial
>>: Verwendung der JavaScript-Sleep-Funktion
Kürzlich stieß ich auf eine Datenbank mit folgend...
Wie wir alle wissen, ist „mailto“ ein sehr praktis...
Inhaltsverzeichnis Vorwort: 1. Kurze Einführung i...
Inhaltsverzeichnis Umsetzungsideen Es gibt drei M...
1. Chinesische Eingabemethode einrichten 2. Stell...
Hintergrund Zu Beginn meines Japanisch-Lernens fi...
Zwei Fälle: 1. Mit Index 2. Ohne Index Voraussetz...
Vor allem aus zwei Gründen: 1. Hervorhebung/Zeile...
bei um + Zeit um 17:23 at> touch /mnt/file{1.....
In diesem Artikelbeispiel wird der spezifische Co...
Problembeschreibung: Der Benutzer hat die Anforde...
Einführung in Textschatten Verwenden Sie in CSS d...
Viele Freunde wollten schon immer wissen, wie man...
1. Erstellen Sie eine Tabelle CREATE TABLE `stude...
Es gibt vier wichtige MySQL-Zeichenfolgenabfangfu...