Ich habe große Probleme damit, zu verstehen, wie die Multiprocessing-Warteschlange in Python funktioniert und wie sie implementiert wird. Nehmen wir an, ich habe zwei Python-Module, die auf Daten aus einer gemeinsam genutzten Datei zugreifen. Nennen wir diese beiden Module einen Writer und einen Reader. Mein Plan ist es, dass sowohl der Leser als auch der Schreiber Anforderungen in zwei separate Mehrfachverarbeitungswarteschlangen stellen und diese Anforderungen dann von einem dritten Prozess in einer Schleife platzieren und als solche ausführen.
Mein Hauptproblem ist, dass ich wirklich nicht weiß, wie multiprocessing.queue korrekt implementiert wird. Sie können das Objekt nicht für jeden Prozess instanziieren, da es sich um separate Warteschlangen handelt. Wie stellen Sie sicher, dass sich alle Prozesse auf eine gemeinsam genutzte Warteschlange beziehen (oder in diesem Fall Warteschlangen)
Antworten:
Dies ist ein einfaches Beispiel für einen Leser und einen Schreiber, die sich eine einzelne Warteschlange teilen ... Der Schreiber sendet eine Reihe von Ganzzahlen an den Leser. Wenn dem Schreiber die Zahlen ausgehen, sendet er 'DONE', wodurch der Leser weiß, dass er aus der Leseschleife ausbrechen kann.
quelle
in "
from queue import Queue
" wird kein Modul aufgerufenqueue
, sondernmultiprocessing
sollte verwendet werden. Daher sollte es wie "from multiprocessing import Queue
" aussehenquelle
multiprocessing.Queue
korrekt. Das NormalQueue.Queue
wird für Python- Threads verwendet . Wenn Sie versuchen, dieQueue.Queue
Mehrfachverarbeitung zu verwenden, werden in jedem untergeordneten Prozess Kopien des Warteschlangenobjekts erstellt und die untergeordneten Prozesse werden niemals aktualisiert. Funktioniert grundsätzlichQueue.Queue
mit einem globalen gemeinsam genutzten Objekt undmultiprocessing.Queue
mit IPC. Siehe: stackoverflow.com/questions/925100/…Hier ist eine kinderleichte Verwendung von
multiprocessing.Queue
undmultiprocessing.Process
ermöglicht es Anrufern, ein "Ereignis" plus Argumente an einen separaten Prozess zu senden, der das Ereignis an eine "do_" -Methode des Prozesses sendet. (Python 3.4+)Verwendung:
Das
send
passiert im übergeordneten Prozess, dasdo_*
passiert im untergeordneten Prozess.Ich habe jede Ausnahmebehandlung ausgelassen, die offensichtlich die Ausführungsschleife unterbrechen und den untergeordneten Prozess beenden würde. Sie können es auch anpassen, indem Sie es überschreiben
run
, um das Blockieren oder was auch immer zu steuern.Dies ist wirklich nur in Situationen nützlich, in denen Sie einen einzelnen Arbeitsprozess haben, aber ich denke, es ist eine relevante Antwort auf diese Frage, um ein allgemeines Szenario mit etwas mehr Objektorientierung zu demonstrieren.
quelle
Wir haben zwei Versionen davon implementiert, eine einfache Multi- Thread- Pool, die viele Arten von Callables ausführen kann, was unser Leben erheblich erleichtert, und die zweite Version, die Prozesse verwendet , die in Bezug auf Callables weniger flexibel sind und einen zusätzlichen Aufruf zum Dill erfordern.
Wenn Sie gefroren_pool auf true setzen, wird die Ausführung eingefroren, bis finish_pool_queue in einer der Klassen aufgerufen wird.
Thread-Version:
Prozessversion:
Rufen Sie an mit:
oder
quelle
Ich habe mir mehrere Antworten über den Stapelüberlauf und das Web hinweg angesehen, während ich versucht habe, eine Methode für die Mehrfachverarbeitung mithilfe von Warteschlangen für die Weitergabe großer Pandas-Datenrahmen einzurichten. Es schien mir, dass jede Antwort die gleiche Art von Lösungen wiederholte, ohne die Vielzahl von Randfällen zu berücksichtigen, auf die man bei der Erstellung solcher Berechnungen definitiv stoßen wird. Das Problem ist, dass viele Dinge gleichzeitig im Spiel sind. Die Anzahl der Aufgaben, die Anzahl der Mitarbeiter, die Dauer jeder Aufgabe und mögliche Ausnahmen während der Ausführung der Aufgabe. All dies macht die Synchronisation schwierig und die meisten Antworten beziehen sich nicht darauf, wie Sie vorgehen können. Das ist also meine Einstellung, nachdem ich ein paar Stunden herumgespielt habe. Hoffentlich ist dies allgemein genug, damit die meisten Leute es nützlich finden.
Einige Gedanken vor Codierungsbeispielen. Da
queue.Empty
oderqueue.qsize()
oder ein anderes ähnliches Verfahren für die Flusskontrolle unzuverlässig ist, kann jeder Code dergleichen verwendet werdenist falsch. Dies wird den Arbeiter töten, selbst wenn Millisekunden später eine andere Aufgabe in der Warteschlange auftaucht. Der Arbeiter wird sich nicht erholen und nach einer Weile verschwinden ALLE Arbeiter, da sie die Warteschlange zufällig für einen Moment leer finden. Das Endergebnis ist, dass die Haupt-Multiprozessor-Funktion (die mit dem Join () für die Prozesse) zurückgegeben wird, ohne dass alle Aufgaben abgeschlossen sind. Nett. Viel Glück beim Debuggen, wenn Sie Tausende von Aufgaben haben und einige fehlen.
Das andere Problem ist die Verwendung von Sentinel-Werten. Viele Leute haben vorgeschlagen, einen Sentinel-Wert in die Warteschlange aufzunehmen, um das Ende der Warteschlange zu kennzeichnen. Aber um es genau wem zu kennzeichnen? Wenn es N Worker gibt, unter der Annahme, dass N die Anzahl der verfügbaren Kerne ist, die geben oder nehmen, markiert ein einzelner Sentinel-Wert nur das Ende der Warteschlange für einen Worker. Alle anderen Arbeiter werden sitzen und auf weitere Arbeit warten, wenn keine mehr übrig ist. Typische Beispiele, die ich gesehen habe, sind
Ein Mitarbeiter erhält den Sentinel-Wert, während der Rest auf unbestimmte Zeit wartet. In keinem Beitrag, auf den ich gestoßen bin, wurde erwähnt, dass Sie den Sentinel-Wert mindestens so oft an die Warteschlange senden müssen, wie Sie Mitarbeiter haben, damit ALLE ihn erhalten.
Das andere Problem ist die Behandlung von Ausnahmen während der Taskausführung. Auch diese sollten gefangen und verwaltet werden. Wenn Sie eine
completed_tasks
Warteschlange haben, sollten Sie außerdem unabhängig und deterministisch zählen, wie viele Elemente sich in der Warteschlange befinden, bevor Sie entscheiden, dass die Aufgabe erledigt ist. Das Verlassen auf Warteschlangengrößen schlägt erneut fehl und gibt unerwartete Ergebnisse zurück.Im folgenden Beispiel
par_proc()
erhält die Funktion eine Liste von Aufgaben, einschließlich der Funktionen, mit denen diese Aufgaben ausgeführt werden sollen, sowie benannte Argumente und Werte.Und hier ist ein Test, gegen den der obige Code ausgeführt werden kann
plus eine andere mit einigen Ausnahmen
Hoffe das ist hilfreich.
quelle
Ich habe gerade ein einfaches und allgemeines Beispiel für die Demonstration der Weitergabe einer Nachricht über eine Warteschlange zwischen zwei eigenständigen Programmen erstellt. Es beantwortet die Frage des OP nicht direkt, sollte aber klar genug sein, um das Konzept anzugeben.
Server:
multiprocessing-queue-manager-server.py
Klient:
multiprocessing-queue-manager-client.py
Verwendung
Server:
N
ist eine Ganzzahl, die angibt, wie viele Server erstellt werden sollen. Kopieren Sie eine der<server-address-N>
Ausgaben des Servers und machen Sie sie zum ersten Argumentmultiprocessing-queue-manager-client.py
.Klient:
Ergebnis
Server:
Inhalt: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD : ein Paket Erstellt hier .
Server:
Klient:
quelle