<Typ 'instancemethod'> kann bei Verwendung von multiprocessing Pool.map () nicht ausgewählt werden.

218

Ich versuche zu verwenden , multiprocessing‚s - Pool.map()Funktion Arbeit gleichzeitig zu teilen aus. Wenn ich den folgenden Code verwende, funktioniert es einwandfrei:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Wenn ich es jedoch objektorientierter verwende, funktioniert es nicht. Die Fehlermeldung lautet:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Dies tritt auf, wenn Folgendes mein Hauptprogramm ist:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

und das folgende ist meine someClassKlasse:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Weiß jemand, was das Problem sein könnte oder wie man es einfach umgehen kann?

Ventolin
quelle
4
Wenn f eine verschachtelte Funktion ist, gibt es einen ähnlichen FehlerPicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
ggg

Antworten:

122

Das Problem ist, dass bei der Mehrfachverarbeitung Dinge zwischen Prozessen verarbeitet werden müssen und gebundene Methoden nicht ausgewählt werden können. Die Problemumgehung (ob Sie es für "einfach" halten oder nicht ;-) besteht darin, die Infrastruktur zu Ihrem Programm hinzuzufügen, damit solche Methoden ausgewählt werden können, und sie bei der Standardbibliotheksmethode copy_reg zu registrieren .

Zum Beispiel zeigt Steven Bethards Beitrag zu diesem Thread (gegen Ende des Threads) einen perfekt praktikablen Ansatz, um das Beizen / Entpicken von Methoden über zu ermöglichen copy_reg.

Alex Martelli
quelle
Das ist super, Danke. Scheinen in gewisser Weise Fortschritte gemacht zu haben, jedenfalls: den Code bei Verwendung pastebin.ca/1693348 ich jetzt eine Runtime: maximale Rekursionstiefe überschritten. Ich habe mich umgesehen und in einem Forumsbeitrag wurde empfohlen, die maximale Tiefe auf 1500 (von der Standardeinstellung 1000) zu erhöhen, aber ich hatte dort keine Freude. Um ehrlich zu sein, kann ich nicht sehen, welcher Teil (zumindest meines Codes) außer Kontrolle geraten könnte, es sei denn, der Code wird aus irgendeinem Grund aufgrund geringfügiger Änderungen, die ich vorgenommen habe, in einer Schleife be- und aufgehoben Stevens Code OO'd?
Ventolin
1
Ihre _pickle_methodRückgabe self._unpickle_method, eine gebundene Methode; Natürlich versucht pickle jetzt, DAS zu beizen - und es tut, was Sie gesagt haben: indem Sie _pickle_methodrekursiv anrufen . Wenn OOSie den Code auf diese Weise eingeben, haben Sie unweigerlich eine unendliche Rekursion eingeführt. Ich schlage vor, zu Stevens Code zurückzukehren (und nicht am Altar von OO anzubeten, wenn dies nicht angemessen ist: Viele Dinge in Python lassen sich am besten funktionaler erledigen, und dies ist eine).
Alex Martelli
15
Für die Super-Super-Faulen sehen Sie die einzige Antwort, die sich die Mühe machte, den tatsächlichen nicht verstümmelten Code zu veröffentlichen ...
Cerin
2
Eine andere Möglichkeit, das Beizproblem
rocksportrocker
74

Alle diese Lösungen sind hässlich, da die Mehrfachverarbeitung und das Beizen unterbrochen und eingeschränkt sind, es sei denn, Sie springen außerhalb der Standardbibliothek.

Wenn Sie eine Verzweigung von multiprocessingaufgerufen verwenden pathos.multiprocesssing, können Sie Klassen und Klassenmethoden direkt in den mapFunktionen der Mehrfachverarbeitung verwenden. Dies liegt daran dill, dass anstelle von pickleoder verwendet cPicklewird und dillfast alles in Python serialisiert werden kann.

pathos.multiprocessingbietet auch eine asynchrone Zuordnungsfunktion… und kann mapmit mehreren Argumenten funktionieren (zmap(math.pow, [1,2,3], [4,5,6]) )

Siehe: Was können Multiprocessing und Dill zusammen tun?

und: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Und um ganz klar zu sein, Sie können genau das tun, was Sie zuerst wollten, und Sie können es vom Dolmetscher aus tun, wenn Sie wollten.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Den Code erhalten Sie hier: https://github.com/uqfoundation/pathos

Mike McKerns
quelle
3
Können Sie diese Antwort bitte basierend auf pathos.pp aktualisieren, da pathos.multiprocessing nicht mehr existiert?
Saheel Godhane
10
Ich bin der pathosAutor. Die Version, auf die Sie sich beziehen, ist mehrere Jahre alt. Probieren Sie die Version auf github aus. Sie können sie verwenden pathos.ppoder github.com/uqfoundation/ppft .
Mike McKerns
1
oder github.com/uqfoundation/pathos . @SaheelGodhane: Eine neue Version ist längst überfällig, sollte aber in Kürze erscheinen.
Mike McKerns
3
Erst pip install setuptoolsdann pip install git+https://github.com/uqfoundation/pathos.git@master. Dadurch werden die entsprechenden Abhängigkeiten erhalten. Eine neue Version ist fast fertig… jetzt pathosläuft fast alles auch unter Windows und ist 3.xkompatibel.
Mike McKerns
1
@ Rika: Ja. Blockierende, iterative und asynchrone Karten sind verfügbar.
Mike McKerns
35

Sie können auch eine __call__()Methode in Ihrem definieren someClass(), die someClass.go()eine Instanz von aufruft und dann someClass()an den Pool übergibt. Dieses Objekt ist pickleable und es funktioniert gut (für mich) ...

dorvak
quelle
3
Dies ist viel einfacher als die von Alex Martelli vorgeschlagene Technik, aber Sie können nur eine Methode pro Klasse an Ihren Multiprozessor-Pool senden.
veraltet
6
Ein weiteres zu beachtendes Detail ist, dass nur das Objekt (Klasseninstanz) ausgewählt wird, nicht die Klasse selbst. Wenn Sie Klassenattribute von ihren Standardwerten geändert haben, werden diese Änderungen daher nicht auf die verschiedenen Prozesse übertragen. Die Problemumgehung besteht darin, sicherzustellen, dass alles, was Ihre Funktion benötigt, als Instanzattribut gespeichert wird.
veraltet
2
@dorvak könntest du bitte ein einfaches Beispiel mit zeigen __call__()? Ich denke, Ihre Antwort könnte die sauberere sein - ich habe Mühe, diesen Fehler zu verstehen, und zum ersten Mal komme ich, um einen Anruf zu sehen. Übrigens hilft auch diese Antwort zu klären, was Multiprocessing macht: [ stackoverflow.com/a/20789937/305883]
user305883
1
Können Sie ein Beispiel dafür geben?
Frmsaul
1
Es gibt eine neue Antwort (derzeit unter dieser) mit Beispielcode dafür.
Aaron
22

Einige Einschränkungen für Steven Bethards Lösung:

Wenn Sie Ihre Klassenmethode als Funktion registrieren, wird der Destruktor Ihrer Klasse überraschenderweise jedes Mal aufgerufen, wenn Ihre Methodenverarbeitung abgeschlossen ist. Wenn Sie also eine Instanz Ihrer Klasse haben, die n-mal ihre Methode aufruft, verschwinden Mitglieder möglicherweise zwischen zwei Läufen und Sie erhalten möglicherweise eine Nachricht malloc: *** error for object 0x...: pointer being freed was not allocated(z. B. geöffnete Mitgliedsdatei) oder pure virtual method called, terminate called without an active exception(was bedeutet, dass die Lebensdauer eines von mir verwendeten Mitgliedsobjekts kürzer war als was ich dachte). Ich habe dies beim Umgang mit n größer als die Poolgröße bekommen. Hier ist ein kurzes Beispiel:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Ausgabe:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

Die __call__Methode ist nicht so äquivalent, weil [Keine, ...] aus den Ergebnissen gelesen werden:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Keine der beiden Methoden ist also zufriedenstellend ...

Eric H.
quelle
7
Sie erhalten Nonezurück, weil Ihre Definition von __call__fehlt return: es sollte sein return self.process_obj(i).
Torek
1
@Eric Ich habe den gleichen Fehler erhalten und diese Lösung ausprobiert. Es wurde jedoch ein neuer Fehler als "cPickle.PicklingError: <Typ 'Funktion'> kann nicht ausgewählt werden. Attribut-Lookup eingebaut. Funktion fehlgeschlagen". Wissen Sie, was ein wahrscheinlicher Grund dafür sein kann?
Naman
15

Es gibt eine weitere Abkürzung, die Sie verwenden können, obwohl sie je nach den Instanzen in Ihrer Klasse ineffizient sein kann.

Wie jeder gesagt hat, besteht das Problem darin, dass der multiprocessingCode die Dinge auswählen muss, die er an die von ihm gestarteten Unterprozesse sendet, und der Pickler keine Instanzmethoden ausführt.

Anstatt jedoch die Instanzmethode zu senden, können Sie die eigentliche Klasseninstanz sowie den Namen der aufzurufenden Funktion an eine normale Funktion senden, die dann getattrzum Aufrufen der Instanzmethode verwendet wird, wodurch die gebundene Methode im PoolUnterprozess erstellt wird. Dies ähnelt dem Definieren einer __call__Methode, außer dass Sie mehr als eine Elementfunktion aufrufen können.

Den Code von @ EricH. aus seiner Antwort zu stehlen und ihn ein wenig zu kommentieren (ich habe ihn neu getippt, daher alle Namensänderungen und so, aus irgendeinem Grund schien dies einfacher als Ausschneiden und Einfügen :-)), um all die Magie zu veranschaulichen:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

Die Ausgabe zeigt, dass der Konstruktor tatsächlich einmal (in der ursprünglichen PID) und der Destruktor 9 Mal aufgerufen wird (einmal für jede erstellte Kopie = 2 oder 3 Mal pro Pool-Worker-Prozess nach Bedarf plus einmal im Original) Prozess). Dies ist wie in diesem Fall häufig in Ordnung, da der Standard-Pickler eine Kopie der gesamten Instanz erstellt und diese (halb-) heimlich neu auffüllt - in diesem Fall:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

- Deshalb zählt der Destruktor, obwohl er in den drei Worker-Prozessen achtmal aufgerufen wird, jedes Mal von 1 auf 0 herunter -, aber natürlich können Sie auf diese Weise immer noch in Schwierigkeiten geraten. Bei Bedarf können Sie Ihre eigenen bereitstellen __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

in diesem Fall zum Beispiel.

torek
quelle
1
Dies ist bei weitem die beste Antwort für dieses Problem, da es am einfachsten ist, auf das nicht beizbare Standardverhalten anzuwenden
Matt Taylor
12

Sie können auch eine __call__()Methode in Ihrem definieren someClass(), die someClass.go()eine Instanz von aufruft und dann someClass()an den Pool übergibt. Dieses Objekt ist pickleable und es funktioniert gut (für mich) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
Parisjohn
quelle
3

Die Lösung von Parisjohn oben funktioniert gut mit mir. Außerdem sieht der Code sauber und leicht verständlich aus. In meinem Fall gibt es einige Funktionen, die mit Pool aufgerufen werden können, daher habe ich den Code von Parisjohn etwas weiter unten geändert. Ich habe aufgerufen , um mehrere Funktionen aufrufen zu können, und die Funktionsnamen werden im Argument dict von übergeben go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()
Neobot
quelle
1

Eine möglicherweise triviale Lösung besteht darin, auf die Verwendung umzusteigen multiprocessing.dummy . Dies ist eine threadbasierte Implementierung der Multiprozessor-Schnittstelle, bei der dieses Problem in Python 2.7 nicht auftritt. Ich habe hier nicht viel Erfahrung, aber diese schnelle Importänderung ermöglichte es mir, apply_async für eine Klassenmethode aufzurufen.

Ein paar gute Ressourcen zu multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

David Parks
quelle
1

In diesem einfachen Fall, in dem someClass.fkeine Daten von der Klasse geerbt und nichts an die Klasse angehängt werden, besteht eine mögliche Lösung darin, sie zu trennen f, damit sie ausgewählt werden können:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))
mhh
quelle
1

Warum nicht eine separate Funktion verwenden?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)
0script0
quelle
1

Ich bin auf dasselbe Problem gestoßen, habe jedoch festgestellt, dass es einen JSON-Encoder gibt, mit dem diese Objekte zwischen Prozessen verschoben werden können.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Verwenden Sie dies, um Ihre Liste zu erstellen:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Verwenden Sie dann in der zugeordneten Funktion Folgendes, um das Objekt wiederherzustellen:

pfVmomiObj = json.loads(jsonSerialized)
George
quelle
0

Update: Ab dem Tag dieses Schreibens können NamedTuples ausgewählt werden (beginnend mit Python 2.7).

Das Problem hierbei ist, dass die untergeordneten Prozesse die Klasse des Objekts nicht importieren können - in diesem Fall die Klasse P -. Im Fall eines Projekts mit mehreren Modellen sollte die Klasse P überall dort importierbar sein, wo der untergeordnete Prozess verwendet wird

Eine schnelle Problemumgehung besteht darin, es importierbar zu machen, indem es auf Globals () angewendet wird.

globals()["P"] = P
rachid el kedmiri
quelle