Tomcat verwendet Thread-Pool zur Verarbeitung gleichzeitiger Remote-Anfragen

Tomcat verwendet Thread-Pool zur Verarbeitung gleichzeitiger Remote-Anfragen

Wenn wir verstehen, wie Tomcat gleichzeitige Anfragen verarbeitet, können wir Thread-Pools, Sperren, Warteschlangen und unsichere Klassen verstehen. Der folgende Hauptcode stammt von

java-jre:

sun.misc.Unsicher
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor.Worker
java.util.concurrent.locks.AbstractQueuedSynchronizer
java.util.concurrent.locks.AbstractQueuedLongSynchronizer
java.util.concurrent.LinkedBlockingQueue

Kater:

org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue

ThreadPoolExecutor

Es handelt sich um eine Thread-Pool-Implementierungsklasse, die Threads verwaltet und den Thread-Overhead reduziert. Sie kann verwendet werden, um die Effizienz der Aufgabenausführung zu verbessern.

Die Parameter im Konstruktor sind

öffentlicher ThreadPoolExecutor(
 int corePoolSize,
 int maximalePoolgröße,
 lange KeepAliveTime,
 TimeUnit-Einheit,
 BlockingQueue<Ausführbare Datei> workQueue,
 ThreadFactory ThreadFactory,
 RejectedExecutionHandler-Handler) {
 
}

corePoolSize ist die Anzahl der Kern-Threads
maximumPoolSize ist die maximale Anzahl von Threads
keepAliveTime Maximale Leerlaufzeit von Nicht-Kern-Threads (beendet, wenn die Zeit überschritten wird)
Einheit Zeiteinheit
workQueue-Warteschlange: Wenn zu viele Aufgaben vorhanden sind, speichern Sie sie zuerst in der Warteschlange
threadFactory Thread Factory, eine Fabrik, die Threads erstellt
Entscheidungsstrategie für den Handler: Was ist zu tun, wenn zu viele Aufgaben vorhanden sind und die Warteschlange nicht mehr genügend Aufgaben speichern kann? Dieses Objekt übernimmt das Problem. Dies ist eine Schnittstelle, Sie können die Verarbeitungsmethode anpassen

Anwendung von ThreadPoolExecutor in HTTP-Anfragen in Tomcat

Dieser Threadpool wird von Tomcat verwendet, um jede Anfrage nach dem Empfang einer Remote-Anfrage als separate Aufgabe zu verarbeiten. Bei jedem Aufruf von execute(Runnable)

Initialisierung

org.apache.tomcat.util.net.NioEndpoint

Wenn NioEndpoint initialisiert wird, wird ein Thread-Pool erstellt

öffentliche Leere createExecutor() {
 internalExecutor = wahr;
 TaskQueue taskqueue = neue TaskQueue();
 //TaskQueue ist eine unbegrenzte Warteschlange und kann jederzeit erweitert werden, daher ist der Handler gleichbedeutend mit einer ungültigen TaskThreadFactory tf = neue TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
 Executor = neuer ThreadPoolExecutor (getMinSpareThreads(), getMaxThreads(), 60, Zeiteinheit.SEKUNDEN, Taskwarteschlange, tf);
 taskqueue.setParent( (ThreadPoolExecutor) Executor);
 }

Wenn der Thread-Pool erstellt ist, rufen Sie prestartAllCoreThreads() auf, um den Core-Worker-Thread zu initialisieren und zu starten

öffentliche int prestartAllCoreThreads() {
 Int. n = 0;
 während (addWorker(null, true))
  ++n;
 Rückkehr n;
 }

Wenn die Anzahl der addWorker gleich der corePoolSize ist, gibt addWorker(null,true) „false“ zurück und stoppt die Erstellung von Worker-Threads.

Übermitteln von Aufgaben an die Warteschlange

Jedes Mal, wenn ein Client eine Anfrage (http) stellt, wird eine Verarbeitungsaufgabe übermittelt.

Der Worker ruft die Aufgabe aus der Warteschlange ab und führt sie aus. Nachfolgend sehen Sie den Logikcode zum Einfügen der Aufgabe in die Warteschlange.

ThreadPoolExecutor.execute(Runnable) übermittelt die Aufgabe:

public void execute(Ausführbarer Befehl) {
 wenn (Befehl == null)
  wirf eine neue NullPointerException();
 
 int c = ctl.get();
 	// Ist die Anzahl der Worker kleiner als die Anzahl der Core-Threads? Nach der Initialisierung in Tomcat ist die erste Bedingung im Allgemeinen nicht erfüllt und addWorker wird nicht aufgerufen.
 wenn (workerCountOf(c) < corePoolSize) {
  wenn (addWorker(Befehl, wahr))
  zurückkehren;
  c = ctl.get();
 }
 	// workQueue.offer(command), füge die Aufgabe zur Warteschlange hinzu,
 wenn (isRunning(c) && workQueue.offer(Befehl)) {
  int erneut prüfen = ctl.get();
  wenn (!isRunning(erneut prüfen) und entfernen(Befehl))
  ablehnen (Befehl);
  sonst wenn (workerCountOf(recheck) == 0)
  addWorker(null, false);
 }
 sonst wenn (!addWorker(Befehl, false))
  ablehnen (Befehl);
 }

workQueue.offer(Befehl) schließt die Aufgabenübermittlung ab (wenn Tomcat Remote-HTTP-Anfragen verarbeitet).

Arbeitswarteschlange.Angebot

TaskQueue ist die konkrete Implementierungsklasse von BlockingQueue, der eigentliche Code von workQueue.offer(command) lautet:

öffentliches boolesches Angebot(E e) {
 wenn (e == null) wirf eine neue NullPointerException();
 endgültige Anzahl von AtomicIntegern = this.count;
 wenn (Anzahl.get() == Kapazität)
 gibt false zurück;
 int c = -1;
 Knoten<E> Knoten = neuer Knoten<E>(e);
 endgültiges ReentrantLock putLock = this.putLock;
 setLock.lock();
 versuchen {
 if (count.get() < Kapazität) {
  enqueue(node); //Hier Aufgaben zur Warteschlange hinzufügen c = count.getAndIncrement();
  wenn (c + 1 < Kapazität)
  nichtVoll.signal();
 }
 Endlich
 setzeLock.unlock();
 }
 wenn (c == 0)
 signalNotEmpty();
 gibt c >= 0 zurück;
}

// Aufgaben zur Warteschlange hinzufügen/**
 * Verknüpft den Knoten am Ende der Warteschlange.
 *
 * @param node der Knoten
 */
private void enqueue(Node<E> node) {
 // behaupten putLock.isHeldByCurrentThread();
 // behaupten, last.next == null;
 letzte = letzte.nächste = Knoten; //Linklistenstruktur letzte.nächste = Knoten; letzte = Knoten
}

Danach ist es die Aufgabe des Workers. In der Run-Methode holt sich der Worker durch Aufruf von getTask() die hier übermittelte Aufgabe und führt sie aus.

Wie verarbeitet der Thread-Pool neu übermittelte Aufgaben?

Senden Sie nach dem Hinzufügen eines Workers eine Aufgabe. Da die Anzahl der Worker corePoolSize erreicht, wird die Aufgabe in die Warteschlange gestellt, und die Run-Methode des Workers führt eine Schleife aus, um die Aufgaben in der Warteschlange abzurufen (sofern sie nicht leer ist).

Worker-Run-Methode:

/** Delegiert die Hauptlaufschleife an den äußeren RunWorker */
 öffentliche Leere ausführen() {
  führeWorker aus (dies);
 }

Durchlaufen Sie die Aufgaben in der Warteschlange

Code der RunWorker(worker)-Methodenschleife:

final void runWorker(Arbeiter w) {
 : Thread wt = Thread.currentThread();
 Ausführbare Aufgabe = w.firstTask;
 w.firstTask = null;
 w.unlock(); // Unterbrechungen zulassen
 Boolescher Wert „abrupt abgeschlossen“ = true;
 versuchen {
  while (task != null || (task = getTask()) != null) { //Schleife um Aufgaben in der Warteschlange abzurufen w.lock(); // Sperren try {
   //Vorabverarbeitung vor der Ausführung beforeExecute(wt, task);
   //Die Aufgaben in der Warteschlange beginnen mit der Ausführung von task.run();
   // Nachbearbeitung ausführen afterExecute(task, thrown);
  Endlich
   Aufgabe = null;
   w.abgeschlosseneAufgaben++;
   w.unlock(); // Sperre aufheben}
  }
  abgeschlossenAbrupt = falsch;
 Endlich
  ProzessWorkerExit(w, abrupt abgeschlossen);
 }
 }

task.run() führt die Aufgabe aus

Anwendung sperren

ThreadPoolExecutor verwendet Sperren, um zwei Dinge sicherzustellen:
1. Fügen Sie der Warteschlange Aufgaben hinzu, um sicherzustellen, dass andere Threads die Warteschlange nicht bedienen können
2. Holen Sie sich die Aufgabe der Warteschlange, um sicherzustellen, dass andere Threads die Warteschlange nicht gleichzeitig betreiben können

Aufgabensperre zur Warteschlange hinzufügen

öffentliches boolesches Angebot(E e) {
 wenn (e == null) wirf eine neue NullPointerException();
 endgültige Anzahl von AtomicIntegern = this.count;
 wenn (Anzahl.get() == Kapazität)
  gibt false zurück;
 int c = -1;
 Knoten<E> Knoten = neuer Knoten<E>(e);
 endgültiges ReentrantLock putLock = this.putLock;
 putLock.lock(); //Sperren versuchen {
  if (count.get() < Kapazität) {
  in Warteschlange einreihen (Knoten);
  c = zählen.getAndIncrement();
  wenn (c + 1 < Kapazität)
   nichtVoll.signal();
  }
 Endlich
  putLock.unlock(); //Sperre aufheben}
 wenn (c == 0)
  signalNotEmpty();
 gibt c >= 0 zurück;
 }

Warteschlangenaufgabensperre abrufen

private Ausführbare getTask() {
 boolean timedOut = false; // Ist beim letzten poll() eine Zeitüberschreitung aufgetreten?
		// ...weglassen für (;;) {
  versuchen {
  Ausführbar r = zeitgesteuert?
   workQueue.poll(keepAliveTime, Zeiteinheit.NANOSEKUNDEN):
   workQueue.take(); //Holen Sie sich eine Aufgabe in die Warteschlange, wenn (r != null)
   Rückkehr r;
  Zeitüberschreitung = wahr;
  } Fang (InterruptedException-Wiederholung) {
  timedOut = falsch;
  }
 }
 }
öffentliche E take() wirft InterruptedException {
 Ex;
 int c = -1;
 endgültige Anzahl von AtomicIntegern = this.count;
 letztes ReentrantLock takeLock = this.takeLock;
 takeLock.lockInterruptibly(); // Sperrversuch {
  während (Anzahl.get() == 0) {
  notEmpty.await(); //Wenn sich keine Aufgabe in der Warteschlange befindet, warten}
  x = aus der Warteschlange nehmen();
  c = zählen.getAndDecrement();
  wenn (c > 1)
  notEmpty.signal();
 Endlich
  takeLock.unlock(); // Sperre aufheben}
 wenn (c == Kapazität)
  signalNotFull();
 gebe x zurück;
 }

flüchtig

In gleichzeitigen Szenarien wird dieses Schlüsselwort häufig verwendet, um Membervariablen zu ändern.

Der Hauptzweck besteht darin, dass eine öffentliche Variable, wenn sie von einem Thread geändert wird, für andere Threads sichtbar ist (Echtzeit).

sun.misc.Unsichere Klassen mit hoher Parallelität

Bei Verwendung des Thread-Pools wird häufig die Unsafe-Klasse verwendet. Diese Klasse kann einige atomare CAS-Operationen ausführen, Threads sperren, Threads freigeben usw. mit hoher Parallelität.

sun.misc.Unsafe ist eine Low-Level-Klasse.

Atomare Datenmanipulation

Die Klasse java.util.concurrent.locks.AbstractQueuedSynchronizer verfügt über Code, um atomare Operationen sicherzustellen

geschützter finaler boolean-Vergleich und SetState(int expect, int update) {
 // Siehe unten für die Einrichtung der Intrinsics, um dies zu unterstützen
 gibt unsafe.compareAndSwapInt(diesen, stateOffset, erwarten, aktualisieren) zurück;
 }

Der der Unsafe-Klasse entsprechende Code:

//Die entsprechende Java-Basisebene ist eigentlich die native Methode, die dem C++-Code entspricht/**
* Aktualisieren Sie die Java-Variable atomar auf <tt>x</tt>, wenn sie aktuell
* Halten <tt>erwartet</tt>.
* @return <tt>true</tt> bei Erfolg
*/
öffentliches final natives boolean compareAndSwapInt(Objekt o, langer Offset,
      int erwartet,
      int x);

Die Funktion der Methode besteht einfach darin, einen Wert zu aktualisieren, um einen atomaren Vorgang sicherzustellen. Wenn Sie den offset einer Membervariable eines Objekts o bearbeiten möchten, ändern Sie o.offset.
Um die Genauigkeit bei hoher Parallelität sicherzustellen, sollten Sie beim Bedienen von o.offset den richtigen Wert lesen und dieser kann nicht von anderen Threads in der Mitte geändert werden, um die Wirksamkeit von Datenvorgängen in einer Umgebung mit hoher Parallelität sicherzustellen.

Das heißt, wenn der erwartete Wert derselbe ist wie der Wert im Speicher und „erwartet == Wert im Speicher“, dann ist der aktualisierte Wert x und „true“ wird zurückgegeben, um anzuzeigen, dass die Änderung erfolgreich war.

Andernfalls unterscheidet sich der erwartete Wert vom Speicherwert, was darauf hinweist, dass der Wert von anderen Threads geändert wurde und nicht auf x aktualisiert werden kann. False wird zurückgegeben, um dem Operator mitzuteilen, dass die atomare Änderung fehlgeschlagen ist.

Blockieren und Aktivieren von Threads

public native void park(boolean isAbsolute, long time); //Blockiere den aktuellen Thread

Die Worker-Rolle des Thread-Pools durchläuft eine Schleife, um Warteschlangenaufgaben abzurufen. Wenn sich keine Aufgabe in der Warteschlange befindet, wartet worker.run noch und beendet den Thread nicht. Der Code verwendet notEmpty.await() um diesen Worker-Thread zu unterbrechen und ihn in eine wartende Thread-Warteschlange (anders als die Aufgabenwarteschlange) zu stellen. Wenn eine neue Aufgabe erforderlich ist, wird notEmpty.signal() verwendet, um diesen Thread aufzuwecken.

Die unteren Schichten sind
unsafe.park() blockiert den aktuellen Thread
öffentlicher nativer void park (Boolescher Wert ist absolut, lange Zeit);

unsafe.unpark() weckt den Thread
öffentliches natives void unpark (Objekt-Thread);

Dieser Vorgang ist entsprechend. Wenn blockiert, wird der Thread zuerst in die Warteschlange gestellt. Wenn er aufgeweckt wird, wird der blockierte Thread aus der Warteschlange genommen und unsafe.unpark(thread) weckt den angegebenen Thread auf.

java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject -Klasse

Thread-Informationen über verknüpfte Listen speichern

// Einen blockierenden Thread hinzufügen private Node addConditionWaiter() {
  Knoten t = letzterWaiter;
  // Wenn lastWaiter abgebrochen wird, bereinigen.
  wenn (t != null und t.waitStatus != Node.CONDITION) {
  unlinkCancelledWaiters();
  t = letzterKellner;
  }
  Knoten Knoten = neuer Knoten (Thread.currentThread(), Knoten.BEDINGUNG);
  wenn (t == null)
  firstWaiter = Knoten;
  anders
  t.nextWaiter = Knoten;
  lastWaiter = node; //Setze den neu blockierten Thread an das Ende der verknüpften Liste return node;
 }

// Einen blockierten Thread herausnehmen public final void signal() {
  wenn (!isHeldExclusively())
  wirf eine neue IllegalMonitorStateException();
  Node first = firstWaiter; //Der erste blockierte Thread in der verknüpften Liste, wenn (first != null)
  macheSignal(zuerst);
 }

//Nachdem du es erhalten hast, aktiviere diesen Thread final boolean transferForSignal(Node node) {
  LockSupport.unpark(node.thread);
 gibt true zurück;
 }
öffentliche statische Leere unparken (Thread Thread) {
 wenn (Thread != null)
  UNSAFE.unpark(thread);
 }

Dies ist das Ende dieses Artikels darüber, wie Tomcat Thread-Pools verwendet, um Remote-Parallelanforderungen zu verarbeiten. Weitere Informationen zum Verarbeiten von Remote-Parallelanforderungen durch Tomcat-Thread-Pools finden Sie in früheren Artikeln auf 123WORDPRESS.COM oder in den folgenden verwandten Artikeln. Ich hoffe, Sie werden 123WORDPRESS.COM auch in Zukunft unterstützen!

Das könnte Sie auch interessieren:
  • Betrachten des Threadmodells von Tomcat aus der Connector-Komponente – BIO-Modus (empfohlen)
  • Tomcat-Quellcodeanalyse und -Verarbeitung
  • Detaillierte Erläuterung des Thread-Modells von Tomcat zur Verarbeitung von Anforderungen

<<:  Beschreibung der Standardtransaktionsisolationsebene von MySQL und Oracle

>>:  Tutorial zur HTML-Tabellenauszeichnung (37): Hintergrundbild-Attribut BACKGROUND

Artikel empfehlen

Verwendung und Demonstration von ref in Vue

Ref-Definition: Wird verwendet, um Referenzinform...

Detaillierte Erklärung gängiger Docker Compose-Befehle

1. Die Verwendung von Docker Compose ist der Verw...

Tutorial zu HTML-Formular-Tags (3): Eingabe-Tag

Tutorial zu HTML-Formular-Tags. In diesem Abschni...

Detaillierte Erläuterung des MySQL 5.7.9-Shutdown-Syntaxbeispiels

mysql-5.7.9 bietet endlich eine Shutdown-Syntax: ...

MySQL-Platzhalter (erweiterte SQL-Filterung)

Inhaltsverzeichnis Lassen Sie uns zunächst kurz P...

So schreiben Sie den Einführungsinhalt der Infoseite der Website

Alle Websites, ob offiziell, E-Commerce, soziale ...

Typische Fälle von MySQL-Indexfehlern

Inhaltsverzeichnis Typische Fälle Anhang: Häufige...

Lösung für Docker-Container, der Schriftarten wie Songti nicht erkennt

Problemhintergrund: Wenn Sie Docker zum Bereitste...

js-Entwicklungs-Plugin zum Erzielen eines Tab-Effekts

In diesem Artikelbeispiel wird der spezifische Co...

Detaillierte Erklärung der JS-Array-Methoden

Inhaltsverzeichnis 1. Das ursprüngliche Array wir...

CSS Transition erweitert und reduziert Elemente durch Ändern der Höhe

Ein allgemeines Entwicklungsbedürfnis besteht dar...

Natives JS zum Erreichen von Spezialeffekt-Meldungsfeldern

In diesem Artikel wird ein Nachrichtenfeld mit Sp...

Sprechen Sie kurz über MySQL Left Join Inner Join

Vorwort Ich war kürzlich damit beschäftigt, ein K...

Analyse der Implementierung der MySQL-Anweisungssperre

Zusammenfassung: Analyse von zwei MySQL SQL-Anwei...