Lösung für das Datenasymmetrieproblem zwischen MySQL und Elasticsearch Das JDBC-Eingabe-Plugin kann nur Datenbankanhänge und inkrementelle Schreibvorgänge in Elasticsearch implementieren, aber die Datenbank auf der JDBC-Quellseite kann häufig Datenbanklösch- oder Aktualisierungsvorgänge durchführen. Dadurch entsteht eine Asymmetrie zwischen der Datenbank und der Datenbasis der Suchmaschine. Wenn Sie über ein Entwicklungsteam verfügen, können Sie natürlich ein Programm schreiben, um Suchmaschinenvorgänge beim Löschen oder Aktualisieren zu synchronisieren. Wenn Sie über diese Möglichkeit nicht verfügen, können Sie die folgende Methode ausprobieren. Hier ist ein Datentabellenartikel, das mtime-Feld ist als ON UPDATE CURRENT_TIMESTAMP definiert, sodass sich die Zeit jeder Aktualisierung von mtime ändert mysql> Beschreibungsartikel; +-------------+--------------+------+-----+--------------------------------+-----------+ | Feld | Typ | Null | Schlüssel | Standard | Extra | +-------------+--------------+------+-----+--------------------------------+-----------+ | Ich würde | int(11) | NEIN | | 0 | | | Titel | Mitteltext | NEIN | | NULL | | | Beschreibung | Mitteltext | JA | | NULL | | | Autor | varchar(100) | JA | | NULL | | | Quelle | varchar(100) | JA | | NULL | | | Inhalt | Langtext | JA | | NULL | | | Status | Aufzählung('J','N')| NEIN | | 'N' | | | ctime | Zeitstempel | NEIN | | AKTUELLER_ZEITSTEMPEL | | | mtime | Zeitstempel | JA | | BEI UPDATE CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+--------------------------------+-----------+ 7 Zeilen im Satz (0,00 Sek.) Logstash fügt Abfrageregeln für mtime hinzu jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Treiber" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "Passwort" Zeitplan => "* * * * *" #Zeitgesteuerter Cron-Ausdruck, hier wird er einmal pro Minute ausgeführt. Anweisung => "select * from article where mtime > :sql_last_value" use_column_value => wahr Tracking-Spalte => "mtime" tracking_column_type => "Zeitstempel" record_last_run => wahr last_run_metadata_path => "/var/tmp/article-mtime.last" } Erstellen Sie eine Papierkorbtabelle, die zum Lösen des Problems der Datenbanklöschung oder des Deaktivierungsstatus = „N“ verwendet wird. Tabelle „elasticsearch_trash“ erstellen ( `id` int(11) NICHT NULL, `ctime` Zeitstempel NULL DEFAULT CURRENT_TIMESTAMP, PRIMÄRSCHLÜSSEL (`id`) ) ENGINE=InnoDB STANDARD-CHARSET=utf8 Trigger für die Artikeltabelle erstellen CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` VOR DEM UPDATE AUF `article` FÜR JEDE ZEILE BEGINNEN - Die Logik hier besteht darin, das Problem zu lösen, dass, wenn der Artikelstatus N ändert, die entsprechenden Daten in der Suchmaschine gelöscht werden müssen. WENN NEU.status = 'N' DANN in elasticsearch_trash(id) Werte (OLD.id) einfügen; ENDE, WENN; – Die Logik hierbei besteht darin, dass bei einer Änderung des Status auf Y die Artikel-ID weiterhin in der Methode elasticsearch_trash vorhanden ist, was zu einer versehentlichen Löschung führen kann. Daher müssen Sie die Recyclingdatensätze im Papierkorb löschen. WENN NEU.status = 'J' DANN aus elasticsearch_trash löschen, wobei ID = OLD.id; ENDE, WENN; ENDE CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` VOR DEM LÖSCHEN VON `article` FÜR JEDE ZEILE BEGINNEN – Die Logik hierbei besteht darin, dass ein Artikel beim Löschen in den Papierkorb der Suchmaschine verschoben wird. in elasticsearch_trash(id) Werte (OLD.id) einfügen; ENDE Als Nächstes müssen wir eine einfache Shell schreiben, die einmal pro Minute ausgeführt wird, um Daten aus der Datentabelle „elasticsearch_trash“ abzurufen, und dann mit dem Befehl „curl“ die Restful-Schnittstelle von Elasticsearch aufrufen, um die abgerufenen Daten zu löschen. Sie können auch verwandte Programme entwickeln. Hier ist ein Beispiel für eine geplante Spring-Boot-Aufgabe. juristische Person Paket cn.netkiller.api.domain.elasticsearch; importiere java.util.Date; importiere javax.persistence.Column; importiere javax.persistence.Entity; importiere javax.persistence.Id; importiere javax.persistence.Table; @Juristische Person @Tisch öffentliche Klasse ElasticsearchTrash { @Ausweis private int-ID; @Column(columnDefinition = "ZEITSTEMPEL STANDARD AKTUELLER_ZEITSTEMPEL") privates Datum ctime; öffentliche int getId() { Rückgabe-ID; } öffentliche void setId(int id) { diese.id = ID; } öffentliches Datum getCtime() { ctime zurückgeben; } öffentliche void setCtime(Datum ctime) { dies.ctime = ctime; } } Lagerhaus Paket cn.netkiller.api.repository.elasticsearch; importiere org.springframework.data.repository.CrudRepository; importiere com.example.api.domain.elasticsearch.ElasticsearchTrash; öffentliche Schnittstelle ElasticsearchTrashRepository erweitert CrudRepository<ElasticsearchTrash, Integer>{ } Geplante Aufgaben Paket cn.netkiller.api.schedule; importiere org.elasticsearch.action.delete.DeleteResponse; importiere org.elasticsearch.client.transport.TransportClient; importiere org.elasticsearch.rest.RestStatus; importiere org.slf4j.Logger; importiere org.slf4j.LoggerFactory; importiere org.springframework.beans.factory.annotation.Autowired; importiere org.springframework.scheduling.annotation.Scheduled; importiere org.springframework.stereotype.Component; importiere com.example.api.domain.elasticsearch.ElasticsearchTrash; importiere com.example.api.repository.elasticsearch.ElasticsearchTrashRepository; @Komponente öffentliche Klasse ScheduledTasks { privater statischer finaler Logger-Logger = LoggerFactory.getLogger (ScheduledTasks.class); @Autowired privater TransportClient-Client; @Autowired privates ElasticsearchTrashRepository alasticsearchTrashRepository; öffentliche ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // Führe die geplante Aufgabe alle 60 Sekunden aus public void cleanTrash() { für (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { DeleteResponse-Antwort = Client.prepareDelete("Information", "Artikel", elasticsearchTrash.getId() + "").get(); RestStatus status = antwort.status(); logger.info("löschen {} {}", elasticsearchTrash.getId(), status.toString()); wenn (status == RestStatus.OK || status == RestStatus.NICHT_GEFUNDEN) { alasticsearchTrashRepository.delete(elasticsearchTrash); } } } } Spring Boot startet das Hauptprogramm. Paket cn.netkiller.api; importiere org.springframework.boot.SpringApplication; importiere org.springframework.boot.autoconfigure.SpringBootApplication; importiere org.springframework.scheduling.annotation.EnableScheduling; @SpringBootAnwendung @EnableScheduling öffentliche Klasse Anwendung { öffentliche statische void main(String[] args) { SpringApplication.run(Anwendung.Klasse, Argumente); } } Oben finden Sie eine Erklärung zur Lösung des Datenasymmetrieproblems zwischen MySQL und Elasticsearch. Wenn Sie Fragen haben, hinterlassen Sie bitte eine Nachricht oder diskutieren Sie in der Community dieser Site. Vielen Dank fürs Lesen und ich hoffe, es kann Ihnen helfen. Vielen Dank für Ihre Unterstützung dieser Site! Das könnte Sie auch interessieren:
|
<<: Versuchen Sie Docker+Nginx, um die Single-Page-Anwendungsmethode bereitzustellen
>>: Anwendungsbeispiele für die try_files-Direktive von Nginx
Überblick Das Rahmendiagramm dieses Artikels ist ...
Viele Unternehmen bieten derzeit Sonderaktionen m...
Mit REGELN kann die Art der inneren Rahmen der Ta...
Als Front-End-Webentwickler sind Sie beim Erstell...
Verwenden Sie v-model, um das Paging-Informations...
In unserem aktuellen Projekt müssen wir die Googl...
Starten Sie den MySQL-Container in Docekr Verwend...
Da die Verwendung von group by in MySQL immer zu ...
Beim Surfen im Internet stoßen wir nicht oft auf ...
Zuerst ist die Idee Um diesen Effekt zu erzielen,...
Gehen Sie zunächst zum Herunterladen auf die offi...
1: Definieren Sie eine gespeicherte Prozedur zum ...
<br />„Es gibt keine hässlichen Frauen auf d...
Inhaltsverzeichnis Anforderungsbeschreibung: Anfo...