Parallelisieren einer for-Schleife in Python

35

Gibt es in Python Tools, die Matlabs Parfor ähneln? Ich habe diesen Thread gefunden , aber er ist vier Jahre alt. Ich dachte, vielleicht hat hier jemand neuere Erfahrungen.

Hier ist ein Beispiel für die Art von Dingen, die ich parallelisieren möchte:

X = np.random.normal(size=(10, 3))
F = np.zeros((10, ))
for i in range(10):
    F[i] = my_function(X[i,:])

wo my_functionnimmt eine ndarrayvon Größe (1,3)und gibt einen Skalar.

Zumindest möchte ich mehrere Kerne gleichzeitig verwenden - wie parfor. Mit anderen Worten, nehmen Sie ein gemeinsames Speichersystem mit 8 bis 16 Kernen an.

Paul G. Constantine
quelle
Viele Ergebnisse auf Google. Diese scheinen ziemlich einfach zu sein: blog.dominodatalab.com/simple-parallelization quora.com/What-is-the-Python-equivalent-of-MATLABs-parfor
Doug Lipinski
Vielen Dank, @ Doug-Lipinski. Diese Beispiele haben, wie andere, die ich beim Googeln gefunden habe, eine triviale Berechnung basierend auf dem Iterationsindex. Und sie behaupten immer, der Code sei "unglaublich einfach". Mein Beispiel definiert die Arrays (weist den Speicher zu) außerhalb der for-Schleife. Mir geht es gut, wenn ich es anders mache. So mache ich das in Matlab. Der knifflige Teil, der diese Beispiele zu umgehen scheint, besteht darin, einen Teil eines gegebenen Arrays an die Funktion innerhalb der Schleife zu übergeben.
Paul G. Constantine

Antworten:

19

Joblib macht was Sie wollen. Das grundlegende Verwendungsmuster ist:

from joblib import Parallel, delayed

def myfun(arg):
     do_stuff
     return result

results = Parallel(n_jobs=-1, verbose=verbosity_level, backend="threading")(
             map(delayed(myfun), arg_instances))

Dabei arg_instancesist eine Liste von Werten, für die myfunparallel berechnet wird. Die Haupteinschränkung ist, dass myfunes sich um eine Toplevel-Funktion handeln muss. Der backendParameter kann entweder "threading"oder sein "multiprocessing".

Sie können der parallelisierten Funktion weitere allgemeine Parameter übergeben. Der Body von myfunkann auch auf initialisierte globale Variablen verweisen, die den untergeordneten Variablen zur Verfügung stehen.

Argumente und Ergebnisse können mit dem Threading-Backend so ziemlich alles sein, aber die Ergebnisse müssen mit dem Multiprocessing-Backend serialisierbar sein.


Dask bietet auch ähnliche Funktionen. Es ist möglicherweise vorzuziehen, wenn Sie mit Kerndaten arbeiten oder versuchen, komplexere Berechnungen zu parallelisieren.

Daniel Mahler
quelle
Ich sehe keinen Mehrwert, um den Akku einschließlich Multiprozessor zu nutzen. Ich würde wetten, dass Joblib es unter der Haube benutzt.
Xavier Combelle
1
Es muss erwähnt werden, dass Joblib keine Zauberei ist, das threadingBackend unter dem GIL-Engpass leidet und das multiprocessingBackend aufgrund der Serialisierung aller Parameter und Rückgabewerte einen hohen Overhead verursacht. In dieser Antwort finden Sie Informationen zur Parallelverarbeitung in Python.
Jakub Klinkovský
Ich kann keine Kombination aus Funktionskomplexität und Anzahl der Iterationen finden, für die die Joblib schneller wäre als eine for-Schleife. Für mich hat es die gleiche Geschwindigkeit, wenn n_jobs = 1, und ist in allen anderen Fällen viel langsamer
Aleksejs Fomins
@AleksejsFomins Threadbasierte Parallelität hilft nicht für Code, der die GIL nicht freigibt, aber eine signifikante Anzahl, insbesondere Datenwissenschaft oder numerische Bibliotheken. Ansonsten brauchen Sie Mutiprocessing, Jobli unterstützt beides. Das Multiprocessing-Modul verfügt jetzt auch über eine Parallele map, die Sie direkt verwenden können. Auch wenn Sie mkl compiled numpy verwenden, werden vektorisierte Operationen automatisch parallelisiert, ohne dass Sie etwas unternehmen. Die Nummer in Ananconda ist standardmäßig mkl-aktiviert. Es gibt jedoch keine universelle Lösung. Joblib ist sehr unkompliziert und es gab im Jahr 2015 weniger Probleme.
Daniel Mahler
Danke für deinen Rat. Ich erinnere mich, dass ich zuvor Multiprocessing ausprobiert und sogar ein paar Posts geschrieben habe, weil es nicht wie erwartet skaliert hat. Vielleicht sollte ich es mir noch einmal ansehen
Aleksejs Fomins
9

Was Sie suchen, ist Numba , das eine for-Schleife automatisch parallelisieren kann. Aus ihrer Dokumentation

from numba import jit, prange

@jit
def parallel_sum(A):
    sum = 0.0
    for i in prange(A.shape[0]):
        sum += A[i]

    return sum
LKlevin
quelle
8

Ohne etwas Besonderes anzunehmen my_function Auswahl , multiprocessing.Pool().map()ist eine gute Vermutung für die Parallelisierung solcher einfachen Schleifen. joblib, dask, mpiBerechnungen oder numbain anderen Antworten vorgeschlagen wie nicht sieht keinen Vorteil für solche Anwendungsfälle und fügen nutzlos Abhängigkeiten zu bringen (summieren sie sind übertrieben). Es ist unwahrscheinlich, dass die in einer anderen Antwort vorgeschlagene Verwendung von Threading eine gute Lösung darstellt, da Sie mit der GIL-Interaktion Ihres Codes vertraut sein müssen oder Ihr Code hauptsächlich Eingabe / Ausgabe ausführen sollte.

Das numbakönnte eine gute Idee sein, um sequentiellen reinen Python-Code zu beschleunigen, aber ich glaube, dass dies außerhalb des Rahmens der Frage liegt.

import multiprocessing
import numpy as np

if __name__ == "__main__":
   #the previous line is necessary under windows to not execute 
   # main module on each child under windows

   X = np.random.normal(size=(10, 3))
   F = np.zeros((10, ))

   pool = multiprocessing.Pool(processes=16)
   # if number of processes is not specified, it uses the number of core
   F[:] = pool.map(my_function, (X[i,:] for i in range(10)) )

Es gibt jedoch einige Einschränkungen (die jedoch die meisten Anwendungen nicht beeinträchtigen sollten):

  • Unter Windows gibt es keine Gabelunterstützung. Daher wird beim Start eines jeden Kindes ein Interpreter mit Hauptmodul gestartet, der möglicherweise einen Overhead verursacht if __name__ == "__main__"
  • Die Argumente und die Ergebnisse von my_function sind zu kurz und zu kurz gefasst. Möglicherweise ist der Aufwand zu hoch. Reduzieren Sie diesen Aufwand in dieser Antwort https://stackoverflow.com/a/37072511/128629 . Es macht auch nicht auswählbare Objekte unbrauchbar
  • my_functionsollten nicht von gemeinsamen Zuständen abhängen, wie z. B. der Kommunikation mit globalen Variablen, da Zustände nicht zwischen Prozessen geteilt werden. reine Funktionen (Funktionen im mathematischen Sinne) sind Beispiele für Funktionen, die keine Zustände teilen
Xavier Combelle
quelle
6

Mein Eindruck von parfor ist, dass MATLAB Implementierungsdetails kapselt, sodass sowohl Shared-Memory-Parallelität (wie gewünscht) als auch Distributed-Memory-Parallelität (wenn Sie einen Distributed-Computing-Server von MATLAB ausführen ) verwendet werden können.

Wenn Sie Parallelität mit gemeinsam genutztem Speicher wünschen und eine Art Task-Parallel-Schleife ausführen, ist das Multiprocessing-Standardbibliothekspaket wahrscheinlich das, was Sie möchten, möglicherweise mit einem netten Front-End wie joblib , wie in Dougs Beitrag erwähnt. Die Standardbibliothek wird nicht verschwinden und wird gewartet, sodass das Risiko gering ist.

Es gibt auch andere Optionen, wie Parallel Python und IPythons parallele Funktionen . Ein kurzer Blick auf Parallel Python lässt mich glauben, dass es dem Geist von parfor näher kommt, indem die Bibliothek Details für den verteilten Fall kapselt, aber die Kosten dafür sind, dass Sie ihr Ökosystem übernehmen müssen. Die Kosten für die Verwendung von IPython sind ähnlich. Sie müssen die IPython-Methode anwenden, die es Ihnen wert sein kann oder nicht.

Wenn Ihnen der verteilte Speicher am Herzen liegt , empfehle ich mpi4py . Lisandro Dalcin leistet großartige Arbeit und mpi4py wird in den PETSc-Python-Wrappern verwendet. Wie Multiprocessing ist es eine Schnittstelle für Parallelität auf niedriger (er) Ebene als parfor, die jedoch wahrscheinlich eine Weile andauert.

Geoff Oxberry
quelle
Danke, @Geoff. Haben Sie Erfahrung mit diesen Bibliotheken? Vielleicht werde ich versuchen, mpi4py auf einem gemeinsam genutzten Speichergerät / Multicore-Prozessor zu verwenden.
Paul G. Constantine
@PaulGConstantine Ich habe mpi4py erfolgreich verwendet. Es ist ziemlich schmerzlos, wenn Sie mit MPI vertraut sind. Ich habe Multiprocessing nicht verwendet, aber ich habe es Kollegen empfohlen, die sagten, es funktioniere gut für sie. Ich habe auch IPython verwendet, aber nicht die Parallelitätsfunktionen, sodass ich nicht sagen kann, wie gut es funktioniert.
Geoff Oxberry
1
Aron hat ein nettes mpi4py-Tutorial, das er für den PyHPC-Kurs bei Supercomputing vorbereitet hat: github.com/pyHPC/pyhpc-tutorial
Matt Knepley
4

Bevor ich nach einem "Black-Box" -Tool suche, mit dem parallele "generische" Python-Funktionen ausgeführt werden können, sollte analysiert werden, wie my_function()die Parallelisierung von Hand erfolgen kann.

Vergleichen Sie zuerst die Ausführungszeit mit my_function(v)dem forOverhead von Python- Schleifen: [C] Python- forSchleifen sind ziemlich langsam, sodass der Zeitaufwand my_function()vernachlässigbar sein kann.

>>> timeit.timeit('pass', number=1000000)
0.01692986488342285
>>> timeit.timeit('for i in range(10): pass', number=1000000)
0.47521495819091797
>>> timeit.timeit('for i in xrange(10): pass', number=1000000)
0.42337894439697266

Zweite Prüfung, ob es eine einfache Vektorimplementierung gibt my_function(v), die keine Schleifen erfordert:F[:] = my_vector_function(X)

(Diese beiden ersten Punkte sind ziemlich trivial, verzeihen Sie mir, wenn ich sie hier nur der Vollständigkeit halber erwähnte.)

Der dritte und wichtigste Punkt ist, zumindest für CPython-Implementierungen, zu prüfen, ob die my_functionmeiste Zeit innerhalb oder außerhalb der globalen Interpretersperre oder GIL verbracht wird . Wenn Zeit außerhalb der GIL verbracht wird, sollte das threadingStandardbibliotheksmodul verwendet werden. ( Hier ein Beispiel). Übrigens könnte man sich das Schreiben my_function()als C-Erweiterung vorstellen, nur um die GIL zu veröffentlichen.

Wenn my_function()die GIL nicht freigegeben wird, kann das multiprocessingModul verwendet werden .

Referenzen: Python-Dokumentation zur gleichzeitigen Ausführung und Numpy / Scipy-Einführung zur parallelen Verarbeitung .

Stefano M
quelle
2

Du kannst es mit Julia versuchen. Es ist ziemlich nah an Python und hat viele MATLAB-Konstrukte. Die Übersetzung hier ist:

F = @parallel (vcat) for i in 1:10
    my_function(randn(3))
end

Dadurch werden auch die Zufallszahlen parallelisiert und die Ergebnisse am Ende während der Reduktion nur verkettet. Das nutzt Multiprocessing (Sie müssen also addprocs(N)vor der Verwendung Prozesse hinzufügen, und dies funktioniert auch auf mehreren Knoten auf einem HPC, wie in diesem Blogbeitrag gezeigt ).

Sie können pmapstattdessen auch Folgendes verwenden:

F = pmap((i)->my_function(randn(3)),1:10)

Wenn Sie Thread-Parallelität wünschen, können Sie verwenden Threads.@threads(stellen Sie jedoch sicher, dass Sie den Algorithmus thread-sicher machen). Legen Sie vor dem Öffnen von Julia die Umgebungsvariable JULIA_NUM_THREADS fest.

Ftmp = [Float64[] for i in Threads.nthreads()]
Threads.@threads for i in 1:10
    push!(Ftmp[Threads.threadid()],my_function(randn(3)))
end
F = vcat(Ftmp...)

Hier erstelle ich für jeden Thread ein separates Array, damit sie beim Hinzufügen zum Array nicht in Konflikt geraten und die Arrays anschließend einfach verketten. Threading ist ziemlich neu, daher gibt es im Moment nur die direkte Verwendung von Threads, aber ich bin sicher, dass Thread-Verkleinerungen und Maps genauso hinzugefügt werden, wie es für die Mehrfachverarbeitung war.

Chris Rackauckas
quelle
0

Ich empfehle, die parallelen und verzögerten Funktionen der JobLib-Bibliothek zu verwenden. Verwenden Sie das Modul "tempfile", um temporären gemeinsamen Speicher für große Arrays zu erstellen. Beispiele und Verwendung finden Sie unter https://pythonhosted.org/joblib/parallel.html

Ramkumar
quelle