Die vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit der Nachrichtenwarteschlange synchronisieren - Lösung

Die vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit der Nachrichtenwarteschlange synchronisieren - Lösung

1. Ursprüngliche Nachfrage

Es ist erforderlich, die ursprünglichen vollständigen Daten sowie die inkrementellen Daten bestimmter Tabellen in bestimmten MySQL-Bibliotheken in Echtzeit zu synchronisieren, und die entsprechenden Änderungen und Löschungen müssen ebenfalls synchronisiert werden.

Die Datensynchronisierung darf nicht aufdringlich sein: Geschäftsabläufe dürfen nicht geändert werden und auf der Geschäftsseite darf kein zu großer Leistungsdruck entstehen.

Anwendungsszenarien: ETL-Datensynchronisierung und Reduzierung des Drucks auf Unternehmensserver.

2. Lösung

3. Kanaleinführung und Installation

Canal ist ein Open-Source-Projekt von Alibaba, das in reinem Java entwickelt wurde. Basierend auf der inkrementellen Protokollanalyse der Datenbank ermöglicht es inkrementelles Datenabonnement und -verbrauch und unterstützt derzeit hauptsächlich MySQL (unterstützt auch MariaDB).

Funktionsprinzip: Implementierung der MySQL-Master-Slave-Replikation

Aus allgemeiner Sicht ist die Replikation in drei Schritte unterteilt:

  1. Der Master zeichnet die Änderungen im Binärprotokoll auf (diese Aufzeichnungen werden als Binärprotokollereignisse bezeichnet und können mit „show binlog events“ angezeigt werden).
  2. Der Slave kopiert die Binärprotokollereignisse des Masters in sein Relay-Protokoll.
  3. Der Slave wiederholt die Ereignisse im Relay-Protokoll und ändert die Daten, sodass sie seine eigenen widerspiegeln.

Wie der Kanal funktioniert

Das Prinzip ist relativ einfach:

  1. Canal simuliert das interaktive Protokoll des MySQL-Slaves, gibt sich als MySQL-Slave aus und sendet das Dump-Protokoll an den MySQL-Master
  2. Der MySQL-Master empfängt die Dump-Anforderung und beginnt mit dem Weiterleiten des Binärprotokolls an den Slave (Kanal).
  3. Canal analysiert binäre Log-Objekte (ursprünglich Byte-Streams)

Architektur

veranschaulichen:

  • Server stellt eine Kanallaufinstanz dar, die einer JVM entspricht
  • Eine Instanz entspricht einer Datenwarteschlange (ein Server entspricht 1..n Instanzen)

Instanzmodul:

  • eventParser (Datenquellenzugriff, Simulation des Slave-Protokolls und der Master-Interaktion, Protokollanalyse)
  • eventSink (Parser- und Store-Connector, führt Datenfilterung, -verarbeitung und -verteilung durch)
  • eventStore (Datenspeicher)
  • metaManager (inkrementeller Abonnement- und Verbrauchsinformationsmanager)

Installieren

1. Vorbereitung der MySQL- und Kafka-Umgebung

2. Laden Sie den Kanal herunter: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. Entpacken: tar -zxvf canal.deployer-1.1.3.tar.gz

4. Konfigurieren Sie die Dateiparameter im Verzeichnis conf

Konfigurieren Sie canal.properties:

Geben Sie conf/example ein und konfigurieren Sie instance.properties:

5. Starten Sie: bin/startup.sh

6. Protokollanzeige:

4. Überprüfung

1. Entwickeln Sie den entsprechenden Kafka-Consumer

Paket org.kafka;

importiere java.util.Arrays;
importiere java.util.Properties;
importiere org.apache.kafka.clients.consumer.ConsumerRecord;
importiere org.apache.kafka.clients.consumer.ConsumerRecords;
importiere org.apache.kafka.clients.consumer.KafkaConsumer;
importiere org.apache.kafka.common.serialization.StringDeserializer;


/**
 *
 * Titel: KafkaConsumerTest
 * Beschreibung:
 * Kafka-Consumer-Demo
 * Version: 1.0.0
 * @Autor pancm
 * @date 26. Januar 2018 */
öffentliche Klasse KafkaConsumerTest implementiert Runnable {

    privater finaler KafkaConsumer<String, String>-Verbraucher;
    private ConsumerRecords<String, String> msgList;
    privates endgültiges String-Thema;
    private statische endgültige Zeichenfolge GROUPID = "GruppeA";

    öffentlicher KafkaConsumerTest(String topicName) {
        Eigenschaften-Eigenschaften = neue Eigenschaften();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GRUPPENRID);
        props.put("aktivieren.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("Sitzung.Timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("Wert.deserializer", StringDeserializer.class.getName());
        this.consumer = neuer KafkaConsumer<String, String>(Eigenschaften);
        dieses.Thema = Themenname;
        this.consumer.subscribe(Arrays.asList(Thema));
    }

    @Überschreiben
    öffentliche Leere ausführen() {
        int Nachrichtennummer = 1;
        System.out.println("---------Verbrauch starten---------");
        versuchen {
            für (; ; ) {
                msgList = Verbraucher.Umfrage(1000);
                if (null != msgList && msgList.count() > 0) {
                    für (ConsumerRecord<String, String> Datensatz : msgList) {
                        //Drucken, nachdem 100 Datensätze verbraucht wurden, aber die gedruckten Daten folgen möglicherweise nicht diesem Muster System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());


// Zeichenfolge v = decodeUnicode(Datensatz.Wert());

// System.out.println(v);

                        // Beenden, wenn 1000 Nachrichten verbraucht sind, if (messageNo % 1000 == 0) {
                            brechen;
                        }
                        NachrichtNr++;
                    }
                } anders {
                    Thread.sleep(11);
                }
            }
        } Fang (UnterbrocheneAusnahme e) {
            e.printStackTrace();
        Endlich
            Verbraucher.schließen();
        }
    }

    öffentliche statische void main(String args[]) {
        KafkaConsumerTest test1 = neuer KafkaConsumerTest("Beispieldaten");
        Thread thread1 = neuer Thread(test1);
        thread1.start();
    }


    /*
     * Chinesisch in Unicode konvertieren*/
    öffentliche statische Zeichenfolge gbEncoding(finale Zeichenfolge gbString) {
        char[] utfBytes = gbString.toCharArray();
        Zeichenfolge unicodeBytes = "";
        für (int i = 0; i < utfBytes.Länge; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            wenn (hexB.Länge() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes = unicodeBytes + "\\u" + hexB;
        }
        UnicodeBytes zurückgeben;
    }

    /*
     * Unicode-Kodierung ins Chinesische*/
    öffentlicher statischer String decodeUnicode(finaler String dataStr) {
        Geben Sie den Startwert ein.
        int Ende = 0;
        endgültiger StringBuffer-Puffer = neuer StringBuffer();
        während (Start > -1) {
            Ende = dataStr.indexOf("\\u", Start + 2);
            Zeichenfolge charStr = "";
            wenn (Ende == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } anders {
                charStr = dataStr.substring(start + 2, end);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // Hexadezimale Ganzzahlzeichenfolge analysieren.
            Puffer.anhängen(neues Zeichen(Buchstabe).toString());
            Anfang = Ende;
        }
        gibt buffer.toString() zurück;

    }
}

2. Daten zur Tabelle bak1 hinzufügen

TABELLE ERSTELLEN `bak1` (
  `vin` varchar(20) NICHT NULL,
  `p1` doppelter STANDARD NULL,
  `p2` doppelter STANDARD NULL,
  `p3` doppelter STANDARD NULL,
  `p4` doppelter DEFAULT NULL,
  `p5` doppelter DEFAULT NULL,
  `p6` doppelter DEFAULT NULL,
  `p7` doppelter DEFAULT NULL,
  `p8` doppelter DEFAULT NULL,
  `p9` doppelter DEFAULT NULL,
  `p0` doppelter STANDARD NULL
) ENGINE=InnoDB STANDARD-CHARSET=utf8mb4

zeige, erstelle Tabelle bak1;

in bak1 einfügen, wähle '李雷abcv',
  `p1`,
  `p2`,
  `p3`,
  `p4`,
  `p5`,
  `p6`,
  `p7`,
  `p8`,
  `p9`,
  `p0` von Moci-Grenze 10

3. Zeigen Sie die Ausgabeergebnisse an:

Damit ist dieser Artikel über die Synchronisierung der vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit einer Nachrichtenwarteschlange abgeschlossen – Lösung. Weitere Informationen zur Synchronisierung von Daten in einer bestimmten MySQL-Tabelle finden Sie in früheren Artikeln auf 123WORDPRESS.COM oder in den folgenden verwandten Artikeln. Ich hoffe, Sie werden 123WORDPRESS.COM auch in Zukunft unterstützen!

Das könnte Sie auch interessieren:
  • Detaillierte Erklärung zur Synchronisierung von Daten von MySQL mit Elasticsearch
  • Tutorial zum Synchronisieren von MySQL-Daten mit ElasticSearch mithilfe von Python
  • Schritte zum Synchronisieren von MongoDB-Daten mit MySQL mithilfe von node.js
  • MySQL5.6 Master-Slave-Replikation (MySQL-Datensynchronisierungskonfiguration)
  • Detaillierte Erläuterung zur Reduzierung der MySQL Master-Slave-Datensynchronisationsverzögerung
  • MySQL-Trigger zum Synchronisieren von Daten zwischen zwei Tabellen
  • Zusammenfassung der Lösungen für das Problem Slave_IO_Running:No bei der MySQL-Datensynchronisierung
  • Methode zur Synchronisierung von MySQL-Sicherungs- und Migrationsdaten
  • Konfigurationsmethode für die MYSQL5-Masterslave-Datensynchronisation
  • So synchronisieren Sie Mysql-Daten

<<:  Eine detaillierte Einführung zum Einrichten von Jenkins auf Tencent Cloud Server

>>:  HTML-Code zum Hinzufügen eines Mengenabzeichens zur Nachrichtenschaltfläche

Artikel empfehlen

Ein Artikel, der Ihnen hilft, mehr über JavaScript-Arrays zu erfahren

Inhaltsverzeichnis 1. Die Rolle des Arrays: 2. De...

VUE+Canvas implementiert das Spiel God of Wealth und erhält Barren

Willkommen zur vorherigen Canvas-Spielserie: 《VUE...

Wie stelle ich Tomcat als automatisch gestarteten Dienst ein? Der schnellste Weg

Stellen Sie Tomcat so ein, dass der Dienst automa...

Befehl zum Anzeigen der Erstellungszeit der Binlog-Datei unter Linux

Inhaltsverzeichnis Hintergrund analysieren Verfah...

Zusammenfassung der Wissenspunkte zu den Linux-Befehlen ps und pstree

Der ps-Befehl in Linux ist die Abkürzung für „Pro...

Parameter, um Iframe transparent zu machen

<iframe src="./ads_top_tian.html" all...

Lösung für die hohe CPU-Auslastung des Tomcat-Prozesses

Inhaltsverzeichnis Fall Kontextwechsel-Overhead? ...

Wie gestaltet man eine Webseite? Wie erstelle ich eine Webseite?

Wenn es um das Verständnis von Webdesign geht, sc...

Detaillierte Erklärung der Gründe, warum MySQL-Verbindungen hängen bleiben

Inhaltsverzeichnis 1. Hintergrund Architektur Pro...

Beispiele für die korrekte Verwendung von Karten in WeChat-Miniprogrammen

Inhaltsverzeichnis Vorwort 1. Vorbereitung 2. Tat...