Python Multiprocessing PicklingError: <Typ 'Funktion'> kann nicht eingelegt werden

243

Es tut mir leid, dass ich den Fehler nicht mit einem einfacheren Beispiel reproduzieren kann und mein Code zu kompliziert ist, um ihn zu veröffentlichen. Wenn ich das Programm in der IPython-Shell anstelle des regulären Python ausführe, funktionieren die Dinge gut.

Ich habe einige frühere Hinweise zu diesem Problem nachgeschlagen. Sie wurden alle durch die Verwendung von pool zum Aufrufen einer Funktion verursacht, die innerhalb einer Klassenfunktion definiert wurde. Dies ist aber bei mir nicht der Fall.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Ich würde mich über jede Hilfe freuen.

Update : Die Funktion, die ich auswähle, wird auf der obersten Ebene des Moduls definiert. Es wird jedoch eine Funktion aufgerufen, die eine verschachtelte Funktion enthält. dh f()ruft g()Anrufe auf, h()die eine verschachtelte Funktion haben i(), und ich rufe an pool.apply_async(f). f(), g(), h()Sind alle auf der obersten Ebene definiert. Ich habe ein einfacheres Beispiel mit diesem Muster versucht und es funktioniert jedoch.

Vendetta
quelle
3
Die Antwort auf oberster Ebene / akzeptiert ist gut, aber es könnte bedeuten, dass Sie Ihren Code neu strukturieren müssen, was schmerzhaft sein kann. Ich würde jedem empfehlen, der dieses Problem hat, auch die zusätzlichen Antworten unter Verwendung von dillund zu lesen pathos. Ich habe jedoch kein Glück mit einer der Lösungen bei der Arbeit mit vtkobjects :( Hat es jemand geschafft, Python-Code in paralleler Verarbeitung von vtkPolyData auszuführen?
Chris

Antworten:

305

Hier ist eine Liste, was eingelegt werden kann . Insbesondere können Funktionen nur ausgewählt werden, wenn sie auf der obersten Ebene eines Moduls definiert sind.

Dieser Code:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

ergibt einen Fehler, der fast identisch mit dem von Ihnen geposteten ist:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Das Problem ist, dass poolalle Methoden a verwenden mp.SimpleQueue, um Aufgaben an die Arbeitsprozesse zu übergeben. Alles, was durch das geht, mp.SimpleQueuemuss auswählbar sein und foo.workist nicht auswählbar, da es nicht auf der obersten Ebene des Moduls definiert ist.

Sie kann behoben werden, indem auf der obersten Ebene eine Funktion definiert wird, die Folgendes aufruft foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Beachten Sie, dass dies fooauswählbar ist, da Fooes auf der obersten Ebene definiert und foo.__dict__auswählbar ist.

unutbu
quelle
2
Danke für deine Antwort. Ich habe meine Frage aktualisiert. Ich glaube nicht, dass das die Ursache ist
Vendetta
7
Um einen PicklingError zu erhalten, muss etwas in die Warteschlange gestellt werden, das nicht ausgewählt werden kann. Es könnte die Funktion oder ihre Argumente sein. Um mehr über das Problem zu erfahren, schlage ich vor, eine Kopie Ihres Programms zu erstellen und es bei jedem erneuten Ausführen des Programms zu reduzieren, um es immer einfacher zu machen, um festzustellen, ob das Problem weiterhin besteht. Wenn es wirklich einfach wird, haben Sie das Problem entweder selbst entdeckt oder haben etwas, das Sie hier posten können.
Unutbu
3
Außerdem: Wenn Sie eine Funktion auf der obersten Ebene eines Moduls definieren, diese jedoch dekoriert ist, wird auf die Ausgabe des Dekorators verwiesen, und dieser Fehler wird trotzdem angezeigt.
Bobpoekert
5
Nur spät um 5 Jahre, aber ich bin gerade darauf gestoßen. Es stellt sich heraus, dass "oberste Ebene" wörtlicher als gewöhnlich verstanden werden muss: Es scheint mir, dass die Funktionsdefinition der Initialisierung des Pools (dh der pool = Pool()Zeile hier ) vorausgehen muss . Ich habe das nicht erwartet, und dies könnte der Grund sein, warum das Problem von OP weiterhin bestand.
Andras Deak
4
Insbesondere können Funktionen nur ausgewählt werden, wenn sie auf der obersten Ebene eines Moduls definiert sind. Es scheint, dass das Ergebnis der Anwendung functool.partialauf eine Funktion der obersten Ebene auch beizbar ist, selbst wenn es in einer anderen Funktion definiert ist.
user1071847
96

Ich würde pathos.multiprocesssingstattdessen verwenden multiprocessing. pathos.multiprocessingist ein Fork von , multiprocessingdass Anwendungen dill. dillkann fast alles in Python serialisieren, so dass Sie viel mehr parallel senden können. Der pathosFork kann auch direkt mit mehreren Argumentfunktionen arbeiten, wie Sie dies für Klassenmethoden benötigen.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Holen Sie sich pathos(und wenn Sie möchten dill) hier: https://github.com/uqfoundation

Mike McKerns
quelle
5
arbeitete ein Vergnügen. Für alle anderen habe ich beide Bibliotheken installiert durch: sudo pip install git+https://github.com/uqfoundation/dill.git@masterundsudo pip install git+https://github.com/uqfoundation/pathos.git@master
Alexander McFarlane
5
@AlexanderMcFarlane Ich würde Python-Pakete nicht mit installieren sudo(insbesondere von externen Quellen wie Github). Stattdessen würde ich empfehlen zu laufen:pip install --user git+...
Chris
Die Verwendung von just pip install pathosfunktioniert leider nicht und gibt folgende Meldung aus:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
11
pip install pathosfunktioniert jetzt und pathosist Python 3-kompatibel.
Mike McKerns
3
@DanielGoldfarb: multiprocessist ein Fork von multiprocessingwo dillersetzt hat picklean mehreren Stellen im Code ... aber im Wesentlichen, dass es. pathosBietet einige zusätzliche API-Ebenen multiprocessund zusätzliche Backends. Aber das ist der Kern davon.
Mike McKerns
29

Wie andere gesagt haben, multiprocessingkönnen Python-Objekte nur an Arbeitsprozesse übertragen werden, die eingelegt werden können. Wenn Sie Ihren Code nicht wie von unutbu beschrieben reorganisieren können, können Sie die dillerweiterten Beiz- / Entpickelfunktionen zum Übertragen von Daten (insbesondere Codedaten) verwenden, wie unten gezeigt.

Diese Lösung erfordert nur die Installation dillund keine anderen Bibliotheken wie folgt pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
Rocksportrocker
quelle
6
Ich bin der dillund pathosAutor ... und obwohl Sie Recht haben, ist es nicht so viel schöner und sauberer und flexibler, es auch zu verwenden, pathosals in meiner Antwort? Oder vielleicht bin ich ein bisschen voreingenommen ...
Mike McKerns
4
Ich war mir des Status pathoszum Zeitpunkt des Schreibens nicht bewusst und wollte eine Lösung vorstellen, die der Antwort sehr nahe kommt. Nachdem ich Ihre Lösung gesehen habe, stimme ich zu, dass dies der richtige Weg ist.
Rocksportrocker
Ich habe deine Lösung gelesen und Doh… I didn't even think of doing it like that. dachte: Das war irgendwie cool.
Mike McKerns
4
Vielen Dank für die Veröffentlichung, ich habe diesen Ansatz für Dilling / Undilling Argumente verwendet, die nicht eingelegt werden konnten: stackoverflow.com/questions/27883574/…
jazzblue
@rocksportrocker. Ich lese dieses Beispiel und kann nicht verstehen, warum es eine explizite forSchleife gibt. Normalerweise würde eine parallele Routine eine Liste erstellen und eine Liste ohne Schleife zurückgeben.
user1700890
20

Ich habe festgestellt, dass ich auch genau diese Fehlerausgabe auf einem perfekt funktionierenden Code generieren kann, indem ich versuche, den Profiler darauf zu verwenden.

Beachten Sie, dass dies unter Windows war (wo das Gabeln etwas weniger elegant ist).

Ich lief:

python -m profile -o output.pstats <script> 

Und festgestellt, dass durch Entfernen der Profilerstellung der Fehler behoben und durch Platzieren der Profilerstellung wiederhergestellt wurde. Hat mich auch verrückt gemacht, weil ich wusste, dass der Code funktioniert hat. Ich überprüfte, ob etwas pool.py aktualisiert hatte ... dann hatte ich ein sinkendes Gefühl und beseitigte die Profilerstellung und das wars.

Hier für die Archive posten, falls jemand anderes darauf stößt.

Hesekiel Kruglick
quelle
3
WOW, danke fürs Erwähnen! Es hat mich die letzte Stunde oder so verrückt gemacht; Ich habe alles bis zu einem sehr einfachen Beispiel ausprobiert - nichts schien zu funktionieren. Aber ich hatte auch den Profiler durch meine Batchdatei laufen lassen :(
Tim
1
Oh, ich kann dir nicht genug danken. Das klingt allerdings so albern, weil es so unerwartet ist. Ich denke, es sollte in den Dokumenten erwähnt werden. Alles, was ich hatte, war eine Import-PDF-Anweisung, und eine einfache Top-Level-Funktion mit nur einem passwar nicht 'pickle'able.
0xc0de
10

Wenn dieses Problem auftritt, besteht multiprocessingeine einfache Lösung darin, von Poolzu zu wechseln ThreadPool. Dies kann ohne Änderung des Codes außer dem Import erfolgen.

from multiprocessing.pool import ThreadPool as Pool

Dies funktioniert, weil ThreadPool den Speicher mit dem Hauptthread teilt, anstatt einen neuen Prozess zu erstellen. Dies bedeutet, dass kein Beizen erforderlich ist.

Der Nachteil dieser Methode ist, dass Python nicht die beste Sprache für den Umgang mit Threads ist. Es verwendet die sogenannte globale Interpreter-Sperre, um die Thread-Sicherheit zu gewährleisten, was hier einige Anwendungsfälle verlangsamen kann. Wenn Sie jedoch hauptsächlich mit anderen Systemen interagieren (HTTP-Befehle ausführen, mit einer Datenbank sprechen, in Dateisysteme schreiben), ist Ihr Code wahrscheinlich nicht an die CPU gebunden und wird keinen großen Einfluss haben. Tatsächlich habe ich beim Schreiben von HTTP / HTTPS-Benchmarks festgestellt, dass das hier verwendete Thread-Modell weniger Overhead und Verzögerungen aufweist, da der Overhead beim Erstellen neuer Prozesse viel höher ist als der Overhead beim Erstellen neuer Threads.

Wenn Sie also eine Menge Dinge im Python-Benutzerbereich verarbeiten, ist dies möglicherweise nicht die beste Methode.

tedivm
quelle
2
Aber dann verwenden Sie nur eine CPU (zumindest bei regulären Python-Versionen, die die GIL verwenden ), was den Zweck zunichte macht.
Endre Both
Das hängt wirklich davon ab, was der Zweck ist. Die globale Interpreter-Sperre bedeutet, dass jeweils nur eine Instanz Python-Code ausführen kann. Bei Aktionen, die stark blockieren (Dateisystemzugriff, Herunterladen großer oder mehrerer Dateien, Ausführen von externem Code), ist die GIL jedoch kein Problem. In einigen Fällen überwiegt der Overhead durch das Öffnen neuer Prozesse (anstelle von Threads) den GIL-Overhead.
Tedivm
Das stimmt, danke. Dennoch möchten Sie möglicherweise eine Einschränkung in die Antwort aufnehmen. Heutzutage, wenn die Verarbeitungsleistung hauptsächlich in Form von mehr als leistungsstärkeren CPU-Kernen steigt, ist der Wechsel von Multicore- zu Single-Core-Ausführung ein ziemlich bedeutender Nebeneffekt.
Beende beide
Guter Punkt - Ich habe die Antwort mit weiteren Details aktualisiert. Ich möchte jedoch darauf hinweisen, dass beim Umstieg auf Threaded Multiprocessing Python nicht nur auf einem einzelnen Kern funktioniert.
Tedivm
4

Diese Lösung erfordert nur die Installation von Dill und keine anderen Bibliotheken als Pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Es funktioniert auch für numpy Arrays.

Ilia w495 Nikitin
quelle
2
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Dieser Fehler tritt auch auf, wenn Sie eine integrierte Funktion im Modellobjekt haben, die an den asynchronen Job übergeben wurde.

So stellen Sie sicher , die überprüfen Modellobjekte , die vergangen sind , keine eingebauten Funktionen. (In unserem Fall haben wir die FieldTracker()Funktion von Django-Modell-Utils innerhalb des Modells verwendet, um ein bestimmtes Feld zu verfolgen). Hier ist der Link zum relevanten GitHub-Problem.

Penkey Suresh
quelle
0

Aufbauend auf der @ rocksportrocker-Lösung wäre es sinnvoll, beim Senden und Empfangen der Ergebnisse Dill zu verwenden.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
sollte sehen
quelle