Python Multiprocessing pool.map für mehrere Argumente

536

Gibt es in der Python-Multiprozessor-Bibliothek eine Variante von pool.map, die mehrere Argumente unterstützt?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()
user642897
quelle
4
Zu meiner Überraschung konnte ich das weder machen partialnoch lambdatun. Ich denke, es hat mit der seltsamen Art und Weise zu tun, wie Funktionen an die Unterprozesse (via pickle) übergeben werden.
senderle
10
@senderle: Dies ist ein Fehler in Python 2.6, wurde aber ab 2.7 behoben
unutbu
1
Einfach ersetzen pool.map(harvester(text,case),case, 1) durch: pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
3
@Syrtis_Major, bitte bearbeiten Sie keine OP-Fragen, die die zuvor gegebenen Antworten effektiv verzerren. Das Hinzufügen returnzu harvester()@senderies Antwort wurde ungenau. Das hilft zukünftigen Lesern nicht.
Ricalsin
1
Ich würde sagen, eine einfache Lösung wäre, alle Argumente in ein Tupel zu packen und es in der ausführenden Funktion zu entpacken. Ich tat dies, als ich komplizierte mehrere Argumente an eine Funktion senden musste, die von einem Pool von Prozessen ausgeführt wurde.
HS Rathore

Antworten:

358

Die Antwort darauf ist version- und situationsabhängig. Die allgemeinste Antwort für neuere Versionen von Python (seit 3.3) wurde unten von JF Sebastian beschrieben . 1 Es wird die Pool.starmapMethode verwendet, die eine Folge von Argumenttupeln akzeptiert. Anschließend werden die Argumente aus jedem Tupel automatisch entpackt und an die angegebene Funktion übergeben:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Für frühere Versionen von Python müssen Sie eine Hilfsfunktion schreiben, um die Argumente explizit zu entpacken. Wenn Sie verwenden möchten with, müssen Sie auch einen Wrapper schreiben, um sich Poolin einen Kontextmanager zu verwandeln . (Danke an Myon für diesen Hinweis.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

In einfacheren Fällen können Sie mit einem festen zweiten Argument auch verwenden partial, jedoch nur in Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Ein Großteil davon wurde von seiner Antwort inspiriert, die wahrscheinlich stattdessen hätte akzeptiert werden sollen. Aber da dieser oben feststeckt, schien es am besten, ihn für zukünftige Leser zu verbessern.

senderle
quelle
Es scheint mir, dass RAW_DATASET in diesem Fall eine globale Variable sein sollte? Während ich möchte, dass der Partial_harvester den Wert von case bei jedem Aufruf von harvester () ändert. Wie erreicht man das?
xgdgsc
Das Wichtigste dabei ist, den =RAW_DATASETStandardwert zuzuweisen case. Andernfalls pool.mapwird über die mehreren Argumente verwirrt.
Emerson Xu
1
Ich bin verwirrt, was ist mit der textVariablen in Ihrem Beispiel passiert ? Warum wird RAW_DATASETscheinbar zweimal bestanden. Ich denke, Sie könnten einen Tippfehler haben?
Dave
with .. as .. Ich bin mir nicht sicher, warum ich es benutze AttributeError: __exit__, aber es funktioniert gut, wenn ich nur anrufe und pool = Pool();dann manuell schließe pool.close()(python2.7)
Myon
1
@ Myon, guter Fang. Es scheint, dass PoolObjekte erst in Python 3.3 zu Kontextmanagern werden. Ich habe eine einfache Wrapper-Funktion hinzugefügt, die einen PoolKontextmanager zurückgibt .
senderle
501

Gibt es eine Variante von pool.map, die mehrere Argumente unterstützt?

Python 3.3 enthält folgende pool.starmap()Methode :

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Für ältere Versionen:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

Ausgabe

1 1
2 1
3 1

Beachten Sie, wie itertools.izip()und itertools.repeat()hier verwendet werden.

Aufgrund des von @unutbu erwähnten Fehlers können Sie functools.partial()oder ähnliche Funktionen in Python 2.6 nicht verwenden , daher sollte die einfache Wrapper-Funktion func_star()explizit definiert werden. Siehe auch die vonuptimebox .

jfs
quelle
1
F.: Sie können das Argumenttupel in der folgenden Signatur entpacken func_star: def func_star((a, b)). Natürlich funktioniert dies nur für eine feste Anzahl von Argumenten, aber wenn dies der einzige Fall ist, den er hat, ist es besser lesbar.
Björn Pollex
1
@ Space_C0wb0y: Die f((a,b))Syntax ist in py3k veraltet und wird entfernt. Und das ist hier unnötig.
JFS
vielleicht pythonischer: func = lambda x: func(*x)anstatt eine Wrapper-Funktion zu definieren
Dylam
1
@ zthomas.nc In dieser Frage geht es darum, wie mehrere Argumente für die Mehrfachverarbeitung pool.map unterstützt werden. Wenn Sie wissen möchten, wie eine Methode anstelle einer Funktion in einem anderen Python-Prozess über Multiprocessing aufgerufen wird, stellen Sie eine separate Frage (wenn alles andere fehlschlägt, können Sie jederzeit eine globale Funktion erstellen, die den Methodenaufruf ähnlich wie func_star()oben
umschließt
1
Ich wünschte es gäbe starstarmap.
23онстантин Ван
141

Ich denke, das Folgende wird besser sein

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

Ausgabe

[3, 5, 7]
imotai
quelle
16
Einfachste Lösung. Es gibt eine kleine Optimierung; Entfernen Sie die Wrapper-Funktion und entpacken Sie sie argsdirekt add. Sie funktioniert für eine beliebige Anzahl von Argumenten:def add(args): (x,y) = args
Ahmed
1
Sie könnten auch eine lambdaFunktion verwenden, anstatt zu definierenmulti_run_wrapper(..)
Andre Holzner
2
hm ... in der Tat lambdafunktioniert die Verwendung von a nicht, weil pool.map(..)versucht wird, die gegebene Funktion zu übernehmen
Andre Holzner
Wie verwenden Sie dies, wenn Sie das Ergebnis von addin einer Liste speichern möchten ?
Vivek Subramanian
@Ahmed Ich mag es, wie es ist, weil IMHO der Methodenaufruf fehlschlagen sollte, wenn die Anzahl der Parameter nicht korrekt ist.
Michael Dorner
56

Verwenden von Python 3.3+ mitpool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Ergebnis:

1 --- 4
2 --- 5
3 --- 6

Sie können auch weitere Argumente zip (), wenn Sie möchten: zip(a,b,c,d,e)

Wenn Sie einen konstanten Wert als Argument übergeben möchten, müssen Sie ihn verwenden import itertoolsund dann zip(itertools.repeat(constant), a)zum Beispiel.

user136036
quelle
2
Dies ist eine nahezu exakte doppelte Antwort wie die von @JFSebastian im Jahr 2011 (mit mehr als 60 Stimmen).
Mike McKerns
29
Nein. Zunächst wurden viele unnötige Dinge entfernt und es wird klargestellt, dass es sich um Python 3.3+ handelt und für Anfänger gedacht ist, die nach einer einfachen und sauberen Antwort suchen. Als Anfänger selbst hat es einige Zeit gedauert, es so herauszufinden (ja, bei JFSebastians-Posts), und deshalb habe ich meinen Post geschrieben, um anderen Anfängern zu helfen, weil sein Post einfach "es gibt Starmap" sagte, es aber nicht erklärte - dies ist das, was mein Beitrag beabsichtigt. Es gibt also absolut keinen Grund, mich mit zwei Abstimmungen zu verprügeln.
user136036
Im Jahr 2011 gab es in Python 3.3 + kein "+" ... also offensichtlich.
Mike McKerns
27

Nachdem ich in der Antwort von JF Sebastian von itertools erfahren hatte, beschloss ich, noch einen Schritt weiter zu gehen und ein parmapPaket zu schreiben , das sich um Parallelisierung, Angebot mapund starmapFunktionen von Python-2.7 und Python-3.2 (und später auch) kümmert und eine beliebige Anzahl von Positionsargumenten annehmen kann .

Installation

pip install parmap

So parallelisieren Sie:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Ich habe Parmap auf PyPI und in ein Github-Repository hochgeladen .

Als Beispiel kann die Frage wie folgt beantwortet werden:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
Zeehio
quelle
20

# "Wie man mehrere Argumente nimmt".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
Dane Lee
quelle
2
Ordentlich und elegant.
Prav001
1
Ich verstehe nicht, warum ich den ganzen Weg hierher scrollen muss, um die beste Antwort zu finden.
Toti
11

Es gibt eine Verzweigung mit dem multiprocessingNamen Pathos ( Hinweis: Verwenden Sie die Version auf Github ), die nicht benötigt wird. starmapDie Map-Funktionen spiegeln die API für die Python-Map wider , sodass die Map mehrere Argumente annehmen kann. Mit pathoskönnen Sie im Allgemeinen auch Multiprocessing im Interpreter ausführen, anstatt im __main__Block stecken zu bleiben . Pathos wird nach einigen milden Aktualisierungen veröffentlicht - hauptsächlich Konvertierung auf Python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathosEs gibt verschiedene Möglichkeiten, wie Sie das genaue Verhalten ermitteln können starmap.

>>> def add(*x):
...   return sum(x)
... 
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>> 
Mike McKerns
quelle
Ich möchte darauf hinweisen, dass dies nicht die Struktur in der ursprünglichen Frage anspricht. [[1,2,3], [4,5,6]] würden mit Sternenkarte zu [pow (1,2,3), pow (4,5,6)] und nicht zu [pow (1,4) auspacken. pow (2,5), pow (3, 6)]. Wenn Sie keine gute Kontrolle über die Eingaben haben, die an Ihre Funktion übergeben werden, müssen Sie diese möglicherweise zuerst umstrukturieren.
Scott
@ Scott: Ah, das habe ich nicht bemerkt ... vor über 5 Jahren. Ich werde ein kleines Update machen. Vielen Dank.
Mike McKerns
8

Sie können die folgenden zwei Funktionen verwenden, um zu vermeiden, dass für jede neue Funktion ein Wrapper geschrieben wird:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Verwenden Sie die Funktion functionmit den Listen der Argumente arg_0, arg_1und arg_2wie folgt dar :

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
M. Toya
quelle
8

Eine bessere Lösung für Python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

aus[]:

[3, 5, 7]

xmduhan
quelle
7

Eine andere einfache Alternative besteht darin, Ihre Funktionsparameter in ein Tupel zu packen und dann die Parameter, die ebenfalls in Tupel übergeben werden sollen, zu verpacken. Dies ist möglicherweise nicht ideal, wenn Sie mit großen Datenmengen arbeiten. Ich glaube, es würde Kopien für jedes Tupel machen.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Gibt die Ausgabe in zufälliger Reihenfolge an:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Alex Klibisz
quelle
In der Tat, immer noch auf der Suche nach einem besseren Weg :(
Fábio Dias
6

Ein besserer Weg ist die Verwendung von Decorator, anstatt die Wrapper-Funktion von Hand zu schreiben . Insbesondere wenn Sie viele Funktionen zuordnen müssen, spart der Dekorateur Zeit, indem er das Schreiben eines Wrappers für jede Funktion vermeidet. Normalerweise ist eine dekorierte Funktion nicht auswählbar, wir können sie jedoch verwenden functools, um sie zu umgehen . Weitere Diskussionen finden Sie hier .

Hier das Beispiel

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Dann können Sie es mit komprimierten Argumenten zuordnen

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Natürlich können Sie Pool.starmapin Python 3 (> = 3.3) immer verwenden, wie in anderen Antworten erwähnt.

Syrtis Major
quelle
Die Ergebnisse sind nicht wie erwartet: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] Ich würde erwarten: [0,1,2,3,4,5,6,7,8, 9,1,2,3,4,5,6,7,8,9,10,2,3,4,5,6,7,8,9,10,11, ...
Tedo Vrbanec
@TedoVrbanec Die Ergebnisse sollten nur [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] sein. Wenn Sie die spätere möchten, können Sie itertools.productanstelle von verwenden zip.
Syrtis Major
4

Eine andere Möglichkeit besteht darin, eine Liste von Listen an eine Routine mit einem Argument zu übergeben:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Man kann dann eine Liste mit Argumenten mit seiner Lieblingsmethode erstellen.

Adobe
quelle
Dies ist ein einfacher Weg, aber Sie müssen Ihre ursprünglichen Funktionen ändern. Darüber hinaus rufen einige manchmal die Funktionen anderer auf, die möglicherweise nicht geändert werden können.
WeizhongTu
Ich werde sagen, dass dies bei Python Zen bleibt. Es sollte nur einen offensichtlichen Weg geben, dies zu tun. Wenn Sie zufällig der Autor der aufrufenden Funktion sind, sollten Sie diese Methode verwenden. In anderen Fällen können wir die Methode von imotai verwenden.
Nehem
Ich habe die Wahl, ein Tupel zu verwenden und es dann sofort als erstes in der ersten Zeile auszupacken.
Nehem
3

Hier ist eine andere Möglichkeit, IMHO einfacher und eleganter zu gestalten als alle anderen Antworten.

Dieses Programm verfügt über eine Funktion, die zwei Parameter verwendet, diese ausgibt und auch die Summe ausgibt:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

Ausgabe ist:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

Weitere Informationen finden Sie in den Python-Dokumenten:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

Überprüfen Sie insbesondere die starmapFunktion.

Ich verwende Python 3.6 und bin mir nicht sicher, ob dies mit älteren Python-Versionen funktioniert

Warum es in den Dokumenten kein sehr einfaches Beispiel wie dieses gibt, weiß ich nicht genau.

cdahms
quelle
2

Ab Python 3.4.4 können Sie multiprocessing.get_context () verwenden, um ein Kontextobjekt für die Verwendung mehrerer Startmethoden abzurufen:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Oder Sie ersetzen einfach

pool.map(harvester(text,case),case, 1)

durch:

pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
quelle
2

Hier gibt es viele Antworten, aber keine scheint Python 2/3-kompatiblen Code bereitzustellen, der für jede Version geeignet ist. Wenn Sie möchten, dass Ihr Code nur funktioniert , funktioniert dies für beide Python-Versionen:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

Danach können Sie die Mehrfachverarbeitung auf reguläre Python 3-Weise verwenden, wie Sie möchten. Zum Beispiel:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

funktioniert in Python 2 oder Python 3.

cgnorthcutt
quelle
1

In der offiziellen Dokumentation heißt es, dass es nur ein iterierbares Argument unterstützt. In solchen Fällen verwende ich gerne apply_async. In Ihrem Fall würde ich tun:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()
roj4s
quelle
1
text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()
Jaime
quelle
1

Dies ist ein Beispiel für die Routine, mit der ich mehrere Argumente an eine Funktion mit einem Argument übergebe, die in einergabelung pool.imap verwendet wird:

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()
A. Nodar
quelle
-3

Für Python2 können Sie diesen Trick verwenden

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
Hz Shang
quelle
warum b = 233. besiegt den Zweck der Frage
als ob