Erzwingen von ~ synchronem Node.js IPC

8

Ich habe einen Knotenserver, der mit fork()IPC einen untergeordneten Prozess erstellt . Irgendwann sendet das Kind die Ergebnisse im Rahmen einer lang laufenden Aufgabe mit etwa 10 Hz an die Eltern zurück. Wenn die an übergebene Nutzlast process.send()klein ist, funktioniert alles einwandfrei: Jede von mir gesendete Nachricht wird sofort empfangen und vom übergeordneten Element verarbeitet.

Wenn die Nutzlast jedoch "groß" ist - ich habe die genaue Größenbeschränkung nicht festgelegt -, werden alle Nutzdaten zuerst gesendet, anstatt sofort vom Elternteil empfangen zu werden, und erst wenn das Kind seine langfristige Aufgabe erledigt hat, erhält das Elternteil und verarbeiten Sie die Nachrichten.

tl; dr visuell:

Gut (passiert mit kleiner Nutzlast):

child:  send()
parent: receive()
child:  send()
parent: receive()
child:  send()
parent: receive()
...

Schlecht (passiert mit großer Nutzlast):

child:  send()
child:  send()
child:  send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
  1. Ist das ein Fehler? (Bearbeiten: Verhalten tritt nur unter OS X auf, nicht unter Windows oder Linux)
  2. Gibt es eine Möglichkeit, dies zu vermeiden, außer zu versuchen, meine IPC-Nutzlast klein zu halten?

Bearbeiten 2 : Der folgende Beispielcode verwendet sowohl den Zeit- als auch den Iterationszähler, um auszuwählen, wann ein Update gesendet werden soll. (In meinem eigentlichen Code ist es auch möglich, ein Update nach n Iterationen oder nachdem die Schleife bestimmte Ergebnisse erzielt hat zu senden .) Daher ist ein Umschreiben des zu verwendenden Codes setInterval/ setTimeoutanstelle einer Schleife ein letzter Ausweg für mich, da dies mich erfordert Funktionen entfernen.

Bearbeiten : Hier ist Testcode, der das Problem reproduziert. Es wird jedoch nur unter OS X reproduziert, nicht unter Windows oder Linux:

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);
   if (match) {
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

if (process.send) process.on('message', msg => run(msg));

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Gegen 8k tritt das Problem auf. Zum Beispiel bei der Abfrage von http://localhost:8080/15vs.http://localhost:8080/123456

/15
worker: send()  > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send()  > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send()  > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send()  > 123456 bytes 1571324276973
worker: send()  > 123456 bytes 1571324277174
worker: send()  > 123456 bytes 1571324277375
child done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393

Erfahrung sowohl auf Node v12.7 als auch auf v12.12.

Phrogz
quelle
1
Verwenden Sie a, anstatt die Nachrichten in einer Blockierungsschleife in die Warteschlange zu stellen setInterval().
Patrick Roberts
@PatrickRoberts Fragen Sie sich, warum run()eine whileSchleife darin ist? Schlagen Sie vor, dass das Umschalten auf setInterval()mein Problem löst? Um die Frage zu beantworten, die Sie meiner Meinung nach stellen: Ich verwende eine whileSchleife, da diese Funktion der einzige Zweck dieses Arbeitsprozesses ist und (bei kleinen IPC-Nutzdaten) kein Problem verursacht hat, das ich sehen konnte.
Phrogz
1
Eine solche Blockierung hat keinen nützlichen Zweck. Durch die Verwendung eines nicht blockierenden Zeitmechanismus wie setInterval()wird die Ereignisschleife freigegeben, um E / A im Hintergrund auszuführen. Ich sage nicht, dass es dieses Problem definitiv lösen wird, aber es scheint eine seltsame Wahl zu sein, es so zu schreiben, wie Sie es haben, nur weil Sie es können.
Patrick Roberts
@PatrickRoberts Danke für die Eingabe. Ich habe es nicht so geschrieben, "nur weil ich kann", sondern weil der Code ursprünglich konsolenbasiert ohne IPC war. Eine while-Schleife, die regelmäßig Ergebnisse ausgibt, schien zu diesem Zeitpunkt vernünftig zu sein, es tritt jedoch dieses Problem auf (nur unter macOS).
Phrogz
Das Schreiben einer Blockierungsschleife, die die aktuelle Zeit abfragt, bis eine zeitbasierte Bedingung erfüllt ist, ist ein Antimuster in JavaScript. Es spielt keine Rolle, ob es zuvor IPC gab oder nicht. Bevorzugen Sie immer einen nicht blockierenden Ansatz mit setTimeout()oder setInterval(). Die Änderung hier ist trivial.
Patrick Roberts

Antworten:

3

lDie lange laufende und blockierende while-Schleife in Kombination mit Sockets oder Dateideskriptoren im Knoten ist immer ein Hinweis darauf, dass etwas falsch gemacht wurde.

Ohne das gesamte Setup testen zu können, ist es schwer zu sagen, ob meine Behauptung wirklich korrekt ist, aber Kurznachrichten können wahrscheinlich direkt in einem Block an das Betriebssystem weitergeleitet werden, das sie dann an den anderen Prozess weiterleitet. Bei größeren Nachrichten müsste der Knoten warten, bis das Betriebssystem mehr Daten empfangen kann, sodass das Senden in die Warteschlange gestellt wird. Wenn Sie eine Blockierung haben, wird whiledas Senden bis zum loopEnde der Warteschlange in die Warteschlange gestellt .

Also auf deine Frage, nicht das ist kein Fehler.

Wenn Sie eine neuere nodejs-Version verwenden, würde ich ein awaitund asyncanstelle von und ein nicht blockierendes verwenden, while ähnlich dem sleepin dieser Antwort . Das awaitwird den Knoten Ereignisschleife ermöglichen , wenn intercept processSomekehrt Versprechen anhängig.

Für Ihren Code, der nicht wirklich einen realen Anwendungsfall widerspiegelt, ist es schwer zu sagen, wie er richtig gelöst werden kann. Wenn Sie nichts Asynchrones tun processSome, um die E / A abfangen zu können, müssen Sie dies regelmäßig manuell tun, z await new Promise(setImmediate);. B. mit a .

async function run() {
  let interval = setInterval(() => {
    process.send({action:'update', data:status()});
    console.log('child:  send()');
  }, 1/10)

  while(keepGoing()) {
    await processSome();
  }

  clearInterval(interval)
}
t.niese
quelle
Vielen Dank für diese Antwort. Gemäß meiner Bearbeitung der Frage hat mein realer Code mehrere Bedingungen zum Senden eines Updates, von denen nur eine auf der Zeit basiert. Es scheint, dass Sie den processSome()Code aus der whileSchleife verschoben haben . (Oder vielleicht fehlt mir etwas Entscheidendes im Zusammenhang mit Versprechungen.)
Phrogz
1
@Phrogz ah ok nein, ich habe versehentlich die Zahnspange falsch gelesen. Ich habe die Antwort so aktualisiert, dass sie ausgeführt process.send({action:'update', data:status()});wird, wenn sie every10Hzwahr ist, und processSomefür jede Iteration der while. Das awaitsollte es dem EvenLoop des Knotens ermöglichen, abzufangen, auch wenn processSomees kein Versprechen zurückgibt . Der Grund für Ihr Problem ist jedoch immer noch, dass die Schleife blockiert.
t.niese
Zwei Kommentare zu dieser Antwort wie sie sind. Wenn processSome()kein Versprechen zurückgegeben wird, blockiert dieser Ansatz weiterhin die E / A (Mikrotasks wie die von dieser awaitAnweisung erzeugte Fortsetzung werden vor dem E / A verarbeitet ). Dies führt auch dazu, dass die Iteration aufgrund der in der Warteschlange befindlichen Mikrotask bei jeder Iteration viel langsamer ausgeführt wird.
Patrick Roberts
@PatrickRoberts ja du hast recht, es muss ein nicht gelöstes Versprechen zurückgeben.
t.niese
2

Zu Ihrer ersten Frage

Ist das ein Fehler? (Bearbeiten: Verhalten tritt nur unter OS X auf, nicht unter Windows oder Linux)

Dies ist definitiv kein Fehler und ich könnte es auf meinem Windows 10 (für die Größe 123456) reproduzieren. Dies liegt hauptsächlich an der zugrunde liegenden Kernel-Pufferung und der Kontextumschaltung durch das Betriebssystem, da zwei separate Prozesse (nicht getrennt) über einen IPC-Deskriptor kommunizieren.

Zu Ihrer zweiten Frage

Gibt es eine Möglichkeit, dies zu vermeiden, außer zu versuchen, meine IPC-Nutzlast klein zu halten?

Wenn ich das Problem richtig verstehe, versuchen Sie, für jede http-Anforderung jedes Mal, wenn der Worker einen Block an den Server zurücksendet, zu lösen, dass der Server ihn verarbeitet, bevor Sie den nächsten Block erhalten. So verstehe ich, als Sie Synchronisierungsverarbeitung sagten

Es gibt einen Weg, Versprechen zu verwenden, aber ich würde gerne Generatoren bei den Arbeitern einsetzen. Es ist besser, den Fluss zwischen Server und Worker zu koordinieren

Fließen:

  1. Der Server sendet eine Ganzzahl an den Worker, unabhängig davon, was er von der http-Anforderung erhält
  2. Der Worker erstellt und führt dann den Generator aus, um den ersten Block zu senden
  3. Arbeiter gibt nach dem Senden des Stücks nach
  4. Serveranfragen nach mehr
  5. Worker generiert mehr, da der Server mehr verlangt hat (nur wenn verfügbar)
  6. Wenn nicht mehr, sendet der Arbeiter das Ende der Brocken
  7. Der Server protokolliert nur, dass der Worker fertig ist und fordert nicht mehr an

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc'], detached:false};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', (msg) => {
   //FLOW 7: Worker is done, just log
   if (msg.action == 'end'){
      console.log(`child ended for a particular request`)
   } else {
      console.log(`parent: receive(${msg.data.iter}) ${msg.data.msg.length} bytes`, Date.now())
      //FLOW 4: Server requests for more
      child.send('more')
   }   

});

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);   
   if (match) {
      //FLOW 1: Server sends integer to worker
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

let runner
if (process.send) process.on('message', msg => {   
   //FLOW 2: Worker creates and runs a generator to send the first chunk
   if (parseInt(msg)) {
      runner = run(msg)
      runner.next()
   }
   //FLOW 5: Server asked more, so generate more chunks if available
   if (msg == "more") runner.next()

});

//generator function *
function* run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send(${i})  > ${messageSize} bytes`, now);
         let j = i         
         process.send({action:'update', data:{msg, iter:j}});
         //FLOW 3: Worker yields after sending the chunk
         yield
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   //FLOW 6: If no more, worker sends end signal
   process.send({action:'end'});
   console.log('worker done');
}

Wenn wir den genauen Anwendungsfall kennen, gibt es möglicherweise bessere Möglichkeiten, ihn zu programmieren. Dies ist nur eine Möglichkeit, den untergeordneten Prozess zu synchronisieren und dabei einen Großteil Ihres ursprünglichen Quellcodes beizubehalten.

manikawnth
quelle
1

Wenn Sie sicherstellen müssen, dass eine Nachricht empfangen wird, bevor Sie die nächste senden, können Sie warten, bis der Master den Empfang bestätigt. Dies verzögert natürlich das Senden der nächsten Nachricht, aber da Ihre Logik sowohl auf der Zeit als auch auf der Iterationsnummer beruht, um zu bestimmen, ob eine Nachricht gesendet werden soll, ist dies möglicherweise in Ihrem Fall in Ordnung.

Bei der Implementierung muss jeder Mitarbeiter für jede gesendete Nachricht ein Versprechen erstellen und auf eine Antwort des Masters warten, bevor er das Versprechen löst. Dies bedeutet auch, dass Sie anhand einer Nachrichten-ID oder einer eindeutigen Nachricht identifizieren müssen, welche Nachricht bestätigt wird, wenn Sie mehr als eine Nachricht oder einen Mitarbeiter gleichzeitig haben.

Hier ist der geänderte Code

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg =>  {
    console.log(`parent: receive() ${msg.data.length} bytes`, Date.now())
    // reply to the child with the id
    child.send({ type: 'acknowledge', id: msg.id });
});

...

worker.js

const pendingMessageResolves = {};

if (process.send) process.on('message', msg => { 
    if (msg.type === 'acknowledge') {
        // call the stored resolve function
        pendingMessageResolves[msg.id]();
        // remove the function to allow the memory to be freed
        delete pendingMessageResolves[msg.id]
    } else {
        run(msg) 
    }
});

const sendMessageAndWaitForAcknowledge = (msg) => new Promise(resolve => {
    const id = new uuid(); // or any unique field
    process.send({ action:'update', data: msg, id });
    // store a reference to the resolve function
    pendingMessageResolves[id] = resolve;
})

async function run(messageSize) {
    const msg = new Array(messageSize+1).join('x');
    let lastUpdate = Date.now();
    for (let i=0; i<1e7; ++i) {
        const now = Date.now();
        if ((now-lastUpdate)>200 || i%5000==0) {
            console.log(`worker: send()  > ${messageSize} bytes`, now);
            await sendMessageAndWaitForAcknowledge(msg); // wait until master replies
            lastUpdate = Date.now();
        }
        Math.sqrt(Math.random());
    }
    console.log('worker done');
}

ps Ich habe den Code nicht getestet, daher muss er möglicherweise angepasst werden, aber die Idee sollte gelten.

Gafi
quelle
1

Während ich anderen zustimme, dass die optimale Lösung eine wäre, bei der der untergeordnete Prozess am Ende jeder Schleife freiwillig die Kontrolle aufgeben kann, sodass die Pufferleerprozesse ausgeführt werden können, gibt es eine einfache / schnelle / schmutzige Lösung, die Sie fast synchronisiert Verhalten, und das ist, um das Kind sendAnrufe blockieren zu lassen.

Verwenden Sie dasselbe server.jswie zuvor und fast dasselbe worker.js, wobei nur eine Zeile hinzugefügt wird:

worker.js

if (process.send) process.on('message', msg => run(msg));

// cause process.send to block until the message is actually sent                                                                                
process.channel.setBlocking(true);

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e6; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.error(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Ausgabe:

/123456
worker: send()  > 123456 bytes 1572113820591
worker: send()  > 123456 bytes 1572113820630
parent: receive() 123456 bytes 1572113820629
parent: receive() 123456 bytes 1572113820647
worker: send()  > 123456 bytes 1572113820659
parent: receive() 123456 bytes 1572113820665
worker: send()  > 123456 bytes 1572113820668
parent: receive() 123456 bytes 1572113820678
worker: send()  > 123456 bytes 1572113820678
parent: receive() 123456 bytes 1572113820683
worker: send()  > 123456 bytes 1572113820683
parent: receive() 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820696
worker: send()  > 123456 bytes 1572113820696
parent: receive() 123456 bytes 1572113820700
worker: send()  > 123456 bytes 1572113820700
parent: receive() 123456 bytes 1572113820703
worker: send()  > 123456 bytes 1572113820703
parent: receive() 123456 bytes 1572113820706
worker: send()  > 123456 bytes 1572113820706
parent: receive() 123456 bytes 1572113820709
worker: send()  > 123456 bytes 1572113820709
parent: receive() 123456 bytes 1572113820713
worker: send()  > 123456 bytes 1572113820714
worker: send()  > 123456 bytes 1572113820721
parent: receive() 123456 bytes 1572113820722
parent: receive() 123456 bytes 1572113820725
worker: send()  > 123456 bytes 1572113820725
parent: receive() 123456 bytes 1572113820727
Alter Pro
quelle
Das Definieren der Blockierungsanweisung direkt im Quellcode ist eine kluge Idee. Dies führt zu einem Engpass, der nicht behoben werden kann. Der Grund dafür ist, dass der Quellcode auf der Festplatte gespeichert ist, was es problematisch macht, eine Regelengine zu verwenden, um das Verhalten im laufenden Betrieb zu ändern.
Manuel Rodriguez