Ich habe eine einzelne große Textdatei, in der ich jede Zeile verarbeiten (einige Operationen ausführen) und sie in einer Datenbank speichern möchte. Da ein einzelnes einfaches Programm zu lange dauert, möchte ich, dass es über mehrere Prozesse oder Threads ausgeführt wird. Jeder Thread / Prozess sollte die VERSCHIEDENEN Daten (verschiedene Zeilen) aus dieser einzelnen Datei lesen und einige Operationen an ihren Daten (Zeilen) ausführen und sie in die Datenbank stellen, damit am Ende alle Daten verarbeitet werden und meine Die Datenbank wird mit den Daten gesichert, die ich benötige.
Aber ich bin nicht in der Lage herauszufinden, wie ich das angehen soll.
python
multithreading
multiprocessing
Pranavk
quelle
quelle
Antworten:
Was Sie suchen, ist ein Produzenten- / Konsumentenmuster
Grundlegendes Threading-Beispiel
Hier ist ein grundlegendes Beispiel für die Verwendung des Threading-Moduls (anstelle von Multiprocessing).
import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit()
Sie würden das Dateiobjekt nicht mit den Threads teilen. Sie würden Arbeit für sie produzieren, indem Sie die Warteschlange mit Datenzeilen versorgen . Dann würde jeder Thread eine Zeile aufnehmen, verarbeiten und dann in die Warteschlange zurückgeben.
In das Multiprocessing-Modul sind einige erweiterte Funktionen integriert , mit denen Daten wie Listen und spezielle Arten von Warteschlangen gemeinsam genutzt werden können . Es gibt Kompromisse bei der Verwendung von Multiprocessing gegenüber Threads, und dies hängt davon ab, ob Ihre Arbeit an CPU oder E / A gebunden ist.
Grundlegendes Multiprocessing.Pool-Beispiel
Hier ist ein wirklich einfaches Beispiel für einen Multiprozessor-Pool
from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print results
Ein Pool ist ein Convenience-Objekt, das seine eigenen Prozesse verwaltet. Da eine geöffnete Datei über ihre Zeilen iterieren kann, können Sie sie an die übergeben
pool.map()
, die sie durchläuft und Zeilen an die Worker-Funktion übermittelt. Map blockiert und gibt das gesamte Ergebnis zurück, wenn es fertig ist. Beachten Sie, dass dies ein stark vereinfachtes Beispiel ist und dass diepool.map()
gesamte Datei auf einmal in den Speicher eingelesen wird, bevor die Arbeit verteilt wird. Wenn Sie große Dateien erwarten, denken Sie daran. Es gibt fortgeschrittenere Möglichkeiten, ein Produzenten- / Konsumenten-Setup zu entwerfen.Manueller "Pool" mit Limit und Zeilensortierung
Dies ist ein manuelles Beispiel für die Pool.map . Anstatt jedoch eine gesamte Iterable auf einmal zu verbrauchen, können Sie eine Warteschlangengröße festlegen, sodass Sie sie nur Stück für Stück so schnell wie möglich füttern. Ich habe auch die Zeilennummern hinzugefügt, damit Sie sie später verfolgen und auf sie verweisen können, wenn Sie möchten.
from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results)
quelle
(None,) * num_workers
Erstellt ein Tupel von None-Werten, das der Größe der Anzahl der Worker entspricht. Dies sind die Sentinel-Werte, die jedem Thread sagen, dass er beendet werden soll, da keine Arbeit mehr vorhanden ist. Mit dieseritertools.chain
Funktion können Sie mehrere Sequenzen zu einer virtuellen Sequenz zusammenfügen, ohne etwas kopieren zu müssen. Wir erhalten also, dass zuerst die Zeilen in der Datei und dann die Werte None durchlaufen werden.Hier ist ein wirklich dummes Beispiel, das ich mir ausgedacht habe:
import os.path import multiprocessing def newlinebefore(f,n): f.seek(n) c=f.read(1) while c!='\n' and n > 0: n-=1 f.seek(n) c=f.read(1) f.seek(n) return n filename='gpdata.dat' #your filename goes here. fsize=os.path.getsize(filename) #size of file (in bytes) #break the file into 20 chunks for processing. nchunks=20 initial_chunks=range(1,fsize,fsize/nchunks) #You could also do something like: #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. with open(filename,'r') as f: start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) end_byte=[i-1 for i in start_byte] [1:] + [None] def process_piece(filename,start,end): with open(filename,'r') as f: f.seek(start+1) if(end is None): text=f.read() else: nbytes=end-start+1 text=f.read(nbytes) # process text here. createing some object to be returned # You could wrap text into a StringIO object if you want to be able to # read from it the way you would a file. returnobj=text return returnobj def wrapper(args): return process_piece(*args) filename_repeated=[filename]*len(start_byte) args=zip(filename_repeated,start_byte,end_byte) pool=multiprocessing.Pool(4) result=pool.map(wrapper,args) #Now take your results and write them to the database. print "".join(result) #I just print it to make sure I get my file back ...
Der schwierige Teil hier ist, sicherzustellen, dass wir die Datei in Zeilenumbrüche aufteilen, damit Sie keine Zeilen verpassen (oder nur Teilzeilen lesen). Dann liest jeder Prozess seinen Teil der Datei und gibt ein Objekt zurück, das vom Hauptthread in die Datenbank gestellt werden kann. Natürlich müssen Sie diesen Teil möglicherweise sogar in Blöcken ausführen, damit Sie nicht alle Informationen auf einmal im Speicher behalten müssen. (Dies ist ganz einfach zu erreichen - teilen Sie einfach die "Args" -Liste in X Chunks und rufen Sie an
pool.map(wrapper,chunk)
- siehe hier )quelle
Teilen Sie die einzelne große Datei in mehrere kleinere Dateien auf und lassen Sie jede in separaten Threads verarbeiten.
quelle