Vorwort Ein Klassenkamerad untersucht die Streaming-Unterstützung von MLSQL Stack. Dann sagte ich, dass die Flow-Debugging-Funktion eigentlich ziemlich schwierig ist. Durch Übung hoffen wir, die folgenden drei Punkte zu erreichen:
Nachdem ich diese drei Punkte umgesetzt hatte, stellte ich fest, dass das Debuggen viel einfacher wurde. Verfahren Zuerst habe ich eine kaf_write.mlsql-Datei erstellt, um das Schreiben von Daten in Kafka zu erleichtern: setze abc=''' { "x": 100, "y": 200, "z": 200 ,"dataType":"A-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B-Gruppe"} '''; lade jsonStr.`abc` als Tabelle1; Wählen Sie to_json(struct(*)) als Wert aus Tabelle1 als Tabelle2; Speichern Sie „Anhängen von Tabelle2“ als „kafka.`wow`“, wobei kafka.bootstrap.servers="127.0.0.1:9092"; Auf diese Weise können die Daten bei jedem Ausführen in Kafka geschrieben werden. Dann muss ich nach dem Schreiben sehen, ob die Daten wirklich eingetragen sind und wie sie aussehen:
Dieser Satz bedeutet, dass ich 10 Kafka-Daten von Kafka abtasten möchte. Die Adresse von Kafka ist 127.0.0.1:9092 und das Thema ist wow. Die laufenden Ergebnisse sind wie folgt: Es gibt kein Problem. Dann habe ich ein sehr einfaches Streaming-Programm geschrieben: -- der Streamname sollte eindeutig sein. setze streamName="streamExample"; -- verwenden Sie kafkaTool, um das Schema aus Kafka abzuleiten !kafkaTool registerSchema 2 Datensätze von „127.0.0.1:9092“ wow; Lade die Optionen von kafka.`wow` kafka.bootstrap.servers="127.0.0.1:9092" als newkafkatable1; Wählen Sie * aus newkafkatable1 als Tabelle21; -- Drucken in der Webkonsole statt in der Terminalkonsole. speichern anhängen table21 als Webkonsole.`` Optionsmodus="Anhängen" und Dauer="15" und checkpointLocation="/tmp/s-cpl4"; Die Ergebnisse sind wie folgt: Wir können den Echtzeiteffekt auch im Terminal sehen. Auffüllen Natürlich bietet MLSQL Stack noch zwei weitere großartige Features für das Streaming. Erstens können Sie HTTP-Protokoll-Callbacks für Streaming-Ereignisse festlegen und Batch-SQL verwenden, um die Streaming-Ergebnisse zu verarbeiten und sie schließlich in der Datenbank zu speichern. Siehe das folgende Skript: -- der Streamname sollte eindeutig sein. setze streamName="streamExample"; - einige Daten verspotten. Daten festlegen=''' {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 0, "Zeitstempel": "2008-01-24 18:01:01.001", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 1, "Zeitstempel": "2008-01-24 18:01:01.002", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 2, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 3, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 4, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} {"Schlüssel": "ja", "Wert": "nein", "Thema": "Test", "Partition": 0, "Offset": 5, "Zeitstempel": "2008-01-24 18:01:01.003", "Zeitstempeltyp": 0} '''; -- Daten als Tabelle laden lade jsonStr.`data` als Datenquelle; --convert Tabelle als Streamquelle Optionen für mockStream.`datasource` laden Schrittgrößenbereich="0-3" als newkafkatable1; -- Aggregation Wählen Sie Cast (Wert als Zeichenfolge) als k aus newkafkatable1 als Tabelle21; !callback-Post „http://127.0.0.1:9002/api_v1/test“, wenn „gestartet, Fortschritt, beendet“; -- geben Sie das Ergebnis auf der Konsole aus. speichern anhängen table21 als Benutzerdefiniert.`` Optionenmodus="Anhängen" und Dauer="15" und sourceTable="jack" und Code=''' wähle count(*) als c von Jack als Newjack; Speichern Sie „Newjack“ als Parquet anhängen. `/tmp/jack`; ''' und checkpointLocation="/tmp/cpl15"; Zusammenfassen Das Obige ist der vollständige Inhalt dieses Artikels. Ich hoffe, dass der Inhalt dieses Artikels einen gewissen Lernwert für Ihr Studium oder Ihre Arbeit hat. Vielen Dank für Ihre Unterstützung von 123WORDPRESS.COM. Das könnte Sie auch interessieren:
|
<<: Abrufen der Erstellungszeit einer Datei unter Linux und ein praktisches Tutorial
>>: Verwendung der JavaScript-Sleep-Funktion
Geschäftsszenario: Verwenden Sie den EL-Dialog vo...
Was ist das Lieferantenpräfix? Anbieterpräfix – B...
Zum Beispiel: Code kopieren Der Code lautet wie fo...
Inhaltsverzeichnis Die Knotenversion stimmt nicht...
Verwenden Sie den folgenden Terminalbefehl, um de...
(I) Methode 1: Vorab direkt im Skript-Tag definie...
Melden Sie sich zuerst bei MySQL an Shell> MyS...
MySQL ist eine kostenlose relationale Datenbank m...
In diesem Artikelbeispiel wird der spezifische Co...
Inhaltsverzeichnis Tomcat mit Docker installieren...
Apollo Open Source-Adresse: https://github.com/ct...
Die Methode zur Lösung des Problems, das anfängli...
In diesem Artikelbeispiel wird der spezifische Co...
Problembeschreibung: Wenn die Anzahl der asynchro...
/******************** * Zeichengerätetreiber*****...