Python: Wie kann ich Python-Funktionen parallel ausführen?

108

Ich recherchierte zuerst und konnte keine Antwort auf meine Frage finden. Ich versuche, mehrere Funktionen in Python parallel auszuführen.

Ich habe so etwas:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Ich möchte func1 und func2 aufrufen und sie gleichzeitig ausführen lassen. Die Funktionen interagieren nicht miteinander oder mit demselben Objekt. Im Moment muss ich warten, bis func1 fertig ist, bevor func2 startet. Wie mache ich so etwas wie das Folgende:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Ich möchte in der Lage sein, beide Verzeichnisse nahezu zeitgleich zu erstellen, da ich jede Minute zähle, wie viele Dateien erstellt werden. Wenn das Verzeichnis nicht vorhanden ist, wird mein Timing beeinträchtigt.

lmcadory
quelle
1
Vielleicht möchten Sie dies neu gestalten. Wenn Sie die Anzahl der Dateien / Ordner pro Minute zählen, erstellen Sie eine Race-Bedingung. Wie wäre es, wenn jede Funktion einen Zähler aktualisiert oder eine Sperrdatei verwendet, um sicherzustellen, dass der periodische Prozess die Anzahl nicht aktualisiert, bis beide Funktionen ausgeführt wurden?

Antworten:

162

Sie könnten threadingoder verwenden multiprocessing.

Aufgrund Besonderheiten der CPython , threadingist unwahrscheinlich wahre Parallelität zu erreichen. Aus diesem Grund multiprocessingist in der Regel eine bessere Wahl.

Hier ist ein vollständiges Beispiel:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

Die Mechanismen zum Starten / Verbinden von untergeordneten Prozessen können leicht in eine Funktion eingekapselt werden, die wie folgt aussieht runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
NPE
quelle
4
Ich habe Ihren Code verwendet, aber die Funktionen wurden immer noch nicht gleichzeitig gestartet.
lmcadory
4
@ Lamar McAdory: Bitte erklären Sie, was genau Sie unter "zur gleichen Zeit" verstehen, und geben Sie vielleicht ein konkretes Beispiel dafür, was Sie getan haben, was Sie erwartet hatten und was tatsächlich passiert ist.
NPE
4
@ Lamar: Sie können niemals eine Garantie für "genau die gleiche Zeit" haben und zu denken, dass Sie es können, ist einfach falsch. Abhängig davon, wie viele CPUs Sie haben, die Auslastung der Maschine und das Timing vieler Ereignisse auf dem Computer haben alle Einfluss auf die Zeit, zu der die Threads / Prozesse gestartet werden. Da die Prozesse direkt nach der Erstellung gestartet werden, muss der Aufwand für die Erstellung eines Prozesses auch in der angezeigten Zeitdifferenz berechnet werden.
Martin
@ Lamar McAdory: Es gibt keine Möglichkeit, eine perfekte Synchronität der Ausführung zweier Funktionen sicherzustellen. Vielleicht lohnt es sich, den Gesamtansatz neu zu bewerten, um festzustellen, ob es einen besseren Weg gibt, um das zu erreichen, was Sie versuchen.
NPE
1
Ist es möglich, eine Liste der Ergebnisse jeder Funktion zu erhalten? Angenommen, jede Funktion gibt einen anderen Wert zurück. Können die Werte an eine Liste angehängt werden, die später verwendet werden kann? Vielleicht das Ergebnis an eine globale Liste anhängen?
Pelos
18

Dies kann elegant mit Ray erfolgen , einem System, mit dem Sie Ihren Python-Code einfach parallelisieren und verteilen können.

Um Ihr Beispiel zu parallelisieren, müssen Sie Ihre Funktionen mit dem @ray.remoteDekorator definieren und sie dann mit aufrufen .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Wenn Sie dasselbe Argument an beide Funktionen übergeben und das Argument groß ist, können Sie dies effizienter verwenden ray.put(). Dadurch wird vermieden, dass das große Argument zweimal serialisiert und zwei Speicherkopien davon erstellt werden:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Wenn func1() und func2()Rückkehr Ergebnisse, müssen Sie den Code neu zu schreiben , wie folgt:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Die Verwendung von Ray gegenüber dem Multiprocessing- Modul bietet eine Reihe von Vorteilen . Insbesondere wird derselbe Code sowohl auf einem einzelnen Computer als auch auf einem Cluster von Computern ausgeführt. Weitere Vorteile von Ray finden Sie in diesem verwandten Beitrag .

Ion Stoica
quelle
17

Wenn Ihre Funktionen hauptsächlich E / A-Arbeit (und weniger CPU-Arbeit) ausführen und Sie Python 3.2+ haben, können Sie einen ThreadPoolExecutor verwenden :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Wenn Ihre Funktionen hauptsächlich CPU-Arbeit (und weniger E / A-Arbeit) ausführen und Sie Python 2.6+ haben, können Sie das Multiprozessor- Modul verwenden:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
David Foster
quelle
Das ist eine gute Antwort. Wie kann man anhand des Ergebnisses für die E / A-gebundenen Aufgaben mithilfe von concurrent.futures identifizieren, welche erledigt wurden? Grundsätzlich, anstelle von Lamba-Funktionen, wenn wir normale Funktionen haben, wie kann man das Ergebnis identifizieren, das der aufgerufenen Funktion zugeordnet ist?
Tragaknight
Trotzdem habe ich einen Weg gefunden - anstelle von run_cpu_tasks_in_parallel ([lambda: print ('CPU-Task 1 läuft!'), Lambda: print ('CPU-Task 2 läuft!'),]) Verwenden Sie diesen - results = run_io_tasks_in_parallel ([lambda: {'is_something1': func1 ()}, lambda: {'is_something2': func2 ()},])
Tragaknight
5

Wenn Sie ein Windows-Benutzer sind und Python 3 verwenden, hilft Ihnen dieser Beitrag bei der parallelen Programmierung in Python. Wenn Sie die Poolprogrammierung einer normalen Multiprozessor-Bibliothek ausführen, wird eine Fehlermeldung bezüglich der Hauptfunktion in Ihrem Programm angezeigt. Dies liegt daran, dass Windows keine fork () -Funktionalität hat. Der folgende Beitrag gibt eine Lösung für das erwähnte Problem.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Da ich Python 3 verwendet habe, habe ich das Programm ein wenig wie folgt geändert:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

Nach dieser Funktion wird auch der obige Problemcode ein wenig wie folgt geändert:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

Und ich bekam die Ausgabe als:

[1, 8, 27, 64, 125, 216]

Ich denke, dass dieser Beitrag für einige der Windows-Benutzer nützlich sein kann.

Arun Sooraj
quelle
4

Es gibt keine Möglichkeit zu garantieren, dass zwei Funktionen synchron miteinander ausgeführt werden. Dies scheint genau das zu sein, was Sie tun möchten.

Das Beste, was Sie tun können, ist, die Funktion in mehrere Schritte aufzuteilen und dann zu warten, bis beide an kritischen Synchronisationspunkten beendet sind, indem Sie Process.joindie Antwort von like @ aix verwenden.

Dies ist besser als time.sleep(10)weil Sie keine genauen Timings garantieren können. Wenn Sie explizit warten, sagen Sie, dass die Funktionen ausgeführt werden müssen, bevor Sie zum nächsten übergehen, anstatt davon auszugehen, dass sie innerhalb von 10 ms ausgeführt werden, was nicht garantiert ist, basierend auf dem, was sonst noch auf dem Computer vor sich geht.

Davy8
quelle
1

Scheint, als hätten Sie eine einzige Funktion, die Sie für zwei verschiedene Parameter aufrufen müssen. Dies kann elegant mit einer Kombination von concurrent.futuresund mapmit Python 3.2+ erfolgen

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Wenn Ihre Operation an E / A gebunden ist, können Sie Folgendes verwenden ThreadPoolExecutor:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Beachten Sie, wie maphier mapIhre Funktion zur Liste der Argumente verwendet wird.

Wenn Ihre Funktion an die CPU gebunden ist, können Sie sie verwenden ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Wenn Sie sich nicht sicher sind, können Sie einfach beide ausprobieren und sehen, welche Ihnen bessere Ergebnisse liefert.

Wenn Sie Ihre Ergebnisse ausdrucken möchten, können Sie dies einfach tun:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
BICube
quelle