Schritte zum Aktivieren des MySQL-Datenbanküberwachungs-Binlogs

Schritte zum Aktivieren des MySQL-Datenbanküberwachungs-Binlogs

Vorwort

Wir müssen häufig etwas basierend auf bestimmten Vorgängen tun, die Benutzer mit ihren eigenen Daten durchführen.

Wenn ein Benutzer beispielsweise sein Konto löscht, senden wir ihm eine Textnachricht, um ihn zu tadeln und ihn anzuflehen, zurückzukommen.

Eine ähnliche Funktion kann sicherlich in der Geschäftslogikschicht implementiert werden, und dieser Vorgang kann nach Erhalt der Löschanforderung des Benutzers ausgeführt werden, aber das Datenbank-Binlog bietet uns eine andere Vorgangsmethode.

Um das Binärprotokoll zu überwachen, sind zwei Schritte erforderlich. Der erste Schritt besteht natürlich darin, diese Funktion in Ihrem MySQL zu aktivieren, und der zweite Schritt besteht darin, ein Programm zum Lesen des Protokolls zu schreiben.

MySQL aktiviert Binlog.

Zunächst einmal ist das Binärprotokoll von MySQL normalerweise nicht geöffnet. Daher benötigen wir:

Suchen Sie die MySQL-Konfigurationsdatei my.cnf. Der Speicherort kann je nach Betriebssystem unterschiedlich sein. Sie können es selbst finden.

Fügen Sie den folgenden Inhalt hinzu:

[mysqld]
Server-ID = 1
log-bin = mysql-bin
Binlog-Format = ROW

Starten Sie dann MySQL neu.

/ubuntu
Dienst MySQL Neustart
// Mac
mysql.server neu starten

Überwachen Sie, ob die Aktivierung erfolgreich war

Geben Sie die MySQL-Befehlszeile ein und führen Sie Folgendes aus:

Variablen wie „%log_bin%“ anzeigen;

Wenn das Ergebnis wie unten dargestellt aussieht, war der Vorgang erfolgreich:


Zeigen Sie den Status des geschriebenen Binärprotokolls an:


Code zum Lesen des Binärprotokolls

Einführung von Abhängigkeiten

Wir verwenden einige Open-Source-Implementierungen. Aus seltsamen Gründen habe ich das Paket mysql-binlog-connector-java (offizielles GitHub-Repository) ausgewählt [github.com/shyiko/mysq…]. Die spezifischen Abhängigkeiten sind wie folgt:

<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
 <Abhängigkeit>
 <groupId>com.github.shyiko</groupId>
 <artifactId>mysql-binlog-connector-java</artifactId>
 <version>0.17.0</version>
 </Abhängigkeit>

Natürlich gibt es viele Open-Source-Implementierungen für die Binlog-Verarbeitung. Alibabas „cancl“ ist eine davon und Sie können sie auch verwenden.

Schreiben Sie eine Demo

Schreiben wir gemäß der Readme-Datei im offiziellen Repository einfach eine Demo.

 öffentliche statische void main(String[] args) {
 BinaryLogClient-Client = neuer BinaryLogClient("Hostname", 3306, "Benutzername", "Passwort");
 EventDeserializer eventDeserializer = neuer EventDeserializer();
 eventDeserializer.setCompatibilityMode(
 EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
 );
 client.setEventDeserializer(eventDeserializer);
 client.registerEventListener(neuer BinaryLogClient.EventListener() {

 @Überschreiben
 öffentliche void beiEreignis(Ereignisereignis) {
 //ZU TUN
 tuetwas();
 logger.info(Ereignis.toString());
 }
 });
 client.verbinden();
 }

Dies ist vollständig gemäß dem offiziellen Tutorial geschrieben. Sie können Ihre eigene Geschäftslogik in onEvent schreiben. Da ich nur teste, drucke ich jedes darin enthaltene Ereignis aus.

Danach habe ich mich manuell bei MySQL angemeldet und die entsprechenden Hinzufügungs-, Änderungs- und Löschvorgänge ausgeführt. Die überwachten Protokolle waren wie folgt:

00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
{vorher=[[B@6833ce2c, 1], nachher=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}

Erstellen Sie eine bessere und individuellere Werkzeugklasse entsprechend Ihrem eigenen Unternehmen

Ich wollte den Code zunächst veröffentlichen, aber als der Code immer länger wurde, beschloss ich, ihn auf GitHub hochzuladen. Hier werde ich nur einen Teil der Implementierung veröffentlichen. Code-Transferportal

Umsetzungsideen

  1. Unterstützt die Überwachung einer einzelnen Tabelle, da wir nicht wirklich alle Datentabellen in allen Datenbanken überwachen möchten.
  2. Kann von mehreren Threads verwendet werden.
  3. Konvertieren Sie den überwachten Inhalt in eine Form, die uns gefällt (die Datenstruktur im Artikel ist möglicherweise nicht besonders gut, mir fällt keine passendere ein).

Die Umsetzungsideen lauten also in etwa wie folgt:

  1. Kapseln Sie einen Client ein, stellen Sie nach außen nur die Erfassungsmethode bereit und schirmen Sie den Code für die Initialisierungsdetails ab.
  2. Stellt eine Methode zum Registrieren eines Listeners (Pseudo) bereit, die einen Listener für eine Tabelle registrieren kann (definieren Sie eine Listener-Schnittstelle neu, und alle registrierten Listener müssen diese nur implementieren).
  3. Der einzige echte Listener ist der Client, der alle Operationen auf dieser Datenbankinstanz abhört, sie in das gewünschte LogItem-Format konvertiert und in die Sperrwarteschlange stellt.
  4. Starten Sie mehrere Threads, nutzen Sie blockierende Warteschlangen, rufen Sie den entsprechenden Datentabellen-Listener für ein LogItem auf und führen Sie einige Geschäftslogiken aus.

Initialisierungscode:

 öffentliche MysqlBinLogListener(Conf conf) {
 BinaryLogClient-Client = neuer BinaryLogClient (conf.host, conf.port, conf.username, conf.passwd);
 EventDeserializer eventDeserializer = neuer EventDeserializer();
 eventDeserializer.setCompatibilityMode(
 EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
 );
 client.setEventDeserializer(eventDeserializer);
 dies.parseClient = Client;
 diese.Warteschlange = neue ArrayBlockingQueue<>(1024);
 diese.conf = conf;
 Listener = neue ConcurrentHashMap<>();
 dbTableCols = neue ConcurrentHashMap<>();
 this.consumer = Executors.newFixedThreadPool(consumerThreads);
 }

Registrierungscode:

 public void regListener(String db, String table, BinLogListener listener) throws Exception {
 : Zeichenfolge dbTable = getdbTable(db, Tabelle);
 Klasse.fürName("com.mysql.jdbc.Driver");
 // Speichern Sie die Spalteninformationen der aktuell registrierten Tabelle Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
 Map<String, Colum> cols = getColMap(Verbindung, Datenbank, Tabelle);
 dbTableCols.put(dbTable, cols);

 //Speichern Sie den aktuell registrierten Listener
 Liste<BinLogListener> Liste = Listener.getOrDefault(dbTable, neue ArrayList<>());
 Liste.Hinzufügen(Listener);
 listeners.put(dbTable, Liste);
 }

In diesem Schritt erhalten wir die Schemainformationen der Tabelle, während wir den Listener registrieren, und speichern sie in der Karte, um die nachfolgende Datenverarbeitung zu erleichtern.

Abhörcode:

 @Überschreiben
 öffentliche void beiEreignis(Ereignisereignis) {
 Ereignistyp Ereignistyp = Ereignis.getHeader().getEventType();

 wenn (Ereignistyp == Ereignistyp.TABLE_MAP) {
 TableMapEventData tableData = event.getData();
 : String db = tableData.getDatabase();
 Zeichenfolgetabelle = tableData.getTable();
 dbTable = getdbTable(db, Tabelle);
 }

 // Behandeln Sie nur die drei Vorgänge Hinzufügen, Löschen und Aktualisieren, wenn (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
 wenn (istSchreiben(Ereignistyp)) {
 WriteRowsEventData-Daten = event.getData();
 für (Serializable[] Zeile: data.getRows()) {
  wenn (dbTableCols.containsKey(dbTable)) {
  LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
  e.setDbTable(dbTable);
  Warteschlange hinzufügen(e);
  }
 }
 }
 }
 }

Ich bin faul,,, hier implementiere ich nur die Verarbeitung der Add-Operation und habe keine anderen Operationen geschrieben.

Verbrauchscode:

 public void parse() wirft IOException {
 parseClient.registerEventListener(dies);

 für (int i = 0; i < consumerThreads; i++) {
 Verbraucher.submit(() -> {
 während (wahr) {
  if (queue.size() > 0) {
  versuchen {
  LogItem-Element = Warteschlange.take();
  : Zeichenfolge dbtable = item.getDbTable();
  listeners.get(dbtable).forEach(l -> {
  l.beiEreignis(Element);
  });

  } Fang (UnterbrocheneAusnahme e) {
  e.printStackTrace();
  }
  }
  Thread.sleep(1000);
 }
 });
 }
 parseClient.connect();
 }

Holen Sie sich beim Verbrauch das Element aus der Warteschlange und lassen Sie dann den entsprechenden Listener bzw. die entsprechenden Listener das Element entsprechend verwenden.

Testcode:

 öffentliche statische void main(String[] args) wirft Exception {
 Conf conf = neue Conf();
 conf.host = "Hostname";
 conf.port = 3306;
 conf.Benutzername = conf.Passwort = "hhsgsb";

 MysqlBinLogListener mysqlBinLogListener = neuer MysqlBinLogListener(conf);
 mysqlBinLogListener.parseArgsAndRun(args);
 mysqlBinLogListener.regListener("pf", "student", item -> {
 System.out.println(neuer String((byte[])item.getAfter().get("name")));
 logger.info("einfügen in {}, Wert = {}", item.getDbTable(), item.getAfter());
 });
 mysqlBinLogListener.regListener("pf", "Lehrer", Element -> System.out.println("Lehrer ===="));

 mysqlBinLogListener.parse();
 }

In diesem kurzen Code werden zwei Listener registriert, die jeweils die Schüler- und Lehrertabellen abhören und ausdrucken. Nach dem Testen kann die definierte Geschäftslogik beim Einfügen von Daten in die Lehrertabelle unabhängig ausgeführt werden.

Hinweis: Die Tool-Klasse kann hier nicht direkt verwendet werden, da viele Ausnahmebehandlungen nicht durchgeführt werden und die Funktion nur auf die Einfügeanweisung hört, die als Referenz für die Implementierung verwendet werden kann.

Verweise

  • github.com/shyiko/mysq…
  • https://www.jb51.net/article/166761.htm

Zusammenfassen

Das Obige ist der vollständige Inhalt dieses Artikels. Ich hoffe, dass der Inhalt dieses Artikels einen gewissen Lernwert für Ihr Studium oder Ihre Arbeit hat. Vielen Dank für Ihre Unterstützung von 123WORDPRESS.COM.

Das könnte Sie auch interessieren:
  • MySQL-Reihe: Redo-Log, Undo-Log und Binlog – ausführliche Erklärung
  • Spezifische Verwendung des MySQL-Parameters binlog_ignore_db
  • So wählen Sie das Format bei der Verwendung von Binlog in MySQL
  • Detaillierte Erläuterung des Binlog-Protokollanalysetools zur Überwachung von MySQL: Canal
  • Ausführliche Erklärung des Binlogs in MySQL 8.0
  • Zusammenfassung einiger Gedanken zur Binlog-Optimierung in MySQL
  • Detaillierte Erläuterung des Befehls zum Bereinigen des MySQL-Datenbank-Binlogs
  • So unterscheiden Sie MySQLs innodb_flush_log_at_trx_commit und sync_binlog

<<:  vitrualBox+ubuntu16.04 installieren Sie Python3.6, neuestes Tutorial und detaillierte Schritte

>>:  Detaillierte Schritte zur Installation einer virtuellen Maschine und Verwendung von CentOS 8 mit VMware 15

Artikel empfehlen

Docker-Bereitstellungs- und Installationsschritte für Jenkins

Zuerst benötigen wir einen Server mit installiert...

So implementieren Sie eine MySQL-Master-Slave-Replikation basierend auf Docker

Vorwort Die MySQL Master-Slave-Replikation ist di...

Konfigurieren Sie ein Implementierungsbeispiel für den Mysql-Master-Slave-Dienst

Konfigurieren Sie ein Implementierungsbeispiel fü...

Eine kurze Analyse der Probleme mit JS-Originalwerten und Referenzwerten

Primitive Werte -> primitive Typen Number Stri...

Detaillierte Erklärung der Kernfunktionen und der Ereignisbehandlung von jQuery

Inhaltsverzeichnis Ereignis Seite wird geladen Ve...

Zusammenfassung der DTD-Verwendung in HTML

DTD ist ein Satz grammatikalischer Regeln zur Ausz...

Eine kurze Analyse der startReactApplication-Methode von React Native

In diesem Artikel haben wir den Startvorgang von ...

So erstellen Sie eine Deep-Learning-Umgebung mit Python in einem Docker-Container

Überprüfen Sie die Virtualisierung im Task-Manage...