1. Ursprüngliche NachfrageEs ist erforderlich, die ursprünglichen vollständigen Daten sowie die inkrementellen Daten bestimmter Tabellen in bestimmten MySQL-Bibliotheken in Echtzeit zu synchronisieren, und die entsprechenden Änderungen und Löschungen müssen ebenfalls synchronisiert werden. Die Datensynchronisierung darf nicht aufdringlich sein: Geschäftsabläufe dürfen nicht geändert werden und auf der Geschäftsseite darf kein zu großer Leistungsdruck entstehen. Anwendungsszenarien: ETL-Datensynchronisierung und Reduzierung des Drucks auf Unternehmensserver. 2. Lösung 3. Kanaleinführung und InstallationCanal ist ein Open-Source-Projekt von Alibaba, das in reinem Java entwickelt wurde. Basierend auf der inkrementellen Protokollanalyse der Datenbank ermöglicht es inkrementelles Datenabonnement und -verbrauch und unterstützt derzeit hauptsächlich MySQL (unterstützt auch MariaDB). Funktionsprinzip: Implementierung der MySQL-Master-Slave-Replikation Aus allgemeiner Sicht ist die Replikation in drei Schritte unterteilt:
Wie der Kanal funktioniertDas Prinzip ist relativ einfach:
Architekturveranschaulichen:
Instanzmodul:
Installieren1. Vorbereitung der MySQL- und Kafka-Umgebung 2. Laden Sie den Kanal herunter: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz 3. Entpacken: tar -zxvf canal.deployer-1.1.3.tar.gz 4. Konfigurieren Sie die Dateiparameter im Verzeichnis conf Konfigurieren Sie canal.properties: Geben Sie conf/example ein und konfigurieren Sie instance.properties: 5. Starten Sie: bin/startup.sh 6. Protokollanzeige: 4. Überprüfung1. Entwickeln Sie den entsprechenden Kafka-Consumer Paket org.kafka; importiere java.util.Arrays; importiere java.util.Properties; importiere org.apache.kafka.clients.consumer.ConsumerRecord; importiere org.apache.kafka.clients.consumer.ConsumerRecords; importiere org.apache.kafka.clients.consumer.KafkaConsumer; importiere org.apache.kafka.common.serialization.StringDeserializer; /** * * Titel: KafkaConsumerTest * Beschreibung: * Kafka-Consumer-Demo * Version: 1.0.0 * @Autor pancm * @date 26. Januar 2018 */ öffentliche Klasse KafkaConsumerTest implementiert Runnable { privater finaler KafkaConsumer<String, String>-Verbraucher; private ConsumerRecords<String, String> msgList; privates endgültiges String-Thema; private statische endgültige Zeichenfolge GROUPID = "GruppeA"; öffentlicher KafkaConsumerTest(String topicName) { Eigenschaften-Eigenschaften = neue Eigenschaften(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", GRUPPENRID); props.put("aktivieren.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("Sitzung.Timeout.ms", "30000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("Wert.deserializer", StringDeserializer.class.getName()); this.consumer = neuer KafkaConsumer<String, String>(Eigenschaften); dieses.Thema = Themenname; this.consumer.subscribe(Arrays.asList(Thema)); } @Überschreiben öffentliche Leere ausführen() { int Nachrichtennummer = 1; System.out.println("---------Verbrauch starten---------"); versuchen { für (; ; ) { msgList = Verbraucher.Umfrage(1000); if (null != msgList && msgList.count() > 0) { für (ConsumerRecord<String, String> Datensatz : msgList) { //Drucken, nachdem 100 Datensätze verbraucht wurden, aber die gedruckten Daten folgen möglicherweise nicht diesem Muster System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // Zeichenfolge v = decodeUnicode(Datensatz.Wert()); // System.out.println(v); // Beenden, wenn 1000 Nachrichten verbraucht sind, if (messageNo % 1000 == 0) { brechen; } NachrichtNr++; } } anders { Thread.sleep(11); } } } Fang (UnterbrocheneAusnahme e) { e.printStackTrace(); Endlich Verbraucher.schließen(); } } öffentliche statische void main(String args[]) { KafkaConsumerTest test1 = neuer KafkaConsumerTest("Beispieldaten"); Thread thread1 = neuer Thread(test1); thread1.start(); } /* * Chinesisch in Unicode konvertieren*/ öffentliche statische Zeichenfolge gbEncoding(finale Zeichenfolge gbString) { char[] utfBytes = gbString.toCharArray(); Zeichenfolge unicodeBytes = ""; für (int i = 0; i < utfBytes.Länge; i++) { String hexB = Integer.toHexString(utfBytes[i]); wenn (hexB.Länge() <= 2) { hexB = "00" + hexB; } unicodeBytes = unicodeBytes + "\\u" + hexB; } UnicodeBytes zurückgeben; } /* * Unicode-Kodierung ins Chinesische*/ öffentlicher statischer String decodeUnicode(finaler String dataStr) { Geben Sie den Startwert ein. int Ende = 0; endgültiger StringBuffer-Puffer = neuer StringBuffer(); während (Start > -1) { Ende = dataStr.indexOf("\\u", Start + 2); Zeichenfolge charStr = ""; wenn (Ende == -1) { charStr = dataStr.substring(start + 2, dataStr.length()); } anders { charStr = dataStr.substring(start + 2, end); } char letter = (char) Integer.parseInt(charStr, 16); // Hexadezimale Ganzzahlzeichenfolge analysieren. Puffer.anhängen(neues Zeichen(Buchstabe).toString()); Anfang = Ende; } gibt buffer.toString() zurück; } } 2. Daten zur Tabelle bak1 hinzufügen TABELLE ERSTELLEN `bak1` ( `vin` varchar(20) NICHT NULL, `p1` doppelter STANDARD NULL, `p2` doppelter STANDARD NULL, `p3` doppelter STANDARD NULL, `p4` doppelter DEFAULT NULL, `p5` doppelter DEFAULT NULL, `p6` doppelter DEFAULT NULL, `p7` doppelter DEFAULT NULL, `p8` doppelter DEFAULT NULL, `p9` doppelter DEFAULT NULL, `p0` doppelter STANDARD NULL ) ENGINE=InnoDB STANDARD-CHARSET=utf8mb4 zeige, erstelle Tabelle bak1; in bak1 einfügen, wähle '李雷abcv', `p1`, `p2`, `p3`, `p4`, `p5`, `p6`, `p7`, `p8`, `p9`, `p0` von Moci-Grenze 10 3. Zeigen Sie die Ausgabeergebnisse an: Damit ist dieser Artikel über die Synchronisierung der vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit einer Nachrichtenwarteschlange abgeschlossen – Lösung. Weitere Informationen zur Synchronisierung von Daten in einer bestimmten MySQL-Tabelle finden Sie in früheren Artikeln auf 123WORDPRESS.COM oder in den folgenden verwandten Artikeln. Ich hoffe, Sie werden 123WORDPRESS.COM auch in Zukunft unterstützen! Das könnte Sie auch interessieren:
|
<<: Eine detaillierte Einführung zum Einrichten von Jenkins auf Tencent Cloud Server
>>: HTML-Code zum Hinzufügen eines Mengenabzeichens zur Nachrichtenschaltfläche
1. Problembeschreibung <br />Wenn JS verwen...
Inhaltsverzeichnis Erstellen Sie ein Docker-Image...
Vorwort Immer noch in Bezug auf das zuvor erwähnt...
Ich habe mich beim Backend angemeldet, um die Lös...
Details zur MySQL-Triggersyntax: Ein Trigger ist ...
Implementieren Sie das Vergrößern und Verkleinern...
In diesem Artikelbeispiel wird der spezifische Co...
Inhaltsverzeichnis Vorwort NULL in MySQL 2 NULL b...
In diesem Artikel wird der spezifische Code von U...
1. Installation und Konfiguration des Apache-Serv...
Installieren Sie JDK: Offizieller Oracle-Download...
Inhaltsverzeichnis Vorwort Ursache Phänomen warum...
Dieser Artikel stellt hauptsächlich den Installati...
Inhaltsverzeichnis Einführung Traditionelle Überg...
BMP ist ein von Hardwaregeräten unabhängiges und ...