Ich habe einen Knotenserver, der mit fork()
IPC einen untergeordneten Prozess erstellt . Irgendwann sendet das Kind die Ergebnisse im Rahmen einer lang laufenden Aufgabe mit etwa 10 Hz an die Eltern zurück. Wenn die an übergebene Nutzlast process.send()
klein ist, funktioniert alles einwandfrei: Jede von mir gesendete Nachricht wird sofort empfangen und vom übergeordneten Element verarbeitet.
Wenn die Nutzlast jedoch "groß" ist - ich habe die genaue Größenbeschränkung nicht festgelegt -, werden alle Nutzdaten zuerst gesendet, anstatt sofort vom Elternteil empfangen zu werden, und erst wenn das Kind seine langfristige Aufgabe erledigt hat, erhält das Elternteil und verarbeiten Sie die Nachrichten.
tl; dr visuell:
Gut (passiert mit kleiner Nutzlast):
child: send()
parent: receive()
child: send()
parent: receive()
child: send()
parent: receive()
...
Schlecht (passiert mit großer Nutzlast):
child: send()
child: send()
child: send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
- Ist das ein Fehler? (Bearbeiten: Verhalten tritt nur unter OS X auf, nicht unter Windows oder Linux)
- Gibt es eine Möglichkeit, dies zu vermeiden, außer zu versuchen, meine IPC-Nutzlast klein zu halten?
Bearbeiten 2 : Der folgende Beispielcode verwendet sowohl den Zeit- als auch den Iterationszähler, um auszuwählen, wann ein Update gesendet werden soll. (In meinem eigentlichen Code ist es auch möglich, ein Update nach n Iterationen oder nachdem die Schleife bestimmte Ergebnisse erzielt hat zu senden .) Daher ist ein Umschreiben des zu verwendenden Codes setInterval
/ setTimeout
anstelle einer Schleife ein letzter Ausweg für mich, da dies mich erfordert Funktionen entfernen.
Bearbeiten : Hier ist Testcode, der das Problem reproduziert. Es wird jedoch nur unter OS X reproduziert, nicht unter Windows oder Linux:
server.js
const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);
child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));
require('http').createServer((req, res) => {
console.log(req.url);
const match = /\d+/.exec(req.url);
if (match) {
child.send(match[0]*1);
res.writeHead(200, {'Content-Type':'text/plain'});
res.end(`Sending packets of size ${match[0]}`);
} else {
res.writeHead(404, {'Content-Type':'text/plain'});
res.end('what?');
}
}).listen(8080);
worker.js
if (process.send) process.on('message', msg => run(msg));
function run(messageSize) {
const msg = new Array(messageSize+1).join('x');
let lastUpdate = Date.now();
for (let i=0; i<1e7; ++i) {
const now = Date.now();
if ((now-lastUpdate)>200 || i%5000==0) {
console.log(`worker: send() > ${messageSize} bytes`, now);
process.send({action:'update', data:msg});
lastUpdate = Date.now();
}
Math.sqrt(Math.random());
}
console.log('worker done');
}
Gegen 8k tritt das Problem auf. Zum Beispiel bei der Abfrage von http://localhost:8080/15
vs.http://localhost:8080/123456
/15
worker: send() > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send() > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send() > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send() > 123456 bytes 1571324276973
worker: send() > 123456 bytes 1571324277174
worker: send() > 123456 bytes 1571324277375
child done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393
Erfahrung sowohl auf Node v12.7 als auch auf v12.12.
quelle
setInterval()
.run()
einewhile
Schleife darin ist? Schlagen Sie vor, dass das Umschalten aufsetInterval()
mein Problem löst? Um die Frage zu beantworten, die Sie meiner Meinung nach stellen: Ich verwende einewhile
Schleife, da diese Funktion der einzige Zweck dieses Arbeitsprozesses ist und (bei kleinen IPC-Nutzdaten) kein Problem verursacht hat, das ich sehen konnte.setInterval()
wird die Ereignisschleife freigegeben, um E / A im Hintergrund auszuführen. Ich sage nicht, dass es dieses Problem definitiv lösen wird, aber es scheint eine seltsame Wahl zu sein, es so zu schreiben, wie Sie es haben, nur weil Sie es können.setTimeout()
odersetInterval()
. Die Änderung hier ist trivial.Antworten:
lDie lange laufende und blockierende while-Schleife in Kombination mit Sockets oder Dateideskriptoren im Knoten ist immer ein Hinweis darauf, dass etwas falsch gemacht wurde.
Ohne das gesamte Setup testen zu können, ist es schwer zu sagen, ob meine Behauptung wirklich korrekt ist, aber Kurznachrichten können wahrscheinlich direkt in einem Block an das Betriebssystem weitergeleitet werden, das sie dann an den anderen Prozess weiterleitet. Bei größeren Nachrichten müsste der Knoten warten, bis das Betriebssystem mehr Daten empfangen kann, sodass das Senden in die Warteschlange gestellt wird. Wenn Sie eine Blockierung haben, wird
while
das Senden bis zumloop
Ende der Warteschlange in die Warteschlange gestellt .Also auf deine Frage, nicht das ist kein Fehler.
Wenn Sie eine neuere nodejs-Version verwenden, würde ich ein
await
undasync
anstelle von und ein nicht blockierendes verwenden,while
ähnlich demsleep
in dieser Antwort . Dasawait
wird den Knoten Ereignisschleife ermöglichen , wenn interceptprocessSome
kehrt Versprechen anhängig.Für Ihren Code, der nicht wirklich einen realen Anwendungsfall widerspiegelt, ist es schwer zu sagen, wie er richtig gelöst werden kann. Wenn Sie nichts Asynchrones tun
processSome
, um die E / A abfangen zu können, müssen Sie dies regelmäßig manuell tun, zawait new Promise(setImmediate);
. B. mit a .quelle
processSome()
Code aus derwhile
Schleife verschoben haben . (Oder vielleicht fehlt mir etwas Entscheidendes im Zusammenhang mit Versprechungen.)process.send({action:'update', data:status()});
wird, wenn sieevery10Hz
wahr ist, undprocessSome
für jede Iteration derwhile
. Dasawait
sollte es dem EvenLoop des Knotens ermöglichen, abzufangen, auch wennprocessSome
es kein Versprechen zurückgibt . Der Grund für Ihr Problem ist jedoch immer noch, dass die Schleife blockiert.processSome()
kein Versprechen zurückgegeben wird, blockiert dieser Ansatz weiterhin die E / A (Mikrotasks wie die von dieserawait
Anweisung erzeugte Fortsetzung werden vor dem E / A verarbeitet ). Dies führt auch dazu, dass die Iteration aufgrund der in der Warteschlange befindlichen Mikrotask bei jeder Iteration viel langsamer ausgeführt wird.Zu Ihrer ersten Frage
Dies ist definitiv kein Fehler und ich könnte es auf meinem Windows 10 (für die Größe 123456) reproduzieren. Dies liegt hauptsächlich an der zugrunde liegenden Kernel-Pufferung und der Kontextumschaltung durch das Betriebssystem, da zwei separate Prozesse (nicht getrennt) über einen IPC-Deskriptor kommunizieren.
Zu Ihrer zweiten Frage
Wenn ich das Problem richtig verstehe, versuchen Sie, für jede http-Anforderung jedes Mal, wenn der Worker einen Block an den Server zurücksendet, zu lösen, dass der Server ihn verarbeitet, bevor Sie den nächsten Block erhalten. So verstehe ich, als Sie Synchronisierungsverarbeitung sagten
Es gibt einen Weg, Versprechen zu verwenden, aber ich würde gerne Generatoren bei den Arbeitern einsetzen. Es ist besser, den Fluss zwischen Server und Worker zu koordinieren
Fließen:
server.js
worker.js
Wenn wir den genauen Anwendungsfall kennen, gibt es möglicherweise bessere Möglichkeiten, ihn zu programmieren. Dies ist nur eine Möglichkeit, den untergeordneten Prozess zu synchronisieren und dabei einen Großteil Ihres ursprünglichen Quellcodes beizubehalten.
quelle
Wenn Sie sicherstellen müssen, dass eine Nachricht empfangen wird, bevor Sie die nächste senden, können Sie warten, bis der Master den Empfang bestätigt. Dies verzögert natürlich das Senden der nächsten Nachricht, aber da Ihre Logik sowohl auf der Zeit als auch auf der Iterationsnummer beruht, um zu bestimmen, ob eine Nachricht gesendet werden soll, ist dies möglicherweise in Ihrem Fall in Ordnung.
Bei der Implementierung muss jeder Mitarbeiter für jede gesendete Nachricht ein Versprechen erstellen und auf eine Antwort des Masters warten, bevor er das Versprechen löst. Dies bedeutet auch, dass Sie anhand einer Nachrichten-ID oder einer eindeutigen Nachricht identifizieren müssen, welche Nachricht bestätigt wird, wenn Sie mehr als eine Nachricht oder einen Mitarbeiter gleichzeitig haben.
Hier ist der geänderte Code
server.js
worker.js
ps Ich habe den Code nicht getestet, daher muss er möglicherweise angepasst werden, aber die Idee sollte gelten.
quelle
Während ich anderen zustimme, dass die optimale Lösung eine wäre, bei der der untergeordnete Prozess am Ende jeder Schleife freiwillig die Kontrolle aufgeben kann, sodass die Pufferleerprozesse ausgeführt werden können, gibt es eine einfache / schnelle / schmutzige Lösung, die Sie fast synchronisiert Verhalten, und das ist, um das Kind
send
Anrufe blockieren zu lassen.Verwenden Sie dasselbe
server.js
wie zuvor und fast dasselbeworker.js
, wobei nur eine Zeile hinzugefügt wird:worker.js
Ausgabe:
quelle