Gibt es eine einfache prozessbasierte parallele Map für Python?

73

Ich suche eine einfache prozessbasierte parallele Map für Python, also eine Funktion

parmap(function,[data])

Das würde die Funktion für jedes Element von [Daten] in einem anderen Prozess ausführen (nun, auf einem anderen Kern, aber AFAIK, die einzige Möglichkeit, Dinge auf verschiedenen Kernen in Python auszuführen, besteht darin, mehrere Interpreter zu starten) und eine Liste der Ergebnisse zurückgeben .

Gibt es so etwas? Ich hätte gerne etwas Einfaches , also wäre ein einfaches Modul schön. Wenn es so etwas nicht gibt, werde ich mich natürlich mit einer großen Bibliothek zufrieden geben: - /

static_rtti
quelle

Antworten:

127

Ich denke, was Sie brauchen, ist die Map-Methode in Multiprocessing.Pool () :

map (func, iterable [, chunksize])

A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.

This method chops the iterable into a number of chunks which it submits to the 
process pool as separate tasks. The (approximate) size of these chunks can be 
specified by setting chunksize to a positive integ

Wenn Sie diese Funktion beispielsweise zuordnen möchten:

def f(x):
    return x**2

Um (10) zu erreichen, können Sie dies mit der integrierten Funktion map () tun:

map(f, range(10))

oder Verwenden der Methodenzuordnung () eines multiprocessing.Pool () -Objekts:

import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))
Flávio Amieiro
quelle
6
Wenn Sie dies über ein langlebiges Programm aufrufen, rufen Sie unbedingt auf pool.close(idealerweise im finallyBlock eines Gehäuses try/finally). Andernfalls kann der Pool untergeordnete Prozesse möglicherweise nicht bereinigen und es kann zu Zombie-Prozessen kommen. Siehe bugs.python.org/issue19675
rogueleaderr
8
@rogueleaderr Wäre es nicht idiomatischer zu verwenden with?
CodeMonkey
1
Guter Punkt @CodeMonkey! Das erste Beispiel in den offiziellen Dokumenten verwendet, withdamit die Bereinigung gut funktioniert.
Rogueleaderr
10
PicklingError: Can't pickle <function <lambda> at 0x121572bf8>: attribute lookup <lambda> on __main__ failedwie kommt es, dass es nicht funktionieren kann lambda?
O.rka
2
Ich fand hier ein wirklich gutes Beispiel, das etwas komplizierter ist als das vorherige: blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply
Rafael Valero
5

Dies kann elegant mit Ray erfolgen , einem System, mit dem Sie Ihren Python-Code einfach parallelisieren und verteilen können.

Um Ihr Beispiel zu parallelisieren, müssen Sie Ihre Kartenfunktion mit dem @ray.remoteDekorator definieren und dann mit aufrufen .remote. Dadurch wird sichergestellt, dass jede Instanz der Remote-Funktion in einem anderen Prozess ausgeführt wird.

import time
import ray

ray.init()

# Define the function you want to apply map on, as remote function. 
@ray.remote
def f(x):
    # Do some work...
    time.sleep(1)
    return x*x

# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e., 
# an identifier of the result) rather than the result itself.  
def parmap(f, list):
    return [f.remote(x) for x in list]

# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))

# Get the results
results = ray.get(result_ids)
print(results)

Dies wird gedruckt:

[1, 4, 9, 16, 25]

und es endet ungefähr in len(list)/p(auf die nächste ganze pZahl aufgerundet), wo sich die Anzahl der Kerne auf Ihrer Maschine befindet. Unter der Annahme einer Maschine mit 2 Kernen wird unser Beispiel 5/2aufgerundet ausgeführt, dh in ungefähr einer 3Sekunde.

Die Verwendung von Ray gegenüber dem Multiprocessing- Modul bietet eine Reihe von Vorteilen . Insbesondere wird derselbe Code sowohl auf einem einzelnen Computer als auch auf einem Cluster von Computern ausgeführt. Weitere Vorteile von Ray finden Sie in diesem verwandten Beitrag .

Ion Stoica
quelle
4

Für diejenigen, die nach einem Python-Äquivalent von Rs mclapply () suchen, ist hier meine Implementierung. Es ist eine Verbesserung der folgenden zwei Beispiele:

Es kann auf Zuordnungsfunktionen mit einzelnen oder mehreren Argumenten angewendet werden.

import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()

def parallelize_dataframe(df, func, U=None, V=None):

    #blockSize = 5000
    num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
    blocks = np.array_split(df, num_partitions)

    pool = Pool(num_cores)
    if V is not None and U is not None:
        # apply func with multiple arguments to dataframe (i.e. involves multiple columns)
        df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
    else:
        # apply func with one argument to dataframe (i.e. involves single column)
        df = pd.concat(pool.map(func, blocks))

    pool.close()
    pool.join()

    return df

def square(x):
    return x**2

def test_func(data):
    print("Process working on: ", data.shape)
    data["squareV"] = data["testV"].apply(square)
    return data

def vecProd(row, U, V):
    return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )

def mProd_func(data, U, V):
    data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
    return data

def generate_simulated_data():

    N, D, nnz, K = [302, 184, 5000, 5]
    I = np.random.choice(N, size=nnz, replace=True)
    J = np.random.choice(D, size=nnz, replace=True)
    vals = np.random.sample(nnz)

    sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])

    # Generate parameters U and V which could be used to reconstruct the matrix Y
    U = np.random.sample(N*K).reshape([N,K])
    V = np.random.sample(D*K).reshape([D,K])

    return sparseY, U, V

def main():
    Y, U, V = generate_simulated_data()

    # find row, column indices and obvseved values for sparse matrix Y
    (testI, testJ, testV) = sparse.find(Y)

    colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
    dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}

    obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
    obsValDF["obsI"] = testI
    obsValDF["obsJ"] = testJ
    obsValDF["testV"] = testV
    obsValDF = obsValDF.astype(dtype=dtypes)

    print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))

    # calculate the square of testVals    
    obsValDF = parallelize_dataframe(obsValDF, test_func)

    # reconstruct prediction of testVals using parameters U and V
    obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)

    print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
    print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])

if __name__ == '__main__':
    main()
Guter Wille
quelle
3

Ich weiß, dass dies ein alter Beitrag ist, aber für den Fall, dass ich ein Tool geschrieben habe, um diesen super, super einfachen Parmapper zu erstellen (ich nenne ihn in meiner Verwendung Parmap, aber der Name wurde übernommen).

Es übernimmt einen Großteil der Einrichtung und Dekonstruktion von Prozessen und fügt Tonnen von Funktionen hinzu. In grober Reihenfolge der Wichtigkeit

  • Kann Lambda und andere nicht auswählbare Funktionen übernehmen
  • Kann Starmap und andere ähnliche Aufrufmethoden anwenden, um die direkte Verwendung zu vereinfachen.
  • Kann zwischen Threads und / oder Prozessen aufgeteilt werden
  • Enthält Funktionen wie Fortschrittsbalken

Es verursacht zwar geringe Kosten, ist aber für die meisten Anwendungen vernachlässigbar.

Ich hoffe, Sie finden es nützlich.

(Hinweis: Wie mapin Python 3+ wird ein iterables Element zurückgegeben. Wenn Sie also erwarten, dass alle Ergebnisse sofort durchlaufen werden, verwenden Sie list())

Justin Winokur
quelle