Multiprocessing: Wie verwende ich Pool.map für eine in einer Klasse definierte Funktion?

178

Wenn ich so etwas wie:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

es funktioniert gut. Setzen Sie dies jedoch als Funktion einer Klasse:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Gibt mir folgenden Fehler:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Ich habe einen Beitrag von Alex Martelli gesehen, der sich mit der gleichen Art von Problem befasst, aber er war nicht explizit genug.

Mermoz
quelle
1
"das als Funktion einer Klasse"? Können Sie den Code posten, der tatsächlich den tatsächlichen Fehler erhält? Ohne den eigentlichen Code können wir nur raten, was Sie falsch machen.
S.Lott
Generell gibt es Beizmodule, die leistungsfähiger sind als das Standard- Beizmodul von Python (wie das in dieser Antwort erwähnte Picloud- Modul ).
klaus se
1
Ich hatte ein ähnliches Problem mit Schließungen in IPython.Parallel, aber dort konnte man das Problem umgehen, indem man die Objekte auf die Knoten schob. Es scheint ziemlich ärgerlich, dieses Problem mit Multiprocessing zu umgehen.
Alex S
Hier calculateist picklable, also scheint es, dass dies gelöst werden kann, indem 1) ein Funktionsobjekt mit einem Konstruktor erstellt wird, der über eine calculateInstanz kopiert, und dann 2) eine Instanz dieses Funktionsobjekts an Pooldie mapMethode übergeben wird. Nein?
Rd11
1
@math Ich glaube nicht, dass eine der "letzten Änderungen" von Python hilfreich sein wird. Einige Einschränkungen des multiprocessingModuls sind auf das Ziel einer plattformübergreifenden Implementierung und das Fehlen eines fork(2)ähnlichen Systemaufrufs in Windows zurückzuführen. Wenn Sie sich nicht für die Win32-Unterstützung interessieren, gibt es möglicherweise eine einfachere prozessbasierte Problemumgehung. Oder wenn Sie bereit Threads anstelle von Prozessen zu verwenden, können Sie ersetzen from multiprocessing import Poolmit from multiprocessing.pool import ThreadPool as Pool.
Aya

Antworten:

69

Ich war auch verärgert über Einschränkungen, welche Art von Funktionen pool.map akzeptieren konnte. Ich habe folgendes geschrieben, um dies zu umgehen. Es scheint zu funktionieren, auch für die rekursive Verwendung von Parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
mrule
quelle
1
Das hat bei mir sehr gut funktioniert, danke. Ich habe eine Schwachstelle gefunden: Ich habe versucht, Parmap für einige Funktionen zu verwenden, die ein Standarddikt umgaben, und habe den PicklingError erneut erhalten. Ich habe keine Lösung dafür gefunden, sondern nur meinen Code überarbeitet, um das Standarddiktat nicht zu verwenden.
sans
2
Dies funktioniert nicht in Python 2.7.2 (Standard, 12. Juni 2011, 15:08:59) [MSC v.1500 32 Bit (Intel)] auf win32
ubershmekel
3
Dies funktioniert unter Python 2.7.3 Aug 1,2012, 05:14:39. Dies funktioniert nicht bei riesigen Iterables -> es verursacht einen OSError: [Errno 24] Zu viele geöffnete Dateien aufgrund der Anzahl der geöffneten Pipes.
Eiyrioü von Kauyf
Diese Lösung erzeugt einen Prozess für jedes Arbeitselement. Die unten stehende Lösung von "klaus se" ist effizienter.
Ypnos
84

Ich konnte die bisher veröffentlichten Codes nicht verwenden, da die Codes, die "multiprocessing.Pool" verwenden, nicht mit Lambda-Ausdrücken funktionieren und die Codes, die "multiprocessing.Pool" nicht verwenden, so viele Prozesse erzeugen, wie es Arbeitselemente gibt.

Ich habe den Code so angepasst, dass er eine vordefinierte Anzahl von Arbeitern erzeugt und die Eingabeliste nur dann durchläuft, wenn ein inaktiver Arbeiter vorhanden ist. Ich habe auch den "Daemon" -Modus für die Worker aktiviert. Strg-c funktioniert wie erwartet.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
klaus se
quelle
2
Wie würden Sie einen Fortschrittsbalken erhalten, um mit dieser parmapFunktion richtig zu arbeiten ?
Schockbrenner
2
Eine Frage - Ich habe diese Lösung verwendet, aber festgestellt, dass die von mir erzeugten Python-Prozesse im Speicher aktiv blieben. Haben Sie einen kurzen Gedanken darüber, wie Sie diese töten können, wenn Ihre Parmap beendet wird?
CompEcon
1
@ klaus-se Ich weiß, wir werden davon abgehalten, uns nur in Kommentaren zu bedanken, aber deine Antwort ist einfach zu wertvoll für mich, ich konnte nicht widerstehen. Ich wünschte, ich könnte Ihnen mehr als nur einen Ruf geben ...
Deshtop
2
@greole, das (None, None)als letztes Element übergeben wird, zeigt an, fundass das Ende der Elementsequenz für jeden Prozess erreicht wurde.
aganders3
4
@deshtop: Sie können mit einem Kopfgeld, wenn Sie selbst genug Ruf haben :-)
Mark
57

Multiprocessing und Pickling sind fehlerhaft und begrenzt, es sei denn, Sie springen außerhalb der Standardbibliothek.

Wenn Sie eine Verzweigung von multiprocessingaufgerufen verwenden pathos.multiprocesssing, können Sie Klassen und Klassenmethoden direkt in den mapFunktionen der Mehrfachverarbeitung verwenden. Dies liegt daran dill, dass anstelle von pickleoder verwendet cPicklewird und dillfast alles in Python serialisiert werden kann.

pathos.multiprocessingauch bietet eine asynchrone Map - Funktion ... und es können mapFunktionen mit mehreren Argumenten (zB map(math.pow, [1,2,3], [4,5,6]))

Siehe Diskussionen: Was können Multiprocessing und Dill zusammen tun?

und: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

Es verarbeitet sogar den Code, den Sie ursprünglich geschrieben haben, ohne Änderungen und vom Interpreter. Warum etwas anderes tun, das fragiler und spezifischer für einen einzelnen Fall ist?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Den Code erhalten Sie hier: https://github.com/uqfoundation/pathos

Und um ein bisschen mehr zu zeigen, was es kann:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
Mike McKerns
quelle
1
pathos.multiprocessing verfügt auch über eine asynchrone Zuordnung ( amap), die die Verwendung von Fortschrittsbalken und anderer asynchroner Programmierung ermöglicht.
Mike McKerns
Ich mag pathos.multiprocessing, das fast als Ersatz für nicht parallele Karten dienen kann, während ich die Multiprocessing-Funktion genieße. Ich habe einen einfachen Wrapper von pathos.multiprocessing.map, so dass es speichereffizienter ist, wenn eine schreibgeschützte große Datenstruktur über mehrere Kerne hinweg verarbeitet wird. Weitere Informationen finden Sie in diesem Git-Repository .
Fashandge
Scheint interessant, wird aber nicht installiert. Dies ist die Nachricht, die pip gibt:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
1
Ja. Ich habe seit einiger Zeit nicht mehr veröffentlicht, da ich die Funktionalität in separate Pakete aufgeteilt und auch in 2/3 kompatiblen Code konvertiert habe. Vieles davon wurde modularisiert, multiprocesswobei es zu 2/3 kompatibel ist. Siehe stackoverflow.com/questions/27873093/… und pypi.python.org/pypi/multiprocess .
Mike McKerns
3
@xApple: Genau wie ein Follow-up, pathoshat eine neue stabile Version und ist auch 2.x und 3.x kompatibel.
Mike McKerns
40

Soweit ich weiß, gibt es derzeit keine Lösung für Ihr Problem: Auf die Funktion, die Sie vergeben, map()muss über einen Import Ihres Moduls zugegriffen werden können. Aus diesem Grund funktioniert der Code von robert: Die Funktion f()kann durch Importieren des folgenden Codes erhalten werden:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

Ich habe tatsächlich einen "Haupt" -Abschnitt hinzugefügt, da dieser den Empfehlungen für die Windows-Plattform folgt ("Stellen Sie sicher, dass das Hauptmodul von einem neuen Python-Interpreter sicher importiert werden kann, ohne unbeabsichtigte Nebenwirkungen zu verursachen").

Ich habe auch einen Großbuchstaben vor hinzugefügt Calculate, um PEP 8 zu folgen . :) :)

Eric O Lebigot
quelle
18

Die Lösung von mrule ist korrekt, weist jedoch einen Fehler auf: Wenn das Kind eine große Datenmenge zurücksendet, kann es den Pipe-Puffer füllen und den Puffer des Kindes blockieren pipe.send(), während das Elternteil darauf wartet, dass das Kind beendet wird pipe.join(). Die Lösung besteht darin, die Daten join()des Kindes vor dem Kind zu lesen . Darüber hinaus sollte das Kind das Rohrende des Elternteils schließen, um einen Deadlock zu verhindern. Der folgende Code behebt das. Beachten Sie auch, dass dadurch parmapein Prozess pro Element in erstellt wird X. Eine fortgeschrittenere Lösung besteht darin multiprocessing.cpu_count(), sie Xin mehrere Blöcke aufzuteilen und die Ergebnisse vor der Rückkehr zusammenzuführen. Ich überlasse das dem Leser als Übung, um die Prägnanz der netten Antwort von mrule nicht zu verderben. ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))
Bob McElrath
quelle
Wie wählen Sie die Anzahl der Prozesse?
Patapouf_ai
Es stirbt jedoch ziemlich schnell aufgrund des Fehlers OSError: [Errno 24] Too many open files. Ich denke, es muss eine Art Begrenzung für die Anzahl der Prozesse geben, damit es richtig funktioniert ...
patapouf_ai
13

Ich habe auch damit zu kämpfen. Ich hatte Funktionen als Datenelemente einer Klasse, als vereinfachtes Beispiel:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

Ich musste die Funktion self.f in einem Pool.map () -Aufruf innerhalb derselben Klasse verwenden, und self.f nahm kein Tupel als Argument. Da diese Funktion in eine Klasse eingebettet war, war mir nicht klar, wie ich den Wrapper-Typ schreiben sollte, den andere Antworten vorgeschlagen hatten.

Ich habe dieses Problem gelöst, indem ich einen anderen Wrapper verwendet habe, der ein Tupel / eine Liste verwendet, wobei das erste Element die Funktion ist und die verbleibenden Elemente die Argumente für diese Funktion sind, die als eval_func_tuple (f_args) bezeichnet werden. Auf diese Weise kann die problematische Zeile durch return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)) ersetzt werden. Hier ist der vollständige Code:

Datei: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

Datei: main.py.

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

Wenn Sie main.py ausführen, erhalten Sie [11, 22, 33]. Sie können dies jederzeit verbessern. Beispielsweise kann eval_func_tuple auch so geändert werden, dass Schlüsselwortargumente verwendet werden.

In einer anderen Antwort, in einer anderen Antwort, kann die Funktion "Parmap" für den Fall von mehr Prozessen effizienter gemacht werden als die Anzahl der verfügbaren CPUs. Ich kopiere unten eine bearbeitete Version. Dies ist mein erster Beitrag und ich war mir nicht sicher, ob ich die ursprüngliche Antwort direkt bearbeiten sollte. Ich habe auch einige Variablen umbenannt.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         
Brandt
quelle
8

Ich nahm die Antwort von klaus se und aganders3 und erstellte ein dokumentiertes Modul, das besser lesbar ist und in einer Datei enthalten ist. Sie können es einfach zu Ihrem Projekt hinzufügen. Es hat sogar einen optionalen Fortschrittsbalken!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

EDIT : @ alexander-mcfarlane Vorschlag und eine Testfunktion hinzugefügt

xApple
quelle
Ein Problem mit Ihrem Fortschrittsbalken ... Der Balken misst nur, wie ineffizient die Arbeitslast auf die Prozessoren aufgeteilt wurde. Wenn die Arbeitslast perfekt aufgeteilt ist, werden alle Prozessoren join()gleichzeitig ausgeführt und Sie erhalten nur einen Blitz von 100%abgeschlossen im tqdmDisplay. Das einzige Mal, wenn es nützlich ist, ist, wenn jeder Prozessor eine voreingenommene Arbeitslast hat
Alexander McFarlane
1
Bewegen Sie sich tqdm(), um die Linie zu wickeln: result = [q_out.get() for _ in tqdm(sent)]und es funktioniert viel besser - große Anstrengung, obwohl wirklich zu schätzen, so +1
Alexander McFarlane
Vielen Dank für diesen Rat, ich werde es versuchen und dann die Antwort aktualisieren!
xApple
Die Antwort wird aktualisiert und der Fortschrittsbalken funktioniert viel besser!
xApple
8

Ich weiß, dass dies vor über 6 Jahren gefragt wurde, wollte aber nur meine Lösung hinzufügen, da einige der obigen Vorschläge schrecklich kompliziert erscheinen, aber meine Lösung war eigentlich sehr einfach.

Ich musste lediglich den Aufruf pool.map () in eine Hilfsfunktion einbinden. Übergeben des Klassenobjekts zusammen mit Argumenten für die Methode als Tupel, das ein bisschen so aussah.

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
Nachteule
quelle
7

In Klassen definierte Funktionen (auch innerhalb von Funktionen innerhalb von Klassen) sind nicht wirklich hilfreich. Dies funktioniert jedoch:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
Robert
quelle
15
danke, aber ich finde es etwas schmutzig, die Funktion außerhalb der Klasse zu definieren. Die Klasse sollte alles bündeln, was sie benötigt, um eine bestimmte Aufgabe zu erfüllen.
Mermoz
3
@Memoz: "Die Klasse sollte alles bündeln, was sie braucht" Wirklich? Ich kann nicht viele Beispiele dafür finden. Die meisten Klassen hängen von anderen Klassen oder Funktionen ab. Warum eine Klassenabhängigkeit "schmutzig" nennen? Was ist falsch an einer Abhängigkeit?
S.Lott
Nun, die Funktion sollte vorhandene Klassendaten nicht ändern, da sie die Version im anderen Prozess ändern würde. Es könnte sich also um eine statische Methode handeln. Sie können eine statische Methode auswählen : stackoverflow.com/questions/1914261/… Oder für etwas so Triviales können Sie ein Lambda verwenden.
Robert
6

Ich weiß, dass diese Frage vor 8 Jahren und 10 Monaten gestellt wurde, aber ich möchte Ihnen meine Lösung vorstellen:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Sie müssen nur Ihre Klasse in eine statische Methode umwandeln. Es ist aber auch mit einer Klassenmethode möglich:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Getestet in Python 3.7.3

TornaxO7
quelle
3

Ich habe die Methode von klaus se geändert, weil sie bei kleinen Listen hängen blieb, wenn die Anzahl der Elemente ~ 1000 oder mehr betrug. Anstatt die Jobs einzeln mit der NoneStoppbedingung zu verschieben, lade ich die Eingabewarteschlange auf einmal hoch und lasse die Prozesse nur daran arbeiten, bis sie leer sind.

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

Bearbeiten: Leider tritt jetzt auf meinem System dieser Fehler auf: Das maximale Limit für die Multiprocessing Queue beträgt 32767 , hoffentlich helfen die Problemumgehungen dort.

aganders3
quelle
1

Sie können Ihren Code ohne Probleme ausführen, wenn Sie das PoolObjekt aus der Liste der Objekte in der Klasse manuell ignorieren, da es nicht in der pickleLage ist, wie der Fehler besagt. Sie können dies mit der __getstate__Funktion (siehe auch hier ) wie folgt tun . Das PoolObjekt wird versuchen , die zu finden __getstate__und __setstate__Funktionen und sie ausführen , wenn er sie findet , wenn Sie laufen map, map_asyncetc:

class calculate(object):
    def __init__(self):
        self.p = Pool()
    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['p']
        return self_dict
    def __setstate__(self, state):
        self.__dict__.update(state)

    def f(self, x):
        return x*x
    def run(self):
        return self.p.map(self.f, [1,2,3])

Dann mach:

cl = calculate()
cl.run()

gibt Ihnen die Ausgabe:

[1, 4, 9]

Ich habe den obigen Code in Python 3.x getestet und es funktioniert.

Amir
quelle
0

Ich bin mir nicht sicher, ob dieser Ansatz gewählt wurde, aber eine Arbeit, die ich verwende, ist:

from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

Die Ausgabe sollte sein:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
CpILL
quelle
0
class Calculate(object):
  # Your instance method to be executed
  def f(self, x, y):
    return x*y

if __name__ == '__main__':
  inp_list = [1,2,3]
  y = 2
  cal_obj = Calculate()
  pool = Pool(2)
  results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

Es besteht die Möglichkeit, dass Sie diese Funktion für jede unterschiedliche Instanz der Klasse anwenden möchten. Dann ist hier auch die Lösung dafür

class Calculate(object):
  # Your instance method to be executed
  def __init__(self, x):
    self.x = x

  def f(self, y):
    return self.x*y

if __name__ == '__main__':
  inp_list = [Calculate(i) for i in range(3)]
  y = 2
  pool = Pool(2)
  results = pool.map(lambda x: x.f(y), inp_list)
ShikharDua
quelle
0

Hier ist meine Lösung, die ich für etwas weniger hackisch halte als die meisten anderen hier. Es ist ähnlich wie bei Nightowl.

someclasses = [MyClass(), MyClass(), MyClass()]

def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()

othermethod = partial(method_caller, some_method='othermethod')

with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)
Erlend Aune
quelle
0

Von http://www.rueckstiess.net/research/snippets/show/ca1d7d90 und http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

Wir können eine externe Funktion erstellen und sie mit dem Klassen-Selbstobjekt festlegen:

from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)

class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs= -1, backend="threading")\
            (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
        print(results)

ODER ohne Joblib:

from multiprocessing import Pool
import time

def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)

class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))

if __name__ == '__main__':
    c = C()
    c.run()
Bob Baxley
quelle
0

Dies ist vielleicht keine sehr gute Lösung, aber in meinem Fall löse ich sie so.

from multiprocessing import Pool

def foo1(data):
    self = data.get('slf')
    lst = data.get('lst')
    return sum(lst) + self.foo2()

class Foo(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def foo2(self):
        return self.a**self.b   

    def foo(self):
        p = Pool(5)
        lst = [1, 2, 3]
        result = p.map(foo1, (dict(slf=self, lst=lst),))
        return result

if __name__ == '__main__':
    print(Foo(2, 4).foo())

Ich musste zu selfmeiner Funktion übergehen , da ich über diese Funktion auf Attribute und Funktionen meiner Klasse zugreifen muss. Das funktioniert bei mir. Korrekturen und Vorschläge sind immer willkommen.

Muhammad Hassan
quelle
0

Hier ist ein Boilerplate, das ich für die Verwendung von Multiprocessing Pool in Python3 geschrieben habe. Insbesondere Python3.7.7 wurde zum Ausführen der Tests verwendet. Ich habe meine schnellsten Läufe mit imap_unordered. Schließen Sie einfach Ihr Szenario an und probieren Sie es aus. Sie können verwenden timeitoder nur time.time()um herauszufinden, welche für Sie am besten funktioniert.

import multiprocessing
import time

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap'  # 'imap_unordered' or 'starmap' or 'apply_async'

def process_chunk(a_chunk):
    print(f"processig mp chunk {a_chunk}")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

s = time.time()
if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
    result_sum = sum(result_sum)
print(f"result_sum is {result_sum}, took {time.time() - s}s")

Im obigen Szenario imap_unorderedscheint es tatsächlich das schlechteste für mich zu sein. Probieren Sie Ihren Fall aus und vergleichen Sie ihn mit der Maschine, auf der Sie ihn ausführen möchten. Informieren Sie sich auch über Prozesspools . Prost!

radtek
quelle