Detaillierte Schritte zur Implementierung der Timeout-Statusüberwachung in Apache FlinkCEP

Detaillierte Schritte zur Implementierung der Timeout-Statusüberwachung in Apache FlinkCEP

CEP – Komplexe Ereignisverarbeitung.

Die Zahlung wurde nicht innerhalb einer bestimmten Zeitspanne nach der Bestellung bestätigt.

Die Taxibestellung wurde generiert, jedoch wurde dem Fahrgast nicht bestätigt, dass er innerhalb einer bestimmten Zeitspanne in das Taxi einsteigen kann.

Es liegt keine Lieferbestätigung für das Essen zum Mitnehmen innerhalb eines bestimmten Zeitraums nach der geplanten Lieferzeit vor.

Apache FlinkCEP API

CEPTimentoEventJob

Kurze Analyse des FlinkCEP-Quellcodes

DataStream und PatternStream

DataStream besteht im Allgemeinen aus Ereignissen oder Elementen desselben Typs. Ein DataStream kann durch eine Reihe von Transformationsvorgängen wie Filtern und Zuordnen in einen anderen DataStream umgewandelt werden.

PatternStream ist eine Abstraktion des CEP-Mustervergleichsstreams, der DataStream und Pattern kombiniert und Methoden wie select und flatSelect bereitstellt. PatternStream ist kein DataStream. Es bietet eine Methode zum Senden einer Map, die aus einer passenden Mustersequenz und den zugehörigen Ereignissen (also Map<Mustername, List<Ereignis>>) besteht, an SingleOutputStreamOperator, der ein DataStream ist.

Die Methoden und Variablen in der Toolklasse CEPOperatorUtils werden mit „PatternStream“ benannt, zum Beispiel:

öffentlich
 
statisch
 <EIN, AUS> 
Einzelausgabestreamoperator
<OUT> createPatternStream(...){...}
öffentlich

statisch
 <IN, OUT1, OUT2> 
Einzelausgabestreamoperator
<OUT1> erstelleTimeoutPatternStream(...){...}

Finale
 
Einzelausgabestreamoperator
<OUT> Musterstrom;

Einzelausgabestreamoperator

@Öffentlich

öffentlich
 
Klasse
 
Einzelausgabestreamoperator
<T> 
erweitert
 
Datenstrom
<T> {...}

PatternStream-Konstruktionsmethode:

MusterStream
(
Finale
 
Datenstrom
<T> Eingabestream, 
Finale
 
Muster
<T, ?> Muster) {

  
Das
.inputStream = EingabeStream;

  
Das
.muster = Muster;

  
Das
.comparator = 
Null
;

}



MusterStream
(
Finale
 
Datenstrom
<T> Eingabestream, 
Finale
 
Muster
<T, ?> Muster, 
Finale
 
Ereigniskomparator
<T> Komparator) {

  
Das
.inputStream = EingabeStream;

  
Das
.muster = Muster;

  
Das
.comparator = Komparator;

}

Muster, Quantifizierer und EventComparator

Pattern ist die Basisklasse für die Musterdefinition im Builder-Modus. Das definierte Muster wird von NFACompiler zum Generieren von NFA verwendet.

Wenn Sie ähnliche Methoden wie „next“ und „folgtBy“ selbst implementieren möchten, z. B. „timeEnd“, sollte es möglich sein, „Pattern“ zu erweitern und neu zu schreiben.

öffentlich
Klasse
Muster
<T, F 
erweitert
 T>
/** Modusname */
Privat
Finale
Zeichenfolge
 Name;
/**Vorheriges Muster*/
Privat
Finale
Muster
<T, ? 
erweitert
 T>vorheriges;
/**Einschränkungen, die ein Ereignis erfüllen muss, um mit dem aktuellen Muster übereinzustimmen*/
Privat
IterativeBedingung
<F> Zustand;
/** Länge des Zeitfensters, Mustervergleich innerhalb der Zeitlänge*/
Privat
Zeit
 FensterZeit;
/** Musterquantifizierer, d. h. ein Muster passt zu mehreren Ereignissen usw. Standardmäßig passt es zu einem*/
Privat
Quantor
 Quantifizierer = 
Quantor
.eins(
Konsumstrategie
.STRIKT);
/** Die Bedingungen, die Ereignisse erfüllen müssen, um das Sammeln von Ereignissen im Schleifenzustand zu beenden*/
Privat
IterativeBedingung
<F> bisZustand;
/**
   * Gilt für den Modus {@code times} und wird verwendet, um die Häufigkeit beizubehalten, mit der ein Ereignis in diesem Modus nacheinander auftreten kann.*/
Privat
Mal
 mal;
// Überspringe die Strategie nach dem Abgleichen des privaten Ereignisses
Finale
NachMatchSkipStrategy
 nachMatchSkipStrategy;
  ...
}

Quantifizierer werden verwendet, um bestimmte Musterverhalten zu beschreiben. Es gibt drei Hauptkategorien:

Einzel-Einzel-Übereinstimmung, Schleife-Schleife-Übereinstimmung, Zeit-Übereinstimmung innerhalb einer bestimmten Anzahl von Malen oder eines Zeitbereichs.

Jedes Muster kann optional sein (Einzelübereinstimmung oder Schleifenübereinstimmung) und kann über eine festgelegte ConsumingStrategy verfügen.

Schleifen und Zeiten verfügen außerdem über eine zusätzliche interne ConsumingStrategy, die zwischen im Muster empfangenen Ereignissen verwendet wird.

öffentlich
Klasse
Quantor
 {
  ...
/**
   * 5 Attribute, können kombiniert werden, aber nicht alle Kombinationen sind gültig*/
öffentlich
Aufzählung
QuantifiziererEigenschaft
 {
    EINZEL,
    Schleifen,
    MAL,
    OPTIONAL,
    GIERIG
  }
/**
   * Eine Strategie, die beschreibt, welche Ereignisse in diesem Muster übereinstimmen */
öffentlich
Aufzählung
Konsumstrategie
 {
    STRIKT,
    SKIP_TILL_NEXT,
    ÜBERSPRINGEN_BIS_JEDER,
    NICHT_FOLGEN,
    NICHT_WEITER
  }
/**
   * Beschreibt, wie oft ein Ereignis im aktuellen Muster hintereinander auftreten kann. Eine Musterbedingung ist beispielsweise nichts anderes als ein Boolescher Wert, und ein Ereignis, das die wahre Bedingung erfüllt, tritt hintereinander oder in einem bestimmten Zeitbereich auf, z. B. 2 bis 4 Mal. 2 Mal, 3 Mal und 4 Mal werden alle mit dem aktuellen Muster abgeglichen, sodass dasselbe Ereignis wiederholt abgeglichen wird.*/
öffentlich
statisch
Klasse
Mal
 {
Privat
Finale
int
 aus;
Privat
Finale
int
 Zu;
Privat
Mal
(
int
 aus, 
int
 Zu)
Voraussetzungen
.checkArgument(von > 
0
, 
„Das Von-Zeichen sollte eine positive Zahl größer als 0 sein.“
);
Voraussetzungen
.checkArgument(bis >= von, 
„Das Bis sollte eine Zahl größer oder gleich dem Von sein: „
 + ab + 
"."
);
Das
.from = von;
Das
.zu = zu;
    }
öffentlich
int
 getFrom() {
zurückkehren
 aus;
    }
öffentlich
int
 getTo() {
zurückkehren
 Zu;
    }
//Nummernbereich öffentlich
statisch
Mal
 von(
int
 aus, 
int
 Zu)
zurückkehren
neu
Mal
(von, bis);
    }
//Geben Sie die genaue Anzahl der öffentlichen
statisch
Mal
 von(
int
 mal)
zurückkehren
neu
Mal
(mal, mal);
    }
@Überschreiben
öffentlich
Boolescher Wert
 ist gleich(
Objekt
 o) {
Wenn
 (
Das
 == o) {
zurückkehren
WAHR
;
      }
Wenn
 (o == 
Null
 || getClass() != o.getClass()) {
zurückkehren
FALSCH
;
      }
Mal
 mal = (
Mal
) o;
zurückkehren
 von == mal.von &&
        bis == mal.bis;
    }
@Überschreiben
öffentlich
int
 hashCode() {
zurückkehren
Objekte
.hash(von, nach);
    }
  }
  ...
}

EventComparator, benutzerdefinierter Ereigniskomparator, implementiert die EventComparator-Schnittstelle.

öffentlich
 
Schnittstelle
 
Ereigniskomparator
<T> 
erweitert
 
Komparator
<T>, 
Serialisierbar
 {
lang
 serialVersionUID = 
1 Liter
;
}

NFACompiler und NFA

NFACompiler bietet Methoden zum Kompilieren von Pattern in NFA oder NFAFactory. Mit NFAFactory können mehrere NFAs erstellt werden.

öffentlich
Klasse
NFA-Compiler
 {
  ...
/**
   * NFAFactory erstellt eine Schnittstelle für NFA*
   * @param <T> Typ der Eingabeereignisse, die vom NFA verarbeitet werden
   */
öffentlich
Schnittstelle
NFA-Fabrik
<T> 
erweitert
Serialisierbar
 {
    NFA<T> erstelleNFA();
  }
  
/**
   * Konkrete Implementierung von NFAFactory: NFAFactoryImpl
   *
   * <p>Die Implementierung übernimmt den Serialisierer des Eingabetyps, die Fensterzeit und den Satz von
   * Zustände und deren Übergänge, um daraus einen NFA erstellen zu können.
   *
   * @param <T> Typ der Eingabeereignisse, die vom NFA verarbeitet werden
   */
Privat
statisch
Klasse
NFAFactoryImpl
<T> 
implementiert
NFA-Fabrik
<T> {
    
Privat
statisch
Finale
lang
 serialVersionUID = 
8939783698296714379L
;
    
Privat
Finale
lang
 FensterZeit;
Privat
Finale
Sammlung
<
Zustand
<T>> Zustände;
Privat
Finale
Boolescher Wert
 Zeitüberschreitungsbehandlung;
    
Privat
NFAFactoryImpl
(
lang
 Fensterzeit,
Sammlung
<
Zustand
<T>> Zustände,
Boolescher Wert
 Timeout-Behandlung) {
      
Das
.windowTime = Fensterzeit;
Das
.states = Staaten;
Das
.timeoutHandling = Zeitüberschreitungsbehandlung;
    }
    
@Überschreiben
öffentlich
 NFA<T> erstelleNFA() {
// Ein NFA besteht aus einem Statussatz, der Länge des Zeitfensters und der Angabe, ob Timeouts behandelt werden sollen.
neu
 NFA<>(Zustände, Fensterzeit, Timeout-Behandlung);
    }
  }
}

NFA: Nichtdeterministischer endlicher Automat – nichtdeterministischer endlicher (Zustands-)Automat.

Weitere Informationen finden Sie unter

https://zh.wikipedia.org/wiki/Nichtdeterministischer endlicher Automat

öffentlich
Klasse
 NFA<T> {
/**
   * Die Menge aller gültigen NFA-Zustände, die vom NFACompiler zurückgegeben werden. * Diese werden direkt aus dem benutzerdefinierten Muster abgeleitet.
   */
Privat
Finale
Karte
<
Zeichenfolge
, 
Zustand
<T>> Zustände;
  
/**
   * Pattern.within(Time) gibt die Länge des Zeitfensters an */
Privat
Finale
lang
 FensterZeit;
  
/**
   * Ein Timeout-Matchmarker */
Privat
Finale
Boolescher Wert
 Zeitüberschreitung behandeln;
  ...
}

PatternSelectFunction und PatternFlatSelectFunction

Wenn über den Musternamen auf eine Karte mit übereinstimmenden Ereignissen zugegriffen werden kann, wird die Methode select() von PatternSelectFunction aufgerufen. Der Mustername wird bei der Definition des Musters angegeben. Die Methode select() gibt genau ein Ergebnis zurück. Wenn Sie mehrere Ergebnisse zurückgeben müssen, können Sie PatternFlatSelectFunction implementieren.

öffentlich
 
Schnittstelle
 
Musterauswahlfunktion
<EIN, AUS> 
erweitert
 
Funktion
, 
Serialisierbar
 {



  
/**

   * Generieren Sie ein Ergebnis aus der angegebenen Ereigniskarte. Diese Ereignisse werden eindeutig durch den Namen des Schemas identifiziert, mit dem sie verknüpft sind */

  OUT auswählen(
Karte
<
Zeichenfolge
, 
Liste
<IN>>-Muster) 
wirft
 
Ausnahme
;

}

Anstatt ein OUT zurückzugeben, verwendet PatternFlatSelectFunction Collector, um die übereinstimmenden Ereignisse zu sammeln.

öffentlich
Schnittstelle
MusterFlatSelectFunktion
<EIN, AUS> 
erweitert
Funktion
, 
Serialisierbar
 {
  
/**
   * Ein oder mehrere Ergebnisse generieren */
Leere
 flacheAuswahl(
Karte
<
Zeichenfolge
, 
Liste
<IN>> Muster, 
Kollektor
<OUT> Ausgang) 
wirft
Ausnahme
;
}

SelectTimeoutCepOperator, MusterTimeoutFunktion

SelectTimeoutCepOperator wird erstellt, wenn die Methode createTimeoutPatternStream() in CEPOperatorUtils aufgerufen wird.

Die Methoden in SelectTimeoutCepOperator, die von der Operatoriteration aufgerufen werden, sind processMatchedSequences() und processTimedOutSequences().

Die Vorlagenmethode ... entspricht der Methode processEvent() und der Methode advanceTime() in der abstrakten Klasse AbstractKeyedCEPPatternOperator.

Es gibt auch den FlatSelectTimeoutCepOperator und die entsprechende PatternFlatTimeoutFunction.

öffentlich
Klasse
Wählen Sie TimeoutCepOperator
<IN, OUT1, OUT2, TASTE>
erweitert
AbstraktKeyedCEPPatternOperator
<IN, KEY, OUT1, 
Wählen Sie TimeoutCepOperator
.
Wählen Sie Wrapper
<IN, OUT1, OUT2>> {
Privat
Ausgabetag
<OUT2> zeitgesteuertesOutputTag;
öffentlich
Wählen Sie TimeoutCepOperator
(
TypSerializer
<IN> inputSerializer,
Boolescher Wert
 istVerarbeitungszeit,
NFA-Compiler
.
NFA-Fabrik
<IN> nfaFactory,
Finale
Ereigniskomparator
<IN> Komparator,
NachMatchSkipStrategy
 überspringenStrategie,
// Die Benennung von Parametern ist verwirrend … einschließlich der Benennung von Mitgliedern in der SelectWrapper-Klasse …
Musterauswahlfunktion
<IN, OUT1> FlatSelect-Funktion,
MusterTimeoutFunktion
<IN, OUT2> flatTimeoutFunktion,
Ausgabetag
<OUT2> Ausgabetag,
Ausgabetag
<IN> lateDataOutputTag) {
super
(
      Eingabeserializer,
      istVerarbeitungszeit,
      nfaFabrik,
      Komparator,
      überspringenStrategie,
neu
Wählen Sie Wrapper
<>(flacheAuswahlfunktion, flacheTimeoutfunktion),
      lateDataOutputTag);
Das
.timedOutOutputTag = Ausgabetag;
  }
  ...
}
öffentlich
Schnittstelle
MusterTimeoutFunktion
<EIN, AUS> 
erweitert
Funktion
, 
Serialisierbar
 {
  OUT-Timeout (
Karte
<
Zeichenfolge
, 
Liste
<IN>> Muster, 
lang
 TimeoutZeitstempel) 
wirft
Ausnahme
;
}
öffentlich
Schnittstelle
MusterFlatTimeoutFunktion
<EIN, AUS> 
erweitert
Funktion
, 
Serialisierbar
 {
Leere
 Time-out(
Karte
<
Zeichenfolge
, 
Liste
<IN>> Muster, 
lang
 TimeoutZeitstempel, 
Kollektor
<OUT> Ausgang) 
wirft
Ausnahme
;
}

CEP und CEPOperatorUtils

CEP ist eine Toolklasse zum Erstellen von PatternStream. PatternStream ist lediglich eine Kombination aus DataStream und Pattern.

öffentlich
Klasse
 CEP {
  
öffentlich
statisch
 <T> 
MusterStream
<T> Muster(
Datenstrom
<T>-Eingang, 
Muster
<T, ?> Muster) {
zurückkehren
neu
MusterStream
<>(Eingabe, Muster);
  }
  
öffentlich
statisch
 <T> 
MusterStream
<T> Muster(
Datenstrom
<T>-Eingang, 
Muster
<T, ?> Muster, 
Ereigniskomparator
<T> Komparator) {
zurückkehren
neu
MusterStream
<>(Eingabe, Muster, Komparator);
  }
}

CEPOperatorUtils erstellt SingleOutputStreamOperator (DataStream), wenn die select()-Methode und die flatSelect()-Methode von PatternStream aufgerufen werden.

öffentlich
Klasse
CEPOperatorUtils
 {
  ...
Privat
statisch
 <IN, OUT, K> 
Einzelausgabestreamoperator
<OUT>erstellePatternStream(
Finale
Datenstrom
<IN> Eingabestream,
Finale
Muster
<IN, ?> Muster,
Finale
Typinformation
<OUT> outTypeInfo,
Finale
Boolescher Wert
 Timeout-Behandlung,
Finale
Ereigniskomparator
<IN> Komparator,
Finale
OperatorBuilder
<IN, OUT> OperatorBuilder) {
Finale
TypSerializer
<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
    
// prüfen, ob wir Verarbeitungszeit verwenden
Finale
Boolescher Wert
 isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == 
Zeitmerkmal
.
Verarbeitungszeit
;
    
// Kompilieren Sie unser Muster in eine NFAFactory, um später NFAs zu instanziieren
Finale
NFA-Compiler
.
NFA-Fabrik
<IN> nfaFactory = 
NFA-Compiler
.compileFactory(Muster, TimeoutHandling);
    
Finale
Einzelausgabestreamoperator
<OUT> Musterstrom;
    
Wenn
 (EingabeStream 
Instanz von
KeyedStream
) {
KeyedStream
<IN, K> keyedStream = (
KeyedStream
<IN, K>) Eingabestream;
      MusterStream = keyedStream.transform(
        operatorBuilder.getKeyedOperatorName(),
        outTypeInfo,
        OperatorBuilder.build(
          Eingabeserializer,
          istVerarbeitungszeit,
          nfaFabrik,
          Komparator,
          Muster.getAfterMatchSkipStrategy()));
    } 
anders
 {
Schlüsselauswahl
<IN, 
Byte
> Schlüsselselektor = 
neu
NullByteKeySelector
<>();
      MusterStream = EingabeStream.keyBy(Schlüsselselektor).transform(
        operatorBuilder.getOperatorName(),
        outTypeInfo,
        OperatorBuilder.build(
          Eingabeserializer,
          istVerarbeitungszeit,
          nfaFabrik,
          Komparator,
          Muster.getAfterMatchSkipStrategy()
        )).forceNonParallel();
    }
    
zurückkehren
 MusterStream;
  }
  ...
}

Schritte zur FlinkCEP-Implementierung

  1. IN: Datenquelle -> Datenstrom -> Transformationen -> Datenstrom
  2. Muster: Muster.beginnen.wo.nächstes.wo...mal...
  3. PatternStream: CEP.pattern(Datenstrom, Muster)
  4. Datenstrom: PatternStream.select(Musterauswahlfunktion) PatternStream.flatSelect(Musterauswahlfunktion)
  5. AUS: DataStream -> Transformationen -> DataStream -> DataSink

Schritte zur Implementierung des FlinkCEP-Matching-Timeouts

Der Stream von TimeoutCEP benötigt keyBy, also KeyedStream. Wenn der inputStream nicht KeyedStream ist, wird ein neuer 0-Byte-Schlüssel erstellt (siehe oben im CEPOperatorUtils-Quellcode).

Schlüsselauswahl
<IN, 
Byte
> Schlüsselselektor = 
neu
 
NullByteKeySelector
<>();

Pattern ruft schließlich auf, um die Fensterzeit festzulegen. Wenn Sie nach Primärschlüssel gruppieren, wird in einem Zeitfenster höchstens ein Timeout-Ereignis abgeglichen, sodass Sie PatternStream.select(...) verwenden können.

  1. IN: Datenquelle -> Datenstrom -> Transformationen -> Datenstrom -> keyBy -> KeyedStream
  2. Muster: Muster.beginnen.wo.nächstes.wo...innerhalb(ZeitfensterZeit)
  3. MusterStream: CEP.pattern(KeyedStream, Muster)
  4. Ausgabetag: neuer Ausgabetag(...)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(Ausgabetag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. Datenstrom: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. AUS: DataStream -> Transformationen -> DataStream -> DataSink

Das FlinkCEP-Timeout ist unzureichend

Ähnlich wie bei der Flink-Fensteraggregation müssen, wenn Sie zum Fortschreiten die Ereigniszeit und von abhängigen Ereignissen generierte Wasserzeichen verwenden, nachfolgende Ereignisse eintreffen, bevor das Fenster zum Berechnen und Ausgeben von Ergebnissen ausgelöst wird.

FlinkCEP-Timeout – vollständige Demo

öffentlich
Klasse
CEPTimentoEventJob
 {
Privat
statisch
Finale
Zeichenfolge
 LOCAL_KAFKA_BROKER = 
"localhost:9092"
;
Privat
statisch
Finale
Zeichenfolge
 GROUP_ID = 
CEPTimentoEventJob
.
Klasse
.getSimpleName();
Privat
statisch
Finale
Zeichenfolge
 GRUPPENTHEMEN = GRUPPENTHEMEN-ID;
  
öffentlich
statisch
Leere
 hauptsächlich(
Zeichenfolge
[] Argumente) 
wirft
Ausnahme
 {
// ParameterTool
 Parameter = 
ParameterTool
.fromArgs(args);
    
StreamAusführungsumgebung
 umgebung = 
StreamAusführungsumgebung
.getExecutionEnvironment();
// Ereigniszeit verwenden env.setStreamTimeCharacteristic(
Zeitmerkmal
.
Ereigniszeit
);
    env.enableCheckpointing(
5000
);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig
.
ExternalisierteCheckpointCleanup
.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(
NeustartStrategien
.fixedDelayRestart(
5
, 
10000
));
    
// Verwenden Sie nicht die endgültige POJO-Zeit
AssignerWithPeriodicWatermarks
 Extraktor = 
neu
IngestionTimeExtractor
<POJO>();
    
// Bleiben Sie konsistent mit Partitionenv.setParallelism( des Kafka-Themas
3
);
    
Eigenschaften
 kafkaProps = 
neu
Eigenschaften
();
    kafkaProps.setProperty(
"bootstrap.servers"
, LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty(
"Gruppen-ID"
, GRUPPEN-ID);
    
// Zugriff auf Kafka-Nachrichten FlinkKafkaConsumer011
<POJO> Verbraucher = 
neu
FlinkKafkaConsumer011
<>(GRUPPENTHEMA, 
neu
POJOSchema
(), kafkaProps);
Datenstrom
<POJO> pojoDataStream = env.addSource(Verbraucher)
        .assignTimestampsAndWatermarks(Extraktor);
    pojoDataStream.print();
    
// Gruppieren nach Primärschlüsselhilfe, d. h., bei jedem POJO-Ereignis eine Übereinstimmungserkennung durchführen [Unterschiedliche POJO-Typen können unterschiedliche Zeiträume verwenden]
// 1.
Datenstrom
<POJO> keyedPojos = pojoDataStream
        .keyBy(
"Hilfe"
);
    
// Von der Initialisierung bis zum Endzustand – eine vollständige POJO-Ereignissequenz // 2.
Muster
<POJO, POJO> abgeschlossenPojo =
Muster
.<POJO>beginnen(
"init"
)
            .Wo(
neu
Einfache Bedingung
<POJO>() {
Privat
statisch
Finale
lang
 serialVersionUID = -
6847788055093903603L
;
              
@Überschreiben
öffentlich
Boolescher Wert
 Filter (POJO pojo) 
wirft
Ausnahme
 {
zurückkehren
"02"
.equals(pojo.getAstatus());
              }
            })
            .gefolgt von(
"Ende"
)
// .next("Ende")
            .Wo(
neu
Einfache Bedingung
<POJO>() {
Privat
statisch
Finale
lang
 serialVersionUID = -
2655089736460847552L
;
              
@Überschreiben
öffentlich
Boolescher Wert
 Filter (POJO pojo) 
wirft
Ausnahme
 {
zurückkehren
"00"
.equals(pojo.getAstatus()) || 
"01"
.equals(pojo.getAstatus());
              }
            });
    
// Finde die Ereignishilfe, die den Endzustand nicht innerhalb von 1 Minute erreicht hat [zu Testzwecken]
// Wenn es für verschiedene Typen unterschiedliche Zeitspannen gibt, z. B. einige eine Zeitüberschreitung von 1 Minute und andere eine Zeitüberschreitung von 1 Stunde haben, generieren Sie mehrere PatternStreams.
// 3.
MusterStream
<POJO> Musterstrom = CEP.Muster(keyedPojos, completedPojo.innerhalb(
Zeit
.Minuten(
1
)));
    
// Timeout für die Nebenausgabe definieren
// 4.
Ausgabetag
<POJO> Zeitüberschreitung = 
neu
Ausgabetag
<POJO>(
"Zeitüberschreitung"
) {
Privat
statisch
Finale
lang
 serialVersionUID = 
773503794597666247L
;
    };
    
// Ausgabetag<L> timeoutOutputTag, MusterFlatTimeoutFunktion<T, L> MusterFlatTimeoutFunktion, MusterFlatSelectFunktion<T, R> MusterFlatSelectFunktion
// 5.
Einzelausgabestreamoperator
<POJO> timeoutPojos = patternStream.flatSelect(
        Zeitüberschreitung,
neu
POJOZeitüberschreitung
(),
neu
FlacheAuswahlNichts
()
    );
    
//Drucken Sie das Timeout-POJO aus
// 6.7.
    timeoutPojos.getSideOutput(timedout).print();
    timeoutPojos.print();
    umgebung.execute(
CEPTimentoEventJob
.
Klasse
.getSimpleName());
  }
  
/**
   * Sammeln Sie die Timeout-Ereignisse */
öffentlich
statisch
Klasse
POJOZeitüberschreitung
implementiert
MusterFlatTimeoutFunktion
<POJO, POJO> {
Privat
statisch
Finale
lang
 serialVersionUID = -
4214641891396057732L
;
    
@Überschreiben
öffentlich
Leere
 Time-out(
Karte
<
Zeichenfolge
, 
Liste
<POJO>> Karte, 
lang
 ich, 
Kollektor
<POJO>-Sammler) 
wirft
Ausnahme
 {
Wenn
 (
Null
 != Karte.get(
"init"
)) {
für
 (POJO pojoInit: map.get(
"init"
)) {
System
.out.println(
"Zeitüberschreitung bei Initialisierung:"
 + pojoInit.getAid());
          Sammler.sammeln(pojoInit);
        }
      }
// Da das Ende abgelaufen ist und nicht empfangen wurde, kann das Ende hier nicht abgerufen werden.
.out.println(
"Zeitüberschreitung Ende: "
 + Karte.get(
"Ende"
));
    }
  }
  
/**
   * Normalerweise geschieht nichts, aber Sie können auch alle übereinstimmenden Ereignisse weiterleiten. Bei lockerer Nähe können ignorierte oder durchdrungene Ereignisse nicht ausgewählt und weitergeleitet werden. * Vervollständigen Sie die Initialisierungs- und Enddaten innerhalb einer Minute. *
   * @param <T>
   */
öffentlich
statisch
Klasse
FlacheAuswahlNichts
<T> 
implementiert
MusterFlatSelectFunktion
<T, T> {
Privat
statisch
Finale
lang
 serialVersionUID = -
3029589950677623844L
;
    
@Überschreiben
öffentlich
Leere
 flacheAuswahl(
Karte
<
Zeichenfolge
, 
Liste
<T>> Muster, 
Kollektor
<T> Sammler) {
System
.out.println(
"FlatSelect: "
 + Muster);
    }
  }
}

Testergebnisse (gefolgt von):

3
> POJO{Hilfe=
'ID000-0'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419728242
, Energie=
529,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}
3
> POJO{Hilfe=
'ID000-1'
, ein Stil =
'STIL000-2'
, ein Name =
'NAME-1'
, logTime=
1563419728783
, Energie=
348,00
, Alter=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}
3
> POJO{Hilfe=
'ID000-0'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419749259
, Energie=
492,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, erstelleZeit=
Null
, updateTime=
Null
}
flache Auswahl: {init=[POJO{aid=
'ID000-0'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419728242
, Energie=
529,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}], 
Ende
=[POJO{Hilfe=
'ID000-0'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419749259
, Energie=
492,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, erstelleZeit=
Null
, updateTime=
Null
}]}
Zeitüberschreitung bei Init:ID000-
1
3
> POJO{Hilfe=
'ID000-1'
, ein Stil =
'STIL000-2'
, ein Name =
'NAME-1'
, logTime=
1563419728783
, Energie=
348,00
, Alter=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}
Time-out 
Ende
: 
Null
3
> POJO{Hilfe=
'ID000-2'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419829639
, Energie=
467,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, erstelleZeit=
Null
, updateTime=
Null
}
3
> POJO{Hilfe=
'ID000-2'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419841394
, Energie=
107,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, erstelleZeit=
Null
, updateTime=
Null
}
3
> POJO{Hilfe=
'ID000-3'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419967721
, Energie=
431,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}
3
> POJO{Hilfe=
'ID000-3'
, ein Stil =
'STIL000-2'
, ein Name =
'NAME-0'
, logTime=
1563419979567
, Energie=
32,00
, Alter=
26
, tt=
2019
-
07
-
18
, astatus=
'03'
, erstelleZeit=
Null
, updateTime=
Null
}
3
> POJO{Hilfe=
'ID000-3'
, ein Stil =
'STIL000-2'
, ein Name =
'NAME-0'
, logTime=
1563419993612
, Energie=
542,00
, Alter=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, erstelleZeit=
Null
, updateTime=
Null
}
flache Auswahl: {init=[POJO{aid=
'ID000-3'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563419967721
, Energie=
431,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}], 
Ende
=[POJO{Hilfe=
'ID000-3'
, ein Stil =
'STIL000-2'
, ein Name =
'NAME-0'
, logTime=
1563419993612
, Energie=
542,00
, Alter=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, erstelleZeit=
Null
, updateTime=
Null
}]}
3
> POJO{Hilfe=
'ID000-4'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563420063760
, Energie=
122,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}
3
> POJO{Hilfe=
'ID000-4'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563420078008
, Energie=
275,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, erstelleZeit=
Null
, updateTime=
Null
}
Zeitüberschreitung bei Init:ID000-
4
3
> POJO{Hilfe=
'ID000-4'
, ein Stil =
'STIL000-0'
, ein Name =
'NAME-0'
, logTime=
1563420063760
, Energie=
122,00
, Alter=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, erstelleZeit=
Null
, updateTime=
Null
}
Time-out 
Ende
: 
Null

Zusammenfassen

Oben sind die Schritte zur Implementierung der Timeout-Statusüberwachung in Apache FlinkCEP, die ich Ihnen vorgestellt habe. Ich hoffe, sie werden Ihnen hilfreich sein. Wenn Sie Fragen haben, hinterlassen Sie mir bitte eine Nachricht und ich werde Ihnen rechtzeitig antworten!

Das könnte Sie auch interessieren:
  • Beispiel für die Verarbeitung von Domänennamen in einer Flink-Einstiegsanwendung
  • Analysieren Sie die Kernprinzipien von Flink und implementieren Sie Kernabstraktionen
  • Ein praktisches Tutorial zum Ausführen von Flink-Aufgaben in IDEA
  • So erstellen und testen Sie die Flink-Entwicklungsumgebung in IDEA
  • Analyse der Praxis von Apache Hudi in Kombination mit Flink zum Speichern von Milliarden von Daten im See

<<:  Detaillierte Erläuterung der ECharts-Mausereignisverarbeitungsmethode

>>:  Lösung für das Problem der Installation der MySQL-komprimierten Version von Zip

Artikel empfehlen

MySQL REVOKE zum Löschen von Benutzerberechtigungen

In MySQL können Sie mit der REVOKE-Anweisung best...

Beispielcode zur Implementierung von Dreiecken und Pfeilen durch CSS-Rahmen

1. CSS-Boxmodell Die Box beinhaltet: Rand, Rahmen...

JavaScript zum Erzielen eines Lupeneffekts

In diesem Artikel wird der spezifische Code für J...

Zusammenfassung ungewöhnlicher JS-Operationsoperatoren

Inhaltsverzeichnis 2. Komma-Operator 3. JavaScrip...

So fügen Sie schnell 10 Millionen Datensätze in MySQL ein

Ich habe gehört, dass es eine Interviewfrage gibt...

Vue3.0 implementiert die Fallstudie zum Lupeneffekt

Der zu erzielende Effekt ist: Festes Vergrößern a...

So verkleinern Sie die Protokolldatei in MYSQL SERVER

Das Transaktionsprotokoll zeichnet die Vorgänge a...

JavaScript-Code zum Erzielen eines einfachen Kalendereffekts

In diesem Artikel wird der spezifische Code für J...

Docker stellt MySQL bereit, um Beispielcode für eine Remoteverbindung zu erreichen

1. Docker durchsucht MySQL查看mysql版本 2. Docker Pul...

Beispielcode von layim zum Integrieren des Rechtsklickmenüs in JavaScript

Inhaltsverzeichnis 1. Wirkungsdemonstration 2. Im...