Ich versuche, ein großes numerisches Problem zu lösen, das viele Unterprobleme beinhaltet, und ich verwende Pythons Multiprocessing-Modul (speziell Pool.map), um verschiedene unabhängige Unterprobleme auf verschiedene Kerne aufzuteilen. Jedes Unterproblem beinhaltet das Berechnen vieler Unter-Unterprobleme, und ich versuche, diese Ergebnisse effektiv zu speichern, indem ich sie in einer Datei speichere, wenn sie noch von keinem Prozess berechnet wurden. Andernfalls überspringen Sie die Berechnung und lesen Sie einfach die Ergebnisse aus der Datei.
Ich habe Parallelitätsprobleme mit den Dateien: Verschiedene Prozesse prüfen manchmal, ob ein Sub-Subproblem bereits berechnet wurde (indem sie nach der Datei suchen, in der die Ergebnisse gespeichert werden). Stellen Sie sicher, dass dies nicht der Fall ist. Führen Sie die Berechnung aus. Versuchen Sie dann, die Ergebnisse gleichzeitig in dieselbe Datei zu schreiben. Wie vermeide ich es, solche Kollisionen zu schreiben?
quelle
multiprocessing.Lock
zur Synchronisierung mehrerer Prozesse an.Antworten:
@ GP89 erwähnte eine gute Lösung. Verwenden Sie eine Warteschlange, um die Schreibaufgaben an einen dedizierten Prozess zu senden, der ausschließlich Schreibzugriff auf die Datei hat. Alle anderen Mitarbeiter haben nur Lesezugriff. Dadurch werden Kollisionen vermieden. Hier ist ein Beispiel, das apply_async verwendet, aber auch mit map funktioniert:
import multiprocessing as mp import time fn = 'c:/temp/temp.txt' def worker(arg, q): '''stupidly simulates long running process''' start = time.clock() s = 'this is a test' txt = s for i in range(200000): txt += s done = time.clock() - start with open(fn, 'rb') as f: size = len(f.read()) res = 'Process' + str(arg), str(size), done q.put(res) return res def listener(q): '''listens for messages on the q, writes to file. ''' with open(fn, 'w') as f: while 1: m = q.get() if m == 'kill': f.write('killed') break f.write(str(m) + '\n') f.flush() def main(): #must use Manager queue here, or will not work manager = mp.Manager() q = manager.Queue() pool = mp.Pool(mp.cpu_count() + 2) #put listener to work first watcher = pool.apply_async(listener, (q,)) #fire off workers jobs = [] for i in range(80): job = pool.apply_async(worker, (i, q)) jobs.append(job) # collect results from the workers through the pool result queue for job in jobs: job.get() #now we are done, kill the listener q.put('kill') pool.close() pool.join() if __name__ == "__main__": main()
quelle
pool.join()
unten ein hinzufügenpool.close()
. Andernfalls würden meine Mitarbeiter vor dem Hörer fertig sein und der Prozess würde einfach aufhören.mp.cpu_count() + 2
beim Einstellen der Anzahl der Prozesse?Es scheint mir, dass Sie verwenden müssen,
Manager
um Ihre Ergebnisse vorübergehend in einer Liste zu speichern und dann die Ergebnisse aus der Liste in eine Datei zu schreiben. Verwenden Sie außerdemstarmap
, um das zu verarbeitende Objekt und die verwaltete Liste zu übergeben. Der erste Schritt besteht darin, den Parameter zu erstellen, an den übergeben werden sollstarmap
, einschließlich der verwalteten Liste.from multiprocessing import Manager from multiprocessing import Pool import pandas as pd def worker(row, param): # do something here and then append it to row x = param**2 row.append(x) if __name__ == '__main__': pool_parameter = [] # list of objects to process with Manager() as mgr: row = mgr.list([]) # build list of parameters to send to starmap for param in pool_parameter: params.append([row,param]) with Pool() as p: p.starmap(worker, params)
Ab diesem Punkt müssen Sie entscheiden, wie Sie mit der Liste umgehen möchten. Wenn Sie Tonnen von RAM und einen riesigen Datensatz haben, können Sie mit Pandas verketten. Dann können Sie die Datei sehr einfach als CSV oder Pickle speichern.
df = pd.concat(row, ignore_index=True) df.to_pickle('data.pickle') df.to_csv('data.csv')
quelle