Wie mache ich parallele Programmierung in Python?

141

Für C ++ können wir OpenMP verwenden, um parallel zu programmieren. OpenMP funktioniert jedoch nicht für Python. Was soll ich tun, wenn ich einige Teile meines Python-Programms parallel schalten möchte?

Die Struktur des Codes kann wie folgt betrachtet werden:

solve1(A)
solve2(B)

Wo solve1und solve2sind zwei unabhängige Funktionen. Wie kann diese Art von Code parallel statt nacheinander ausgeführt werden, um die Laufzeit zu verkürzen? Hoffe jemand kann mir helfen. Vielen Dank im Voraus. Der Code lautet:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

Wobei Setinner und Setouter zwei unabhängige Funktionen sind. Dort möchte ich parallel ...

ilovecp3
quelle
31
Schauen Sie sich Multiprocessing an . Hinweis: Pythons Threads sind nicht für CPU-gebundene Aufgaben geeignet, sondern nur für E / A-gebundene Aufgaben.
9000
4
@ 9000 +100 Internets für die Erwähnung der CPU- und E / A-abhängigen Aufgaben.
Hyperboreus
@ 9000 Eigentlich sind Threads meines Wissens überhaupt nicht für CPU-gebundene Aufgaben geeignet! Prozesse sind der richtige Weg, wenn Sie echte CPU-gebundene Aufgaben ausführen.
Omar Al-Ithawi
6
@OmarIthawi: Warum, Threads funktionieren gut, wenn Sie viele CPU-Kerne haben (wie jetzt üblich). Dann kann Ihr Prozess mehrere Threads ausführen, die alle diese Kerne parallel laden und implizit gemeinsame Daten zwischen ihnen austauschen (dh ohne einen expliziten gemeinsam genutzten Speicherbereich oder prozessübergreifendes Messaging).
9000
1
@ user2134774: Nun ja, mein zweiter Kommentar macht wenig Sinn. Wahrscheinlich können die einzigen C-Erweiterungen, die die GIL freigeben, davon profitieren. zB Teile von NumPy und Pandas machen das. In anderen Fällen ist es falsch (aber ich kann es jetzt nicht bearbeiten).
9000

Antworten:

162

Sie können das Multiprozessor- Modul verwenden. Für diesen Fall könnte ich einen Verarbeitungspool verwenden:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

Dadurch entstehen Prozesse, die allgemeine Arbeit für Sie leisten können. Da wir nicht bestanden haben processes, wird ein Prozess für jeden CPU-Kern auf Ihrem Computer erzeugt. Jeder CPU-Kern kann einen Prozess gleichzeitig ausführen.

Wenn Sie eine Liste einer einzelnen Funktion zuordnen möchten, gehen Sie folgendermaßen vor:

args = [A, B]
results = pool.map(solve1, args)

Verwenden Sie keine Threads, da die GIL alle Vorgänge für Python-Objekte sperrt.

Matt Williamson
quelle
1
Akzeptiert pool.mapauch Wörterbücher als Argumente? Oder nur einfache Listen?
Die Bndr
Nur Listen, denke ich. Sie können jedoch auch dict.items () übergeben, eine Liste der wichtigsten Wertetupel
Matt Williamson,
Leider endet dies mit einem "nicht zerhackbaren Typ:" Listen "-Fehler
The Bndr
zusätzlich zu meinem letzten Kommentar: `dict.items ()` work. Der Fehler tritt auf, weil ich die Behandlung der Variablen Insight der Prozessfunktion ändern musste. Leider war die Fehlermeldung nicht sehr hilfreich ... Also: Danke für Ihren Hinweis. :-)
The Bndr
2
Was ist hier Timeout?
Gamma
26

Dies kann sehr elegant mit Ray gemacht werden .

Um Ihr Beispiel zu parallelisieren, müssen Sie Ihre Funktionen mit dem @ray.remoteDekorator definieren und sie dann mit aufrufen .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

Dies hat eine Reihe von Vorteilen gegenüber dem Multiprozessor- Modul.

  1. Der gleiche Code wird sowohl auf einem Multicore-Computer als auch auf einem Cluster von Computern ausgeführt.
  2. Prozesse teilen Daten effizient durch gemeinsam genutzten Speicher und Serialisierung ohne Kopie .
  3. Fehlermeldungen werden gut verbreitet.
  4. Diese Funktionsaufrufe können zusammengesetzt werden, z.

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
  5. Klassen können nicht nur Funktionen remote aufrufen, sondern auch als Akteure remote instanziiert werden .

Beachten Sie, dass Ray ein Framework ist, an dessen Entwicklung ich mitgewirkt habe.

Robert Nishihara
quelle
Ich erhalte immer wieder die Fehlermeldung "Es konnte keine Version gefunden werden, die die Anforderungen erfüllt. ray (aus Versionen
:)
2
Normalerweise bedeutet diese Art von Fehler, dass Sie ein Upgrade durchführen müssen pip. Ich würde vorschlagen, es zu versuchen pip install --upgrade pip. Wenn Sie überhaupt etwas verwenden müssen, sudoist es möglich, dass die Version pip, die Sie für die Installation verwenden, raynicht dieselbe ist, die aktualisiert wird. Sie können mit überprüfen pip --version. Außerdem wird Windows derzeit nicht unterstützt. Wenn Sie also unter Windows arbeiten, ist dies wahrscheinlich das Problem.
Robert Nishihara
1
Nur eine Anmerkung, dies dient hauptsächlich dazu, gleichzeitige Jobs auf mehrere Computer zu verteilen.
Matt Williamson
2
Es ist tatsächlich sowohl für den Einzelmaschinenfall als auch für die Clustereinstellung optimiert. Viele der Entwurfsentscheidungen (z. B. gemeinsamer Speicher, Serialisierung ohne Kopie) zielen darauf ab, einzelne Maschinen gut zu unterstützen.
Robert Nishihara
2
Es wäre großartig, wenn die Dokumente mehr darauf hinweisen würden. Durch das Lesen der Dokumente hatte ich das Gefühl, dass es nicht wirklich für den Einzelmaschinenfall gedacht war.
Schlitten
4

Die Lösung besteht, wie andere gesagt haben, darin, mehrere Prozesse zu verwenden. Welcher Rahmen besser geeignet ist, hängt jedoch von vielen Faktoren ab. Zusätzlich zu den bereits erwähnten gibt es auch charm4py und mpi4py (ich bin der Entwickler von charm4py).

Es gibt eine effizientere Möglichkeit, das obige Beispiel zu implementieren, als die Worker-Pool-Abstraktion zu verwenden. Die Hauptschleife sendet Gin jeder der 1000 Iterationen immer wieder dieselben Parameter (einschließlich des vollständigen Diagramms ) an die Mitarbeiter. Da sich mindestens ein Mitarbeiter in einem anderen Prozess befindet, müssen die Argumente kopiert und an die anderen Prozesse gesendet werden. Dies kann je nach Größe der Objekte sehr kostspielig sein. Stattdessen ist es sinnvoll, dass Mitarbeiter den Status speichern und einfach die aktualisierten Informationen senden.

In charm4py kann dies beispielsweise folgendermaßen geschehen:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Beachten Sie, dass wir für dieses Beispiel wirklich nur einen Mitarbeiter benötigen. Die Hauptschleife könnte eine der Funktionen ausführen und den Worker die andere ausführen lassen. Aber mein Code hilft, ein paar Dinge zu veranschaulichen:

  1. Worker A wird in Prozess 0 ausgeführt (wie die Hauptschleife). Während result_a.get()das Warten auf das Ergebnis blockiert ist, führt Worker A die Berechnung im selben Prozess durch.
  2. Argumente werden automatisch unter Bezugnahme auf Worker A übergeben, da sie sich im selben Prozess befinden (es ist kein Kopieren erforderlich).
Juan Galvez
quelle
2

In einigen Fällen ist es möglich, Schleifen mit Numba automatisch zu parallelisieren , obwohl dies nur mit einer kleinen Teilmenge von Python funktioniert:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Leider scheint Numba nur mit Numpy-Arrays zu funktionieren, nicht jedoch mit anderen Python-Objekten. Theoretisch könnte es auch möglich sein , Python nach C ++ zu kompilieren und dann automatisch mit dem Intel C ++ - Compiler zu parallelisieren , obwohl ich dies noch nicht ausprobiert habe.

Anderson Green
quelle
2

Sie können die joblibBibliothek verwenden, um parallele Berechnungen und Mehrfachverarbeitungen durchzuführen.

from joblib import Parallel, delayed

Sie können einfach eine Funktion erstellen foo, die parallel ausgeführt werden soll, und basierend auf dem folgenden Code die parallele Verarbeitung implementieren:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

Wo num_coreskann aus der multiprocessingBibliothek wie folgt bezogen werden:

import multiprocessing

num_cores = multiprocessing.cpu_count()

Wenn Sie eine Funktion mit mehr als einem Eingabeargument haben und nur eines der Argumente durch eine Liste durchlaufen möchten, können Sie die partialFunktion aus der functoolsBibliothek wie folgt verwenden:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

Sie können eine vollständige Erklärung der Python und R Multiprocessing mit paar Beispiele finden sich hier .

Vahab Najari
quelle