CEP – Komplexe Ereignisverarbeitung. Die Zahlung wurde nicht innerhalb einer bestimmten Zeitspanne nach der Bestellung bestätigt. Die Taxibestellung wurde generiert, jedoch wurde dem Fahrgast nicht bestätigt, dass er innerhalb einer bestimmten Zeitspanne in das Taxi einsteigen kann. Es liegt keine Lieferbestätigung für das Essen zum Mitnehmen innerhalb eines bestimmten Zeitraums nach der geplanten Lieferzeit vor. Apache FlinkCEP API CEPTimentoEventJob Kurze Analyse des FlinkCEP-Quellcodes DataStream und PatternStream DataStream besteht im Allgemeinen aus Ereignissen oder Elementen desselben Typs. Ein DataStream kann durch eine Reihe von Transformationsvorgängen wie Filtern und Zuordnen in einen anderen DataStream umgewandelt werden. PatternStream ist eine Abstraktion des CEP-Mustervergleichsstreams, der DataStream und Pattern kombiniert und Methoden wie select und flatSelect bereitstellt. PatternStream ist kein DataStream. Es bietet eine Methode zum Senden einer Map, die aus einer passenden Mustersequenz und den zugehörigen Ereignissen (also Map<Mustername, List<Ereignis>>) besteht, an SingleOutputStreamOperator, der ein DataStream ist. Die Methoden und Variablen in der Toolklasse CEPOperatorUtils werden mit „PatternStream“ benannt, zum Beispiel: öffentlich statisch <EIN, AUS> Einzelausgabestreamoperator <OUT> createPatternStream(...){...} öffentlich statisch <IN, OUT1, OUT2> Einzelausgabestreamoperator <OUT1> erstelleTimeoutPatternStream(...){...} Finale Einzelausgabestreamoperator <OUT> Musterstrom; Einzelausgabestreamoperator @Öffentlich öffentlich Klasse Einzelausgabestreamoperator <T> erweitert Datenstrom <T> {...} PatternStream-Konstruktionsmethode: MusterStream ( Finale Datenstrom <T> Eingabestream, Finale Muster <T, ?> Muster) { Das .inputStream = EingabeStream; Das .muster = Muster; Das .comparator = Null ; } MusterStream ( Finale Datenstrom <T> Eingabestream, Finale Muster <T, ?> Muster, Finale Ereigniskomparator <T> Komparator) { Das .inputStream = EingabeStream; Das .muster = Muster; Das .comparator = Komparator; } Muster, Quantifizierer und EventComparator Pattern ist die Basisklasse für die Musterdefinition im Builder-Modus. Das definierte Muster wird von NFACompiler zum Generieren von NFA verwendet. Wenn Sie ähnliche Methoden wie „next“ und „folgtBy“ selbst implementieren möchten, z. B. „timeEnd“, sollte es möglich sein, „Pattern“ zu erweitern und neu zu schreiben. öffentlich Klasse Muster <T, F erweitert T> /** Modusname */ Privat Finale Zeichenfolge Name; /**Vorheriges Muster*/ Privat Finale Muster <T, ? erweitert T>vorheriges; /**Einschränkungen, die ein Ereignis erfüllen muss, um mit dem aktuellen Muster übereinzustimmen*/ Privat IterativeBedingung <F> Zustand; /** Länge des Zeitfensters, Mustervergleich innerhalb der Zeitlänge*/ Privat Zeit FensterZeit; /** Musterquantifizierer, d. h. ein Muster passt zu mehreren Ereignissen usw. Standardmäßig passt es zu einem*/ Privat Quantor Quantifizierer = Quantor .eins( Konsumstrategie .STRIKT); /** Die Bedingungen, die Ereignisse erfüllen müssen, um das Sammeln von Ereignissen im Schleifenzustand zu beenden*/ Privat IterativeBedingung <F> bisZustand; /** * Gilt für den Modus {@code times} und wird verwendet, um die Häufigkeit beizubehalten, mit der ein Ereignis in diesem Modus nacheinander auftreten kann.*/ Privat Mal mal; // Überspringe die Strategie nach dem Abgleichen des privaten Ereignisses Finale NachMatchSkipStrategy nachMatchSkipStrategy; ... } Quantifizierer werden verwendet, um bestimmte Musterverhalten zu beschreiben. Es gibt drei Hauptkategorien: Einzel-Einzel-Übereinstimmung, Schleife-Schleife-Übereinstimmung, Zeit-Übereinstimmung innerhalb einer bestimmten Anzahl von Malen oder eines Zeitbereichs. Jedes Muster kann optional sein (Einzelübereinstimmung oder Schleifenübereinstimmung) und kann über eine festgelegte ConsumingStrategy verfügen. Schleifen und Zeiten verfügen außerdem über eine zusätzliche interne ConsumingStrategy, die zwischen im Muster empfangenen Ereignissen verwendet wird. öffentlich Klasse Quantor { ... /** * 5 Attribute, können kombiniert werden, aber nicht alle Kombinationen sind gültig*/ öffentlich Aufzählung QuantifiziererEigenschaft { EINZEL, Schleifen, MAL, OPTIONAL, GIERIG } /** * Eine Strategie, die beschreibt, welche Ereignisse in diesem Muster übereinstimmen */ öffentlich Aufzählung Konsumstrategie { STRIKT, SKIP_TILL_NEXT, ÜBERSPRINGEN_BIS_JEDER, NICHT_FOLGEN, NICHT_WEITER } /** * Beschreibt, wie oft ein Ereignis im aktuellen Muster hintereinander auftreten kann. Eine Musterbedingung ist beispielsweise nichts anderes als ein Boolescher Wert, und ein Ereignis, das die wahre Bedingung erfüllt, tritt hintereinander oder in einem bestimmten Zeitbereich auf, z. B. 2 bis 4 Mal. 2 Mal, 3 Mal und 4 Mal werden alle mit dem aktuellen Muster abgeglichen, sodass dasselbe Ereignis wiederholt abgeglichen wird.*/ öffentlich statisch Klasse Mal { Privat Finale int aus; Privat Finale int Zu; Privat Mal ( int aus, int Zu) Voraussetzungen .checkArgument(von > 0 , „Das Von-Zeichen sollte eine positive Zahl größer als 0 sein.“ ); Voraussetzungen .checkArgument(bis >= von, „Das Bis sollte eine Zahl größer oder gleich dem Von sein: „ + ab + "." ); Das .from = von; Das .zu = zu; } öffentlich int getFrom() { zurückkehren aus; } öffentlich int getTo() { zurückkehren Zu; } //Nummernbereich öffentlich statisch Mal von( int aus, int Zu) zurückkehren neu Mal (von, bis); } //Geben Sie die genaue Anzahl der öffentlichen statisch Mal von( int mal) zurückkehren neu Mal (mal, mal); } @Überschreiben öffentlich Boolescher Wert ist gleich( Objekt o) { Wenn ( Das == o) { zurückkehren WAHR ; } Wenn (o == Null || getClass() != o.getClass()) { zurückkehren FALSCH ; } Mal mal = ( Mal ) o; zurückkehren von == mal.von && bis == mal.bis; } @Überschreiben öffentlich int hashCode() { zurückkehren Objekte .hash(von, nach); } } ... } EventComparator, benutzerdefinierter Ereigniskomparator, implementiert die EventComparator-Schnittstelle. öffentlich Schnittstelle Ereigniskomparator <T> erweitert Komparator <T>, Serialisierbar { lang serialVersionUID = 1 Liter ; } NFACompiler und NFA NFACompiler bietet Methoden zum Kompilieren von Pattern in NFA oder NFAFactory. Mit NFAFactory können mehrere NFAs erstellt werden. öffentlich Klasse NFA-Compiler { ... /** * NFAFactory erstellt eine Schnittstelle für NFA* * @param <T> Typ der Eingabeereignisse, die vom NFA verarbeitet werden */ öffentlich Schnittstelle NFA-Fabrik <T> erweitert Serialisierbar { NFA<T> erstelleNFA(); } /** * Konkrete Implementierung von NFAFactory: NFAFactoryImpl * * <p>Die Implementierung übernimmt den Serialisierer des Eingabetyps, die Fensterzeit und den Satz von * Zustände und deren Übergänge, um daraus einen NFA erstellen zu können. * * @param <T> Typ der Eingabeereignisse, die vom NFA verarbeitet werden */ Privat statisch Klasse NFAFactoryImpl <T> implementiert NFA-Fabrik <T> { Privat statisch Finale lang serialVersionUID = 8939783698296714379L ; Privat Finale lang FensterZeit; Privat Finale Sammlung < Zustand <T>> Zustände; Privat Finale Boolescher Wert Zeitüberschreitungsbehandlung; Privat NFAFactoryImpl ( lang Fensterzeit, Sammlung < Zustand <T>> Zustände, Boolescher Wert Timeout-Behandlung) { Das .windowTime = Fensterzeit; Das .states = Staaten; Das .timeoutHandling = Zeitüberschreitungsbehandlung; } @Überschreiben öffentlich NFA<T> erstelleNFA() { // Ein NFA besteht aus einem Statussatz, der Länge des Zeitfensters und der Angabe, ob Timeouts behandelt werden sollen. neu NFA<>(Zustände, Fensterzeit, Timeout-Behandlung); } } } NFA: Nichtdeterministischer endlicher Automat – nichtdeterministischer endlicher (Zustands-)Automat. Weitere Informationen finden Sie unter https://zh.wikipedia.org/wiki/Nichtdeterministischer endlicher Automat öffentlich Klasse NFA<T> { /** * Die Menge aller gültigen NFA-Zustände, die vom NFACompiler zurückgegeben werden. * Diese werden direkt aus dem benutzerdefinierten Muster abgeleitet. */ Privat Finale Karte < Zeichenfolge , Zustand <T>> Zustände; /** * Pattern.within(Time) gibt die Länge des Zeitfensters an */ Privat Finale lang FensterZeit; /** * Ein Timeout-Matchmarker */ Privat Finale Boolescher Wert Zeitüberschreitung behandeln; ... } PatternSelectFunction und PatternFlatSelectFunction Wenn über den Musternamen auf eine Karte mit übereinstimmenden Ereignissen zugegriffen werden kann, wird die Methode select() von PatternSelectFunction aufgerufen. Der Mustername wird bei der Definition des Musters angegeben. Die Methode select() gibt genau ein Ergebnis zurück. Wenn Sie mehrere Ergebnisse zurückgeben müssen, können Sie PatternFlatSelectFunction implementieren. öffentlich Schnittstelle Musterauswahlfunktion <EIN, AUS> erweitert Funktion , Serialisierbar { /** * Generieren Sie ein Ergebnis aus der angegebenen Ereigniskarte. Diese Ereignisse werden eindeutig durch den Namen des Schemas identifiziert, mit dem sie verknüpft sind */ OUT auswählen( Karte < Zeichenfolge , Liste <IN>>-Muster) wirft Ausnahme ; } Anstatt ein OUT zurückzugeben, verwendet PatternFlatSelectFunction Collector, um die übereinstimmenden Ereignisse zu sammeln. öffentlich Schnittstelle MusterFlatSelectFunktion <EIN, AUS> erweitert Funktion , Serialisierbar { /** * Ein oder mehrere Ergebnisse generieren */ Leere flacheAuswahl( Karte < Zeichenfolge , Liste <IN>> Muster, Kollektor <OUT> Ausgang) wirft Ausnahme ; } SelectTimeoutCepOperator, MusterTimeoutFunktion SelectTimeoutCepOperator wird erstellt, wenn die Methode createTimeoutPatternStream() in CEPOperatorUtils aufgerufen wird. Die Methoden in SelectTimeoutCepOperator, die von der Operatoriteration aufgerufen werden, sind processMatchedSequences() und processTimedOutSequences(). Die Vorlagenmethode ... entspricht der Methode processEvent() und der Methode advanceTime() in der abstrakten Klasse AbstractKeyedCEPPatternOperator. Es gibt auch den FlatSelectTimeoutCepOperator und die entsprechende PatternFlatTimeoutFunction. öffentlich Klasse Wählen Sie TimeoutCepOperator <IN, OUT1, OUT2, TASTE> erweitert AbstraktKeyedCEPPatternOperator <IN, KEY, OUT1, Wählen Sie TimeoutCepOperator . Wählen Sie Wrapper <IN, OUT1, OUT2>> { Privat Ausgabetag <OUT2> zeitgesteuertesOutputTag; öffentlich Wählen Sie TimeoutCepOperator ( TypSerializer <IN> inputSerializer, Boolescher Wert istVerarbeitungszeit, NFA-Compiler . NFA-Fabrik <IN> nfaFactory, Finale Ereigniskomparator <IN> Komparator, NachMatchSkipStrategy überspringenStrategie, // Die Benennung von Parametern ist verwirrend … einschließlich der Benennung von Mitgliedern in der SelectWrapper-Klasse … Musterauswahlfunktion <IN, OUT1> FlatSelect-Funktion, MusterTimeoutFunktion <IN, OUT2> flatTimeoutFunktion, Ausgabetag <OUT2> Ausgabetag, Ausgabetag <IN> lateDataOutputTag) { super ( Eingabeserializer, istVerarbeitungszeit, nfaFabrik, Komparator, überspringenStrategie, neu Wählen Sie Wrapper <>(flacheAuswahlfunktion, flacheTimeoutfunktion), lateDataOutputTag); Das .timedOutOutputTag = Ausgabetag; } ... } öffentlich Schnittstelle MusterTimeoutFunktion <EIN, AUS> erweitert Funktion , Serialisierbar { OUT-Timeout ( Karte < Zeichenfolge , Liste <IN>> Muster, lang TimeoutZeitstempel) wirft Ausnahme ; } öffentlich Schnittstelle MusterFlatTimeoutFunktion <EIN, AUS> erweitert Funktion , Serialisierbar { Leere Time-out( Karte < Zeichenfolge , Liste <IN>> Muster, lang TimeoutZeitstempel, Kollektor <OUT> Ausgang) wirft Ausnahme ; } CEP und CEPOperatorUtils CEP ist eine Toolklasse zum Erstellen von PatternStream. PatternStream ist lediglich eine Kombination aus DataStream und Pattern. öffentlich Klasse CEP { öffentlich statisch <T> MusterStream <T> Muster( Datenstrom <T>-Eingang, Muster <T, ?> Muster) { zurückkehren neu MusterStream <>(Eingabe, Muster); } öffentlich statisch <T> MusterStream <T> Muster( Datenstrom <T>-Eingang, Muster <T, ?> Muster, Ereigniskomparator <T> Komparator) { zurückkehren neu MusterStream <>(Eingabe, Muster, Komparator); } } CEPOperatorUtils erstellt SingleOutputStreamOperator (DataStream), wenn die select()-Methode und die flatSelect()-Methode von PatternStream aufgerufen werden. öffentlich Klasse CEPOperatorUtils { ... Privat statisch <IN, OUT, K> Einzelausgabestreamoperator <OUT>erstellePatternStream( Finale Datenstrom <IN> Eingabestream, Finale Muster <IN, ?> Muster, Finale Typinformation <OUT> outTypeInfo, Finale Boolescher Wert Timeout-Behandlung, Finale Ereigniskomparator <IN> Komparator, Finale OperatorBuilder <IN, OUT> OperatorBuilder) { Finale TypSerializer <IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); // prüfen, ob wir Verarbeitungszeit verwenden Finale Boolescher Wert isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == Zeitmerkmal . Verarbeitungszeit ; // Kompilieren Sie unser Muster in eine NFAFactory, um später NFAs zu instanziieren Finale NFA-Compiler . NFA-Fabrik <IN> nfaFactory = NFA-Compiler .compileFactory(Muster, TimeoutHandling); Finale Einzelausgabestreamoperator <OUT> Musterstrom; Wenn (EingabeStream Instanz von KeyedStream ) { KeyedStream <IN, K> keyedStream = ( KeyedStream <IN, K>) Eingabestream; MusterStream = keyedStream.transform( operatorBuilder.getKeyedOperatorName(), outTypeInfo, OperatorBuilder.build( Eingabeserializer, istVerarbeitungszeit, nfaFabrik, Komparator, Muster.getAfterMatchSkipStrategy())); } anders { Schlüsselauswahl <IN, Byte > Schlüsselselektor = neu NullByteKeySelector <>(); MusterStream = EingabeStream.keyBy(Schlüsselselektor).transform( operatorBuilder.getOperatorName(), outTypeInfo, OperatorBuilder.build( Eingabeserializer, istVerarbeitungszeit, nfaFabrik, Komparator, Muster.getAfterMatchSkipStrategy() )).forceNonParallel(); } zurückkehren MusterStream; } ... } Schritte zur FlinkCEP-Implementierung
Schritte zur Implementierung des FlinkCEP-Matching-Timeouts Der Stream von TimeoutCEP benötigt keyBy, also KeyedStream. Wenn der inputStream nicht KeyedStream ist, wird ein neuer 0-Byte-Schlüssel erstellt (siehe oben im CEPOperatorUtils-Quellcode). Schlüsselauswahl <IN, Byte > Schlüsselselektor = neu NullByteKeySelector <>(); Pattern ruft schließlich auf, um die Fensterzeit festzulegen. Wenn Sie nach Primärschlüssel gruppieren, wird in einem Zeitfenster höchstens ein Timeout-Ereignis abgeglichen, sodass Sie PatternStream.select(...) verwenden können.
Das FlinkCEP-Timeout ist unzureichend Ähnlich wie bei der Flink-Fensteraggregation müssen, wenn Sie zum Fortschreiten die Ereigniszeit und von abhängigen Ereignissen generierte Wasserzeichen verwenden, nachfolgende Ereignisse eintreffen, bevor das Fenster zum Berechnen und Ausgeben von Ergebnissen ausgelöst wird. FlinkCEP-Timeout – vollständige Demo öffentlich Klasse CEPTimentoEventJob { Privat statisch Finale Zeichenfolge LOCAL_KAFKA_BROKER = "localhost:9092" ; Privat statisch Finale Zeichenfolge GROUP_ID = CEPTimentoEventJob . Klasse .getSimpleName(); Privat statisch Finale Zeichenfolge GRUPPENTHEMEN = GRUPPENTHEMEN-ID; öffentlich statisch Leere hauptsächlich( Zeichenfolge [] Argumente) wirft Ausnahme { // ParameterTool Parameter = ParameterTool .fromArgs(args); StreamAusführungsumgebung umgebung = StreamAusführungsumgebung .getExecutionEnvironment(); // Ereigniszeit verwenden env.setStreamTimeCharacteristic( Zeitmerkmal . Ereigniszeit ); env.enableCheckpointing( 5000 ); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig . ExternalisierteCheckpointCleanup .RETAIN_ON_CANCELLATION); env.getConfig().disableSysoutLogging(); env.getConfig().setRestartStrategy( NeustartStrategien .fixedDelayRestart( 5 , 10000 )); // Verwenden Sie nicht die endgültige POJO-Zeit AssignerWithPeriodicWatermarks Extraktor = neu IngestionTimeExtractor <POJO>(); // Bleiben Sie konsistent mit Partitionenv.setParallelism( des Kafka-Themas 3 ); Eigenschaften kafkaProps = neu Eigenschaften (); kafkaProps.setProperty( "bootstrap.servers" , LOCAL_KAFKA_BROKER); kafkaProps.setProperty( "Gruppen-ID" , GRUPPEN-ID); // Zugriff auf Kafka-Nachrichten FlinkKafkaConsumer011 <POJO> Verbraucher = neu FlinkKafkaConsumer011 <>(GRUPPENTHEMA, neu POJOSchema (), kafkaProps); Datenstrom <POJO> pojoDataStream = env.addSource(Verbraucher) .assignTimestampsAndWatermarks(Extraktor); pojoDataStream.print(); // Gruppieren nach Primärschlüsselhilfe, d. h., bei jedem POJO-Ereignis eine Übereinstimmungserkennung durchführen [Unterschiedliche POJO-Typen können unterschiedliche Zeiträume verwenden] // 1. Datenstrom <POJO> keyedPojos = pojoDataStream .keyBy( "Hilfe" ); // Von der Initialisierung bis zum Endzustand – eine vollständige POJO-Ereignissequenz // 2. Muster <POJO, POJO> abgeschlossenPojo = Muster .<POJO>beginnen( "init" ) .Wo( neu Einfache Bedingung <POJO>() { Privat statisch Finale lang serialVersionUID = - 6847788055093903603L ; @Überschreiben öffentlich Boolescher Wert Filter (POJO pojo) wirft Ausnahme { zurückkehren "02" .equals(pojo.getAstatus()); } }) .gefolgt von( "Ende" ) // .next("Ende") .Wo( neu Einfache Bedingung <POJO>() { Privat statisch Finale lang serialVersionUID = - 2655089736460847552L ; @Überschreiben öffentlich Boolescher Wert Filter (POJO pojo) wirft Ausnahme { zurückkehren "00" .equals(pojo.getAstatus()) || "01" .equals(pojo.getAstatus()); } }); // Finde die Ereignishilfe, die den Endzustand nicht innerhalb von 1 Minute erreicht hat [zu Testzwecken] // Wenn es für verschiedene Typen unterschiedliche Zeitspannen gibt, z. B. einige eine Zeitüberschreitung von 1 Minute und andere eine Zeitüberschreitung von 1 Stunde haben, generieren Sie mehrere PatternStreams. // 3. MusterStream <POJO> Musterstrom = CEP.Muster(keyedPojos, completedPojo.innerhalb( Zeit .Minuten( 1 ))); // Timeout für die Nebenausgabe definieren // 4. Ausgabetag <POJO> Zeitüberschreitung = neu Ausgabetag <POJO>( "Zeitüberschreitung" ) { Privat statisch Finale lang serialVersionUID = 773503794597666247L ; }; // Ausgabetag<L> timeoutOutputTag, MusterFlatTimeoutFunktion<T, L> MusterFlatTimeoutFunktion, MusterFlatSelectFunktion<T, R> MusterFlatSelectFunktion // 5. Einzelausgabestreamoperator <POJO> timeoutPojos = patternStream.flatSelect( Zeitüberschreitung, neu POJOZeitüberschreitung (), neu FlacheAuswahlNichts () ); //Drucken Sie das Timeout-POJO aus // 6.7. timeoutPojos.getSideOutput(timedout).print(); timeoutPojos.print(); umgebung.execute( CEPTimentoEventJob . Klasse .getSimpleName()); } /** * Sammeln Sie die Timeout-Ereignisse */ öffentlich statisch Klasse POJOZeitüberschreitung implementiert MusterFlatTimeoutFunktion <POJO, POJO> { Privat statisch Finale lang serialVersionUID = - 4214641891396057732L ; @Überschreiben öffentlich Leere Time-out( Karte < Zeichenfolge , Liste <POJO>> Karte, lang ich, Kollektor <POJO>-Sammler) wirft Ausnahme { Wenn ( Null != Karte.get( "init" )) { für (POJO pojoInit: map.get( "init" )) { System .out.println( "Zeitüberschreitung bei Initialisierung:" + pojoInit.getAid()); Sammler.sammeln(pojoInit); } } // Da das Ende abgelaufen ist und nicht empfangen wurde, kann das Ende hier nicht abgerufen werden. .out.println( "Zeitüberschreitung Ende: " + Karte.get( "Ende" )); } } /** * Normalerweise geschieht nichts, aber Sie können auch alle übereinstimmenden Ereignisse weiterleiten. Bei lockerer Nähe können ignorierte oder durchdrungene Ereignisse nicht ausgewählt und weitergeleitet werden. * Vervollständigen Sie die Initialisierungs- und Enddaten innerhalb einer Minute. * * @param <T> */ öffentlich statisch Klasse FlacheAuswahlNichts <T> implementiert MusterFlatSelectFunktion <T, T> { Privat statisch Finale lang serialVersionUID = - 3029589950677623844L ; @Überschreiben öffentlich Leere flacheAuswahl( Karte < Zeichenfolge , Liste <T>> Muster, Kollektor <T> Sammler) { System .out.println( "FlatSelect: " + Muster); } } } Testergebnisse (gefolgt von): 3 > POJO{Hilfe= 'ID000-0' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419728242 , Energie= 529,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null } 3 > POJO{Hilfe= 'ID000-1' , ein Stil = 'STIL000-2' , ein Name = 'NAME-1' , logTime= 1563419728783 , Energie= 348,00 , Alter= 26 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null } 3 > POJO{Hilfe= 'ID000-0' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419749259 , Energie= 492,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '00' , erstelleZeit= Null , updateTime= Null } flache Auswahl: {init=[POJO{aid= 'ID000-0' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419728242 , Energie= 529,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null }], Ende =[POJO{Hilfe= 'ID000-0' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419749259 , Energie= 492,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '00' , erstelleZeit= Null , updateTime= Null }]} Zeitüberschreitung bei Init:ID000- 1 3 > POJO{Hilfe= 'ID000-1' , ein Stil = 'STIL000-2' , ein Name = 'NAME-1' , logTime= 1563419728783 , Energie= 348,00 , Alter= 26 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null } Time-out Ende : Null 3 > POJO{Hilfe= 'ID000-2' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419829639 , Energie= 467,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '03' , erstelleZeit= Null , updateTime= Null } 3 > POJO{Hilfe= 'ID000-2' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419841394 , Energie= 107,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '00' , erstelleZeit= Null , updateTime= Null } 3 > POJO{Hilfe= 'ID000-3' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419967721 , Energie= 431,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null } 3 > POJO{Hilfe= 'ID000-3' , ein Stil = 'STIL000-2' , ein Name = 'NAME-0' , logTime= 1563419979567 , Energie= 32,00 , Alter= 26 , tt= 2019 - 07 - 18 , astatus= '03' , erstelleZeit= Null , updateTime= Null } 3 > POJO{Hilfe= 'ID000-3' , ein Stil = 'STIL000-2' , ein Name = 'NAME-0' , logTime= 1563419993612 , Energie= 542,00 , Alter= 26 , tt= 2019 - 07 - 18 , astatus= '01' , erstelleZeit= Null , updateTime= Null } flache Auswahl: {init=[POJO{aid= 'ID000-3' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563419967721 , Energie= 431,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null }], Ende =[POJO{Hilfe= 'ID000-3' , ein Stil = 'STIL000-2' , ein Name = 'NAME-0' , logTime= 1563419993612 , Energie= 542,00 , Alter= 26 , tt= 2019 - 07 - 18 , astatus= '01' , erstelleZeit= Null , updateTime= Null }]} 3 > POJO{Hilfe= 'ID000-4' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563420063760 , Energie= 122,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null } 3 > POJO{Hilfe= 'ID000-4' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563420078008 , Energie= 275,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '03' , erstelleZeit= Null , updateTime= Null } Zeitüberschreitung bei Init:ID000- 4 3 > POJO{Hilfe= 'ID000-4' , ein Stil = 'STIL000-0' , ein Name = 'NAME-0' , logTime= 1563420063760 , Energie= 122,00 , Alter= 0 , tt= 2019 - 07 - 18 , astatus= '02' , erstelleZeit= Null , updateTime= Null } Time-out Ende : Null Zusammenfassen Oben sind die Schritte zur Implementierung der Timeout-Statusüberwachung in Apache FlinkCEP, die ich Ihnen vorgestellt habe. Ich hoffe, sie werden Ihnen hilfreich sein. Wenn Sie Fragen haben, hinterlassen Sie mir bitte eine Nachricht und ich werde Ihnen rechtzeitig antworten! Das könnte Sie auch interessieren:
|
<<: Detaillierte Erläuterung der ECharts-Mausereignisverarbeitungsmethode
>>: Lösung für das Problem der Installation der MySQL-komprimierten Version von Zip
In MySQL können Sie mit der REVOKE-Anweisung best...
1. CSS-Boxmodell Die Box beinhaltet: Rand, Rahmen...
In diesem Artikel wird der spezifische Code für J...
Inhaltsverzeichnis 2. Komma-Operator 3. JavaScrip...
Beim Aktualisieren eines Datensatzes in MySQL ist...
Busybox: Ein Schweizer Taschenmesser voller klein...
Ich habe gehört, dass es eine Interviewfrage gibt...
Der zu erzielende Effekt ist: Festes Vergrößern a...
1. Was ist Als Auszeichnungssprache hat CSS eine ...
Vorwort Durch die Verwendung von Docker und VS Co...
Das Transaktionsprotokoll zeichnet die Vorgänge a...
Popup-Fenster werden in der tatsächlichen Entwick...
In diesem Artikel wird der spezifische Code für J...
1. Docker durchsucht MySQL查看mysql版本 2. Docker Pul...
Inhaltsverzeichnis 1. Wirkungsdemonstration 2. Im...