So verwenden Sie worker_threads zum Erstellen neuer Threads in nodejs

So verwenden Sie worker_threads zum Erstellen neuer Threads in nodejs

Einführung

Wie im vorherigen Artikel erwähnt, gibt es in NodeJS zwei Arten von Threads. Einer ist die Ereignisschleife, die verwendet wird, um auf Benutzeranforderungen zu reagieren und verschiedene Rückrufe zu verarbeiten. Der andere ist der Arbeiterpool, der zum Erledigen verschiedener zeitaufwändiger Vorgänge verwendet wird.

Auf der offiziellen Website von nodejs wird eine Bibliothek namens „Webworker-Threads“ erwähnt, die den lokalen Worker-Pool von nodejs verwenden kann.

Leider ist das letzte Update der Webworker-Threads 2 Jahre her und es kann im neuesten Node.js 12 überhaupt nicht verwendet werden.

Der Autor der Webworker-Threads hat eine neue Bibliothek namens Webworker empfohlen.

Web-Worker basiert auf den Worker_Threads von Node.JS. Dieser Artikel erläutert die Verwendung von Worker_Threads und Web-Worker im Detail.

Arbeiter_Threads

Der Quellcode des Moduls worker_threads stammt aus lib/worker_threads.js und verweist auf den Worker-Thread, der einen neuen Thread starten kann, um JavaScript-Programme parallel auszuführen.

Worker_Threads werden hauptsächlich zum Verarbeiten CPU-intensiver Vorgänge und nicht von E/A-Vorgängen verwendet, da die asynchrone E/A von Node.JS selbst bereits sehr leistungsstark ist.

Es gibt 5 Hauptattribute, 3 Klassen und 3 Hauptmethoden in worker_threads. Als nächstes werden wir sie einzeln erklären.

istMainThread

isMainThread wird verwendet, um zu bestimmen, ob der Code im Hauptthread ausgeführt wird. Sehen wir uns ein Anwendungsbeispiel an:

const { Worker, istHauptthread } = erforderlich('worker_threads');

wenn (istHauptthread) {
 console.log('im Hauptthread');
 neuer Worker(__Dateiname);
} anders {
 console.log('im Arbeitsthread');
 console.log(isMainThread); // gibt „false“ aus.
}

Im obigen Beispiel haben wir Worker und isMainThread aus dem Modul worker_threads eingeführt. Worker ist die Hauptklasse des Worker-Threads, den wir später ausführlich erklären werden. Hier verwenden wir Worker, um einen Worker-Thread zu erstellen.

Nachrichtenkanal

MessageChannel stellt einen asynchronen bidirektionalen Kommunikationskanal dar. Es gibt keine Methoden im MessageChannel. MessageChannel wird hauptsächlich verwendet, um die MessagePorts an beiden Enden zu verbinden.

Klasse MessageChannel {
  schreibgeschützter Port1: MessagePort;
  schreibgeschützter Port2: MessagePort;
 }

Wenn wir den neuen MessageChannel() verwenden, werden automatisch zwei MessagePorts erstellt.

const { MessageChannel } = erfordern ('worker_threads');

const { port1, port2 } = neuer MessageChannel();
port1.on('Nachricht', (Nachricht) => console.log('empfangen', Nachricht));
port2.postMessage({ foo: 'bar' });
// Druckt: empfangen { foo: 'bar' } vom Listener `port1.on('message')`

Über MessageChannel können wir zwischen MessagePorts kommunizieren.

parentPort und MessagePort

parentPort ist ein MessagePort-Typ, der hauptsächlich für die Nachrichteninteraktion zwischen Worker-Threads und Haupt-Threads verwendet wird.

Über parentPort.postMessage() gesendete Nachrichten sind im Hauptthread über worker.on('message') verfügbar.

Im Hauptthread über worker.postMessage() gesendete Nachrichten werden im Worker-Thread über parentPort.on('message') empfangen.

Schauen wir uns die Definition von MessagePort an:

Klasse MessagePort erweitert EventEmitter {
  schließen(): ungültig;
  postMessage(Wert: beliebig, Übertragungsliste?: Array<ArrayBuffer | MessagePort>): void;
  ref(): ungültig;
  unref(): ungültig;
  start(): ungültig;

  addListener(Ereignis: "schließen", listener: () => void): dies;
  addListener(Ereignis: "Nachricht", Listener: (Wert: beliebig) => void): dies;
  addListener(Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => void): dies;

  emittieren(Ereignis: "schließen"): boolesch;
  emittieren(Ereignis: "Nachricht", Wert: beliebig): boolesch;
  emittieren(Ereignis: Zeichenfolge | Symbol, ...Argumente: beliebig[]): Boolesch;

  bei (Ereignis: "schließen", Listener: () => void): dies;
  bei (Ereignis: „Nachricht“, Listener: (Wert: beliebig) => ungültig): dies;
  bei (Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => void): dies;

  einmal (Ereignis: „schließen“, Listener: () => void): dies;
  einmal (Ereignis: „Nachricht“, Listener: (Wert: beliebig) => ungültig): dies;
  einmal (Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => ungültig): dies;

  prependListener(Ereignis: "schließen", listener: () => void): dies;
  prependListener(Ereignis: "Nachricht", Listener: (Wert: beliebig) => void): dies;
  prependListener(Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => void): dies;

  prependOnceListener(Ereignis: "schließen", listener: () => void): dies;
  prependOnceListener(Ereignis: "Nachricht", Listener: (Wert: beliebig) => void): dies;
  prependOnceListener(Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => void): dies;

  removeListener(Ereignis: "schließen", listener: () => void): dies;
  removeListener(Ereignis: "Nachricht", Listener: (Wert: beliebig) => void): dies;
  removeListener(Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => void): dies;

  aus(Ereignis: "schließen", Listener: () => void): dies;
  aus (Ereignis: „Nachricht“, Listener: (Wert: beliebig) => ungültig): dies;
  aus (Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => ungültig): dies;
 }

MessagePort erbt von EventEmitter, der ein Ende eines asynchronen bidirektionalen Kommunikationskanals darstellt. Dieser Kanal wird MessageChannel genannt und MessagePort kommuniziert über MessageChannel.

Wir können MessagePort verwenden, um Strukturdaten, Speicherbereiche oder andere MessagePorts zu übertragen.

Aus dem Quellcode können wir ersehen, dass es in MessagePort zwei Ereignisse gibt: „Close“ und „Message“.

Das Schließereignis wird ausgelöst, wenn die Verbindung zu einem der beiden Enden des Kanals getrennt wird, und das Nachrichtenereignis wird ausgelöst, wenn port.postMessage aufgerufen wird. Sehen wir uns ein Beispiel an:

const { MessageChannel } = erfordern ('worker_threads');
const { port1, port2 } = neuer MessageChannel();

// Druckt:
// foobar
// geschlossen!
port2.on('Nachricht', (Nachricht) => console.log(Nachricht));
port2.on('schließen', () => console.log('geschlossen!'));

port1.postMessage('foobar');
port1.schließen();

port.on('message') fügt tatsächlich einen Listener für das Nachrichtenereignis hinzu. Port bietet auch die Methode addListener, um manuell einen Listener hinzuzufügen.

port.on('message') löst automatisch die Methode port.start() aus, die den Start eines Ports anzeigt.

Wenn ein Port einen Listener hat, bedeutet das, dass der Port einen Verweis hat. Wenn ein Verweis vorhanden ist, wird das Programm nicht beendet. Wir können diesen Verweis abbrechen, indem wir die Methode port.unref aufrufen.

Als nächstes schauen wir uns an, wie Nachrichten über den Port übertragen werden:

port.postMessage(Wert[, Übertragungsliste])

postMessage kann zwei Parameter akzeptieren, der erste Parameter ist value, ein JavaScript-Objekt. Der zweite Parameter ist transferList.

Sehen wir uns zunächst einen Fall an, in dem ein Parameter übergeben wird:

const { MessageChannel } = erfordern ('worker_threads');
const { port1, port2 } = neuer MessageChannel();

port1.on('Nachricht', (Nachricht) => console.log(Nachricht));

const circularData = {};
circularData.foo = kreisförmigeDaten;
// Druckt: { foo: [Rundschreiben] }
port2.postMessage(circularData);

Normalerweise sind per PostMessage gesendete Objekte Kopien von Werten. Wenn Sie jedoch eine Transferliste angeben, werden die Objekte in der Transferliste an das empfangende Ende des Kanals übertragen und existieren auf der sendenden Seite nicht mehr, genau wie beim Senden der Objekte.

transferList ist eine Liste und die Objekte in der Liste können ArrayBuffer, MessagePort und FileHandle sein.

Wenn der Wert ein SharedArrayBuffer-Objekt enthält, kann das Objekt nicht in die Transferliste aufgenommen werden.

Schauen wir uns ein Beispiel mit zwei Parametern an:

const { MessageChannel } = erfordern ('worker_threads');
const { port1, port2 } = neuer MessageChannel();

port1.on('Nachricht', (Nachricht) => console.log(Nachricht));

const uint8Array = neues Uint8Array([ 1, 2, 3, 4 ]);
// Kopie von uint8Array posten:
port2.postMessage(uint8Array);

port2.postMessage(uint8Array, [ uint8Array.buffer ]);

//port2.postMessage(uint8Array);

Das obige Beispiel gibt Folgendes aus:

Uint8Array(4) [ 1, 2, 3, 4 ]
Uint8Array(4) [ 1, 2, 3, 4 ]

Die erste PostMessage ist eine Kopie und die zweite PostMessage ist eine Übertragung des zugrunde liegenden Puffers von Uint8Array.

Wenn wir port2.postMessage(uint8Array) erneut aufrufen, erhalten wir den folgenden Fehler:

DOMException [DataCloneError]: Ein ArrayBuffer wurde getrennt und konnte nicht geklont werden.

Buffer ist die zugrunde liegende Speicherstruktur von TypedArray. Wenn Buffer übertragen wird, ist das vorherige TypedArray nicht mehr verfügbar.

markAsUntransferable

Um dieses Problem zu vermeiden, können wir markAsUntransferable aufrufen, um den Puffer als nicht übertragbar zu markieren. Sehen wir uns ein Beispiel für markAsUntransferable an:

const { MessageChannel, markAsUntransferable } = require('worker_threads');

const pooledBuffer = neuer ArrayBuffer(8);
const typedArray1 = neues Uint8Array(pooledBuffer);
const typedArray2 = neues Float64Array(pooledBuffer);

markAlsNichtÜbertragbarMarkieren(pooledBuffer);

const { port1 } = neuer MessageChannel();
port1.postMessage(typedArray1, [typedArray1.buffer]);

console.log(typedArray1);
console.log(typedArray2);

TEILEN_UMGEBUNG

SHARE_ENV ist eine Umgebungsvariable, die an den Worker-Konstruktor übergeben wird. Durch Festlegen dieser Variable können wir gemeinsame Umgebungsvariablen zwischen dem Hauptthread und dem Worker-Thread lesen und schreiben.

const { Worker, SHARE_ENV } = erfordern('worker_threads');
neuer Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
 .on('exit', () => {
 console.log(process.env.SET_IN_WORKER); // Gibt „foo“ aus.
 });

ArbeiterDaten

Zusätzlich zu postMessage() können Sie Daten auch vom Hauptthread an den Worker übergeben, indem Sie workerData an den Worker-Konstruktor im Hauptthread übergeben:

const { Worker, isMainThread, workerData } = erfordern('worker_threads');

wenn (istHauptthread) {
 const worker = new Worker(__filename, { workerData: 'Hallo Welt!' });
} anders {
 console.log(workerData); // Gibt „Hallo Welt!“ aus.
}

Arbeiterklasse

Schauen wir uns zunächst die Definition des Begriffs „Arbeitnehmer“ an:

 Klasse Worker erweitert EventEmitter {
  schreibgeschützte Standardeingabe: Beschreibbar | null;
  schreibgeschützter Standardout: Lesbar;
  schreibgeschützter stderr: Lesbar;
  schreibgeschützte Thread-ID: Nummer;
  schreibgeschützte Ressourcengrenzen?: Ressourcengrenzen;

  Konstruktor (Dateiname: Zeichenfolge | URL, Optionen?: WorkerOptions);

  postMessage(Wert: beliebig, Übertragungsliste?: Array<ArrayBuffer | MessagePort>): void;
  ref(): ungültig;
  unref(): ungültig;

  beenden(): Versprechen<Zahl>;

  getHeapSnapshot(): Promise<Lesbar>;

  addListener(Ereignis: "Fehler", Listener: (err: Fehler) => void): dies;
  addListener(Ereignis: "exit", Listener: (ExitCode: Nummer) => void): dies;
  addListener(Ereignis: "Nachricht", Listener: (Wert: beliebig) => void): dies;
  addListener(Ereignis: "online", listener: () => void): dies;
  addListener(Ereignis: Zeichenfolge | Symbol, Listener: (...Argumente: beliebig[]) => void): dies;

  ... 
 }

Worker erbt von EventEmitter und umfasst 4 wichtige Ereignisse: Fehler, Beenden, Nachricht und Online.

Ein Worker stellt einen unabhängigen JavaScript-Ausführungsthread dar. Wir können einen Worker erstellen, indem wir einen Dateinamen oder eine URL übergeben.

Jeder Worker verfügt über ein Paar integrierter MessagePorts, die bei der Erstellung des Workers miteinander verknüpft werden. Der Worker verwendet dieses Paar integrierter MessagePorts, um mit dem übergeordneten Thread zu kommunizieren.

Über parentPort.postMessage() gesendete Nachrichten sind im Hauptthread über worker.on('message') verfügbar.

Im Hauptthread über worker.postMessage() gesendete Nachrichten werden im Worker-Thread über parentPort.on('message') empfangen.

Natürlich kann man auch explizit ein MessageChannel-Objekt erstellen und den MessagePort dann als Nachricht an andere Threads übergeben. Schauen wir uns ein Beispiel an:

const assert = erfordern('assert');
Konstante {
 Worker, Nachrichtenkanal, Nachrichtenport, isMainThread, übergeordneter Port
} = erfordern('worker_threads');
wenn (istHauptthread) {
 const worker = neuer Worker(__Dateiname);
 const subChannel = neuer MessageChannel();
 worker.postMessage({ hierIstIhrPort: subChannel.port1 }, [subChannel.port1]);
 subChannel.port2.on('Nachricht', (Wert) => {
 console.log('Empfangen:', Wert);
 });
} anders {
 parentPort.once('Nachricht', (Wert) => {
 assert(Wert.hierIstIhrPort Instanzvon MessagePort);
 value.hereIsYourPort.postMessage('Der Arbeitsthread sendet diese Nachricht');
 Wert.hierIstIhrPort.schließen();
 });
}

Im obigen Beispiel haben wir die Messaging-Funktionen des Worker- und ParentPorts ausgenutzt, um einen MessagePort in einem expliziten MessageChannel zu übergeben.

Die Nachricht wird dann über den MessagePort verteilt.

Nachricht auf Port empfangen

Zusätzlich zur on('message')-Methode des Ports können wir auch receiveMessageOnPort verwenden, um Nachrichten manuell zu empfangen:

const { MessageChannel, receiveMessageOnPort } = erfordern('worker_threads');
const { port1, port2 } = neuer MessageChannel();
port1.postMessage({ hallo: 'Welt' });

console.log(Nachricht auf Port empfangen(Port2));
// Druckt: { Nachricht: { Hallo: 'Welt' } }
console.log(Nachricht auf Port empfangen(Port2));
// Druckt: undefiniert

VerschiebeMessagePortToContext

Lassen Sie uns zunächst das Kontextkonzept in nodejs verstehen. Wir können einen Kontext aus der VM erstellen. Es handelt sich um eine isolierte Kontextumgebung, die die Sicherheit verschiedener Betriebsumgebungen gewährleistet. Sehen wir uns ein Beispiel für einen Kontext an:

const vm = erfordern('vm');

konstant x = 1;

const Kontext = { x: 2 };
vm.createContext(context); // Kontextisolationsobjekt.

const-Code = 'x += 40; var y = 17;';
// „x“ und „y“ sind globale Variablen im Kontext.
// Der Wert von x ist anfangs 2, da dies der Wert von context.x ist.
vm.runInContext(Code, Kontext);

console.log(Kontext.x); // 42
konsole.log(Kontext.y); // 17

console.log(x); // 1; y ist nicht definiert.

Im Worker können wir einen MessagePort in einen anderen Kontext verschieben.

worker.moveMessagePortToContext(Port, kontextbezogene Sandbox)

Diese Methode empfängt zwei Parameter, der erste Parameter ist der zu verschiebende MessagePort und der zweite Parameter ist das von vm.createContext() erstellte Kontextobjekt.

worker_threads-Threadpool

Oben haben wir die Verwendung eines einzelnen Arbeitsthreads erwähnt, aber jetzt reicht ein Thread im Programm oft nicht mehr aus. Wir müssen einen Thread-Pool erstellen, um die Arbeitsthread-Objekte zu verwalten.

Nodejs stellt die Klasse AsyncResource als Erweiterung asynchroner Ressourcen bereit.

Die Klasse AsyncResource befindet sich im Modul async_hooks.

Sehen wir uns als Nächstes an, wie mit der Klasse AsyncResource ein Worker-Thread-Pool erstellt wird.

Angenommen, wir haben eine Aufgabe zum Addieren zweier Zahlen und der Skriptname lautet task_processor.js:

const { parentPort } = erfordern('worker_threads');
parentPort.on('Nachricht', (Aufgabe) => {
 parentPort.postMessage(task.a + task.b);
});

Hier ist die Implementierung des Worker-Pools:

const { AsyncResource } = erfordern('async_hooks');
const { EventEmitter } = erfordern('Ereignisse');
const path = require('Pfad');
const { Worker } = erfordern ('worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

Klasse WorkerPoolTaskInfo erweitert AsyncResource {
 Konstruktor(Rückruf) {
 super('WorkerPoolTaskInfo');
 this.callback = Rückruf;
 }

 erledigt(Fehler, Ergebnis) {
 this.runInAsyncScope(this.callback, null, err, Ergebnis);
 this.emitDestroy(); // `TaskInfo`s werden nur einmal verwendet.
 }
}

Klasse WorkerPool erweitert EventEmitter {
 Konstruktor(AnzahlThreads) {
 super();
 this.numThreads = AnzahlThreads;
 diese.arbeiter = [];
 dies.freieWorker = [];

 für (lass i = 0; i < numThreads; i++)
  dies.addNewWorker();
 }

 addNewWorker() {
 const worker = neuer Worker(Pfad.resolve(__dirname, 'task_processor.js'));
 worker.on('Nachricht', (Ergebnis) => {
  // Im Erfolgsfall: Rufen Sie den Callback auf, der an `runTask` übergeben wurde,
  // Entfernen Sie die mit dem Worker verknüpfte „TaskInfo“ und markieren Sie sie als frei
  // wieder.
  Arbeiter[kTaskInfo].done(null, Ergebnis);
  Arbeiter[kTaskInfo] = null;
  dies.freeWorkers.push(Arbeiter);
  dies.emit(kWorkerFreedEvent);
 });
 worker.on('Fehler', (Fehler) => {
  // Im Falle einer nicht abgefangenen Ausnahme: Rufen Sie den Callback auf, der an
  // `runTask` mit dem Fehler.
  wenn (Arbeitnehmer[kTaskInfo])
  Arbeiter[kTaskInfo].done(err, null);
  anders
  dies.emit('Fehler', err);
  // Entfernen Sie den Worker aus der Liste und starten Sie einen neuen Worker, um den
  // aktuelles.
  dies.workers.splice(dieses.workers.indexOf(worker), 1);
  dies.addNewWorker();
 });
 dies.Arbeiter.push(Arbeiter);
 dies.freeWorkers.push(Arbeiter);
 dies.emit(kWorkerFreedEvent);
 }

 runTask(Aufgabe, Rückruf) {
 wenn (this.freeWorkers.length === 0) {
  // Keine freien Threads, warten bis ein Worker-Thread frei wird.
  dies.once(kWorkerFreedEvent, () => dies.runTask(Aufgabe, Rückruf));
  zurückkehren;
 }

 const worker = this.freeWorkers.pop();
 Arbeiter[kTaskInfo] = neue WorkerPoolTaskInfo(Rückruf);
 Arbeiter.postMessage(Aufgabe);
 }

 schließen() {
 für (const worker von this.workers) worker.terminate();
 }
}

module.exports = ArbeiterPool;

Wir erstellen eine neue kTaskInfo-Eigenschaft für den Worker, kapseln den asynchronen Rückruf in WorkerPoolTaskInfo und weisen ihn worker.kTaskInfo zu.

Als nächstes können wir workerPool verwenden:

const WorkerPool = erfordern('./worker_pool.js');
const os = erfordern('os');

const pool = neuer WorkerPool(os.cpus().length);

lass fertig = 0;
für (sei i = 0; i < 10; i++) {
 pool.runTask({ a: 42, b: 100 }, (fehler, ergebnis) => {
 console.log(i, err, Ergebnis);
 wenn (++fertig === 10)
  pool.schließen();
 });
}

Dies ist das Ende dieses Artikels über die Verwendung von worker_threads in nodejs zum Erstellen neuer Threads. Weitere Informationen zur Verwendung von worker_threads in nodejs zum Erstellen von Threads finden Sie in früheren Artikeln auf 123WORDPRESS.COM oder in den folgenden verwandten Artikeln. Ich hoffe, Sie werden 123WORDPRESS.COM auch in Zukunft unterstützen!

Das könnte Sie auch interessieren:
  • Javascript Web Worker mit Prozessanalyse
  • Detaillierte Erklärung von Yii2 kombiniert mit Workermans WebSocket-Beispiel
  • Forschung zur Web Worker Multithreading API in JavaScript
  • Tiefgreifendes Verständnis von Worker-Threads in Node.js
  • Detailliertes Beispiel für sharedWorker in JavaScript zur Realisierung einer mehrseitigen Kommunikation
  • Codebeispiel für einen Javascript Worker-Sub-Thread
  • Grundlegendes zur Worker-Event-API in JavaScript
  • So verwenden Sie webWorker in JS

<<:  MySQL count: ausführliche Erklärung und Funktionsbeispielcode

>>:  Tutorial zur Installation der Internetzugriffskonfiguration für Linux 7.2 auf einer virtuellen VMware-Maschine unter Win7

Artikel empfehlen

Tutorial zur Installation von MYSQL5.7 aus dem OEL7.6-Quellcode

Laden Sie zunächst das Installationspaket von der...

Der vollständige Implementierungsprozess von Sudoku mit JavaScript

Inhaltsverzeichnis Vorwort So lösen Sie Sudoku Fü...

So verwenden Sie MQTT im Uniapp-Projekt

Inhaltsverzeichnis 1. Referenz-Plugins im Uniapp ...

Befehlsliste des Baota Linux-Panels

Inhaltsverzeichnis Pagoda installieren Management...

So lösen Sie das Phantomleseproblem in MySQL

Inhaltsverzeichnis Vorwort 1. Was ist Phantomlese...

Details nach dem Setzen des src des Iframes auf about:blank

Nachdem die Quelle des Iframes auf „about:blank“ g...

Verwendung des optionalen Verkettungsoperators von JS

Vorwort Der optionale Verkettungsoperator (?.) er...

Implementierung des WeChat-Applet-Nachrichten-Pushs in Nodejs

Auswählen oder Erstellen einer Abonnementnachrich...

Get/Delete-Methode zum Übergeben von Array-Parametern in Vue

Wenn Front-End und Back-End interagieren, müssen ...