Wenn ich so etwas wie:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
es funktioniert gut. Setzen Sie dies jedoch als Funktion einer Klasse:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Gibt mir folgenden Fehler:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Ich habe einen Beitrag von Alex Martelli gesehen, der sich mit der gleichen Art von Problem befasst, aber er war nicht explizit genug.
python
multiprocessing
pickle
Mermoz
quelle
quelle
IPython.Parallel
, aber dort konnte man das Problem umgehen, indem man die Objekte auf die Knoten schob. Es scheint ziemlich ärgerlich, dieses Problem mit Multiprocessing zu umgehen.calculate
ist picklable, also scheint es, dass dies gelöst werden kann, indem 1) ein Funktionsobjekt mit einem Konstruktor erstellt wird, der über einecalculate
Instanz kopiert, und dann 2) eine Instanz dieses Funktionsobjekts anPool
diemap
Methode übergeben wird. Nein?multiprocessing
Moduls sind auf das Ziel einer plattformübergreifenden Implementierung und das Fehlen einesfork(2)
ähnlichen Systemaufrufs in Windows zurückzuführen. Wenn Sie sich nicht für die Win32-Unterstützung interessieren, gibt es möglicherweise eine einfachere prozessbasierte Problemumgehung. Oder wenn Sie bereit Threads anstelle von Prozessen zu verwenden, können Sie ersetzenfrom multiprocessing import Pool
mitfrom multiprocessing.pool import ThreadPool as Pool
.Antworten:
Ich war auch verärgert über Einschränkungen, welche Art von Funktionen pool.map akzeptieren konnte. Ich habe folgendes geschrieben, um dies zu umgehen. Es scheint zu funktionieren, auch für die rekursive Verwendung von Parmap.
quelle
Ich konnte die bisher veröffentlichten Codes nicht verwenden, da die Codes, die "multiprocessing.Pool" verwenden, nicht mit Lambda-Ausdrücken funktionieren und die Codes, die "multiprocessing.Pool" nicht verwenden, so viele Prozesse erzeugen, wie es Arbeitselemente gibt.
Ich habe den Code so angepasst, dass er eine vordefinierte Anzahl von Arbeitern erzeugt und die Eingabeliste nur dann durchläuft, wenn ein inaktiver Arbeiter vorhanden ist. Ich habe auch den "Daemon" -Modus für die Worker aktiviert. Strg-c funktioniert wie erwartet.
quelle
parmap
Funktion richtig zu arbeiten ?(None, None)
als letztes Element übergeben wird, zeigt an,fun
dass das Ende der Elementsequenz für jeden Prozess erreicht wurde.Multiprocessing und Pickling sind fehlerhaft und begrenzt, es sei denn, Sie springen außerhalb der Standardbibliothek.
Wenn Sie eine Verzweigung von
multiprocessing
aufgerufen verwendenpathos.multiprocesssing
, können Sie Klassen und Klassenmethoden direkt in denmap
Funktionen der Mehrfachverarbeitung verwenden. Dies liegt darandill
, dass anstelle vonpickle
oder verwendetcPickle
wird unddill
fast alles in Python serialisiert werden kann.pathos.multiprocessing
auch bietet eine asynchrone Map - Funktion ... und es könnenmap
Funktionen mit mehreren Argumenten (zBmap(math.pow, [1,2,3], [4,5,6])
)Siehe Diskussionen: Was können Multiprocessing und Dill zusammen tun?
und: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Es verarbeitet sogar den Code, den Sie ursprünglich geschrieben haben, ohne Änderungen und vom Interpreter. Warum etwas anderes tun, das fragiler und spezifischer für einen einzelnen Fall ist?
Den Code erhalten Sie hier: https://github.com/uqfoundation/pathos
Und um ein bisschen mehr zu zeigen, was es kann:
quelle
amap
), die die Verwendung von Fortschrittsbalken und anderer asynchroner Programmierung ermöglicht.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
multiprocess
wobei es zu 2/3 kompatibel ist. Siehe stackoverflow.com/questions/27873093/… und pypi.python.org/pypi/multiprocess .pathos
hat eine neue stabile Version und ist auch 2.x und 3.x kompatibel.Soweit ich weiß, gibt es derzeit keine Lösung für Ihr Problem: Auf die Funktion, die Sie vergeben,
map()
muss über einen Import Ihres Moduls zugegriffen werden können. Aus diesem Grund funktioniert der Code von robert: Die Funktionf()
kann durch Importieren des folgenden Codes erhalten werden:Ich habe tatsächlich einen "Haupt" -Abschnitt hinzugefügt, da dieser den Empfehlungen für die Windows-Plattform folgt ("Stellen Sie sicher, dass das Hauptmodul von einem neuen Python-Interpreter sicher importiert werden kann, ohne unbeabsichtigte Nebenwirkungen zu verursachen").
Ich habe auch einen Großbuchstaben vor hinzugefügt
Calculate
, um PEP 8 zu folgen . :) :)quelle
Die Lösung von mrule ist korrekt, weist jedoch einen Fehler auf: Wenn das Kind eine große Datenmenge zurücksendet, kann es den Pipe-Puffer füllen und den Puffer des Kindes blockieren
pipe.send()
, während das Elternteil darauf wartet, dass das Kind beendet wirdpipe.join()
. Die Lösung besteht darin, die Datenjoin()
des Kindes vor dem Kind zu lesen . Darüber hinaus sollte das Kind das Rohrende des Elternteils schließen, um einen Deadlock zu verhindern. Der folgende Code behebt das. Beachten Sie auch, dass dadurchparmap
ein Prozess pro Element in erstellt wirdX
. Eine fortgeschrittenere Lösung besteht darinmultiprocessing.cpu_count()
, sieX
in mehrere Blöcke aufzuteilen und die Ergebnisse vor der Rückkehr zusammenzuführen. Ich überlasse das dem Leser als Übung, um die Prägnanz der netten Antwort von mrule nicht zu verderben. ;)quelle
OSError: [Errno 24] Too many open files
. Ich denke, es muss eine Art Begrenzung für die Anzahl der Prozesse geben, damit es richtig funktioniert ...Ich habe auch damit zu kämpfen. Ich hatte Funktionen als Datenelemente einer Klasse, als vereinfachtes Beispiel:
Ich musste die Funktion self.f in einem Pool.map () -Aufruf innerhalb derselben Klasse verwenden, und self.f nahm kein Tupel als Argument. Da diese Funktion in eine Klasse eingebettet war, war mir nicht klar, wie ich den Wrapper-Typ schreiben sollte, den andere Antworten vorgeschlagen hatten.
Ich habe dieses Problem gelöst, indem ich einen anderen Wrapper verwendet habe, der ein Tupel / eine Liste verwendet, wobei das erste Element die Funktion ist und die verbleibenden Elemente die Argumente für diese Funktion sind, die als eval_func_tuple (f_args) bezeichnet werden. Auf diese Weise kann die problematische Zeile durch return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)) ersetzt werden. Hier ist der vollständige Code:
Datei: util.py
Datei: main.py.
Wenn Sie main.py ausführen, erhalten Sie [11, 22, 33]. Sie können dies jederzeit verbessern. Beispielsweise kann eval_func_tuple auch so geändert werden, dass Schlüsselwortargumente verwendet werden.
In einer anderen Antwort, in einer anderen Antwort, kann die Funktion "Parmap" für den Fall von mehr Prozessen effizienter gemacht werden als die Anzahl der verfügbaren CPUs. Ich kopiere unten eine bearbeitete Version. Dies ist mein erster Beitrag und ich war mir nicht sicher, ob ich die ursprüngliche Antwort direkt bearbeiten sollte. Ich habe auch einige Variablen umbenannt.
quelle
Ich nahm die Antwort von klaus se und aganders3 und erstellte ein dokumentiertes Modul, das besser lesbar ist und in einer Datei enthalten ist. Sie können es einfach zu Ihrem Projekt hinzufügen. Es hat sogar einen optionalen Fortschrittsbalken!
EDIT : @ alexander-mcfarlane Vorschlag und eine Testfunktion hinzugefügt
quelle
join()
gleichzeitig ausgeführt und Sie erhalten nur einen Blitz von100%
abgeschlossen imtqdm
Display. Das einzige Mal, wenn es nützlich ist, ist, wenn jeder Prozessor eine voreingenommene Arbeitslast hattqdm()
, um die Linie zu wickeln:result = [q_out.get() for _ in tqdm(sent)]
und es funktioniert viel besser - große Anstrengung, obwohl wirklich zu schätzen, so +1Ich weiß, dass dies vor über 6 Jahren gefragt wurde, wollte aber nur meine Lösung hinzufügen, da einige der obigen Vorschläge schrecklich kompliziert erscheinen, aber meine Lösung war eigentlich sehr einfach.
Ich musste lediglich den Aufruf pool.map () in eine Hilfsfunktion einbinden. Übergeben des Klassenobjekts zusammen mit Argumenten für die Methode als Tupel, das ein bisschen so aussah.
quelle
In Klassen definierte Funktionen (auch innerhalb von Funktionen innerhalb von Klassen) sind nicht wirklich hilfreich. Dies funktioniert jedoch:
quelle
Ich weiß, dass diese Frage vor 8 Jahren und 10 Monaten gestellt wurde, aber ich möchte Ihnen meine Lösung vorstellen:
Sie müssen nur Ihre Klasse in eine statische Methode umwandeln. Es ist aber auch mit einer Klassenmethode möglich:
Getestet in Python 3.7.3
quelle
Ich habe die Methode von klaus se geändert, weil sie bei kleinen Listen hängen blieb, wenn die Anzahl der Elemente ~ 1000 oder mehr betrug. Anstatt die Jobs einzeln mit der
None
Stoppbedingung zu verschieben, lade ich die Eingabewarteschlange auf einmal hoch und lasse die Prozesse nur daran arbeiten, bis sie leer sind.Bearbeiten: Leider tritt jetzt auf meinem System dieser Fehler auf: Das maximale Limit für die Multiprocessing Queue beträgt 32767 , hoffentlich helfen die Problemumgehungen dort.
quelle
Sie können Ihren Code ohne Probleme ausführen, wenn Sie das
Pool
Objekt aus der Liste der Objekte in der Klasse manuell ignorieren, da es nicht in derpickle
Lage ist, wie der Fehler besagt. Sie können dies mit der__getstate__
Funktion (siehe auch hier ) wie folgt tun . DasPool
Objekt wird versuchen , die zu finden__getstate__
und__setstate__
Funktionen und sie ausführen , wenn er sie findet , wenn Sie laufenmap
,map_async
etc:Dann mach:
gibt Ihnen die Ausgabe:
Ich habe den obigen Code in Python 3.x getestet und es funktioniert.
quelle
Ich bin mir nicht sicher, ob dieser Ansatz gewählt wurde, aber eine Arbeit, die ich verwende, ist:
Die Ausgabe sollte sein:
quelle
Es besteht die Möglichkeit, dass Sie diese Funktion für jede unterschiedliche Instanz der Klasse anwenden möchten. Dann ist hier auch die Lösung dafür
quelle
Hier ist meine Lösung, die ich für etwas weniger hackisch halte als die meisten anderen hier. Es ist ähnlich wie bei Nightowl.
quelle
Von http://www.rueckstiess.net/research/snippets/show/ca1d7d90 und http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Wir können eine externe Funktion erstellen und sie mit dem Klassen-Selbstobjekt festlegen:
ODER ohne Joblib:
quelle
Dies ist vielleicht keine sehr gute Lösung, aber in meinem Fall löse ich sie so.
Ich musste zu
self
meiner Funktion übergehen , da ich über diese Funktion auf Attribute und Funktionen meiner Klasse zugreifen muss. Das funktioniert bei mir. Korrekturen und Vorschläge sind immer willkommen.quelle
Hier ist ein Boilerplate, das ich für die Verwendung von Multiprocessing Pool in Python3 geschrieben habe. Insbesondere Python3.7.7 wurde zum Ausführen der Tests verwendet. Ich habe meine schnellsten Läufe mit
imap_unordered
. Schließen Sie einfach Ihr Szenario an und probieren Sie es aus. Sie können verwendentimeit
oder nurtime.time()
um herauszufinden, welche für Sie am besten funktioniert.Im obigen Szenario
imap_unordered
scheint es tatsächlich das schlechteste für mich zu sein. Probieren Sie Ihren Fall aus und vergleichen Sie ihn mit der Maschine, auf der Sie ihn ausführen möchten. Informieren Sie sich auch über Prozesspools . Prost!quelle