Ich suche eine einfache prozessbasierte parallele Map für Python, also eine Funktion
parmap(function,[data])
Das würde die Funktion für jedes Element von [Daten] in einem anderen Prozess ausführen (nun, auf einem anderen Kern, aber AFAIK, die einzige Möglichkeit, Dinge auf verschiedenen Kernen in Python auszuführen, besteht darin, mehrere Interpreter zu starten) und eine Liste der Ergebnisse zurückgeben .
Gibt es so etwas? Ich hätte gerne etwas Einfaches , also wäre ein einfaches Modul schön. Wenn es so etwas nicht gibt, werde ich mich natürlich mit einer großen Bibliothek zufrieden geben: - /
quelle
pool.close
(idealerweise imfinally
Block eines Gehäusestry/finally
). Andernfalls kann der Pool untergeordnete Prozesse möglicherweise nicht bereinigen und es kann zu Zombie-Prozessen kommen. Siehe bugs.python.org/issue19675with
?with
damit die Bereinigung gut funktioniert.PicklingError: Can't pickle <function <lambda> at 0x121572bf8>: attribute lookup <lambda> on __main__ failed
wie kommt es, dass es nicht funktionieren kannlambda
?Dies kann elegant mit Ray erfolgen , einem System, mit dem Sie Ihren Python-Code einfach parallelisieren und verteilen können.
Um Ihr Beispiel zu parallelisieren, müssen Sie Ihre Kartenfunktion mit dem
@ray.remote
Dekorator definieren und dann mit aufrufen.remote
. Dadurch wird sichergestellt, dass jede Instanz der Remote-Funktion in einem anderen Prozess ausgeführt wird.import time import ray ray.init() # Define the function you want to apply map on, as remote function. @ray.remote def f(x): # Do some work... time.sleep(1) return x*x # Define a helper parmap(f, list) function. # This function executes a copy of f() on each element in "list". # Each copy of f() runs in a different process. # Note f.remote(x) returns a future of its result (i.e., # an identifier of the result) rather than the result itself. def parmap(f, list): return [f.remote(x) for x in list] # Call parmap() on a list consisting of first 5 integers. result_ids = parmap(f, range(1, 6)) # Get the results results = ray.get(result_ids) print(results)
Dies wird gedruckt:
[1, 4, 9, 16, 25]
und es endet ungefähr in
len(list)/p
(auf die nächste ganzep
Zahl aufgerundet), wo sich die Anzahl der Kerne auf Ihrer Maschine befindet. Unter der Annahme einer Maschine mit 2 Kernen wird unser Beispiel5/2
aufgerundet ausgeführt, dh in ungefähr einer3
Sekunde.Die Verwendung von Ray gegenüber dem Multiprocessing- Modul bietet eine Reihe von Vorteilen . Insbesondere wird derselbe Code sowohl auf einem einzelnen Computer als auch auf einem Cluster von Computern ausgeführt. Weitere Vorteile von Ray finden Sie in diesem verwandten Beitrag .
quelle
Für diejenigen, die nach einem Python-Äquivalent von Rs mclapply () suchen, ist hier meine Implementierung. Es ist eine Verbesserung der folgenden zwei Beispiele:
Es kann auf Zuordnungsfunktionen mit einzelnen oder mehreren Argumenten angewendet werden.
import numpy as np, pandas as pd from scipy import sparse import functools, multiprocessing from multiprocessing import Pool num_cores = multiprocessing.cpu_count() def parallelize_dataframe(df, func, U=None, V=None): #blockSize = 5000 num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) ) blocks = np.array_split(df, num_partitions) pool = Pool(num_cores) if V is not None and U is not None: # apply func with multiple arguments to dataframe (i.e. involves multiple columns) df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks)) else: # apply func with one argument to dataframe (i.e. involves single column) df = pd.concat(pool.map(func, blocks)) pool.close() pool.join() return df def square(x): return x**2 def test_func(data): print("Process working on: ", data.shape) data["squareV"] = data["testV"].apply(square) return data def vecProd(row, U, V): return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) ) def mProd_func(data, U, V): data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 ) return data def generate_simulated_data(): N, D, nnz, K = [302, 184, 5000, 5] I = np.random.choice(N, size=nnz, replace=True) J = np.random.choice(D, size=nnz, replace=True) vals = np.random.sample(nnz) sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D]) # Generate parameters U and V which could be used to reconstruct the matrix Y U = np.random.sample(N*K).reshape([N,K]) V = np.random.sample(D*K).reshape([D,K]) return sparseY, U, V def main(): Y, U, V = generate_simulated_data() # find row, column indices and obvseved values for sparse matrix Y (testI, testJ, testV) = sparse.find(Y) colNames = ["obsI", "obsJ", "testV", "predV", "squareV"] dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float} obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames) obsValDF["obsI"] = testI obsValDF["obsJ"] = testJ obsValDF["testV"] = testV obsValDF = obsValDF.astype(dtype=dtypes) print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape)) # calculate the square of testVals obsValDF = parallelize_dataframe(obsValDF, test_func) # reconstruct prediction of testVals using parameters U and V obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V) print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape)) print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:]) if __name__ == '__main__': main()
quelle
Ich weiß, dass dies ein alter Beitrag ist, aber für den Fall, dass ich ein Tool geschrieben habe, um diesen super, super einfachen Parmapper zu erstellen (ich nenne ihn in meiner Verwendung Parmap, aber der Name wurde übernommen).
Es übernimmt einen Großteil der Einrichtung und Dekonstruktion von Prozessen und fügt Tonnen von Funktionen hinzu. In grober Reihenfolge der Wichtigkeit
Es verursacht zwar geringe Kosten, ist aber für die meisten Anwendungen vernachlässigbar.
Ich hoffe, Sie finden es nützlich.
(Hinweis: Wie
map
in Python 3+ wird ein iterables Element zurückgegeben. Wenn Sie also erwarten, dass alle Ergebnisse sofort durchlaufen werden, verwenden Sielist()
)quelle