Verarbeitung einer einzelnen Datei aus mehreren Prozessen

81

Ich habe eine einzelne große Textdatei, in der ich jede Zeile verarbeiten (einige Operationen ausführen) und sie in einer Datenbank speichern möchte. Da ein einzelnes einfaches Programm zu lange dauert, möchte ich, dass es über mehrere Prozesse oder Threads ausgeführt wird. Jeder Thread / Prozess sollte die VERSCHIEDENEN Daten (verschiedene Zeilen) aus dieser einzelnen Datei lesen und einige Operationen an ihren Daten (Zeilen) ausführen und sie in die Datenbank stellen, damit am Ende alle Daten verarbeitet werden und meine Die Datenbank wird mit den Daten gesichert, die ich benötige.

Aber ich bin nicht in der Lage herauszufinden, wie ich das angehen soll.

Pranavk
quelle
3
Gute Frage. Ich hatte auch diesen Zweifel. Obwohl ich mich für die Option entschieden habe, die Datei in kleinere Dateien
aufzuteilen

Antworten:

108

Was Sie suchen, ist ein Produzenten- / Konsumentenmuster

Grundlegendes Threading-Beispiel

Hier ist ein grundlegendes Beispiel für die Verwendung des Threading-Moduls (anstelle von Multiprocessing).

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

Sie würden das Dateiobjekt nicht mit den Threads teilen. Sie würden Arbeit für sie produzieren, indem Sie die Warteschlange mit Datenzeilen versorgen . Dann würde jeder Thread eine Zeile aufnehmen, verarbeiten und dann in die Warteschlange zurückgeben.

In das Multiprocessing-Modul sind einige erweiterte Funktionen integriert , mit denen Daten wie Listen und spezielle Arten von Warteschlangen gemeinsam genutzt werden können . Es gibt Kompromisse bei der Verwendung von Multiprocessing gegenüber Threads, und dies hängt davon ab, ob Ihre Arbeit an CPU oder E / A gebunden ist.

Grundlegendes Multiprocessing.Pool-Beispiel

Hier ist ein wirklich einfaches Beispiel für einen Multiprozessor-Pool

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

Ein Pool ist ein Convenience-Objekt, das seine eigenen Prozesse verwaltet. Da eine geöffnete Datei über ihre Zeilen iterieren kann, können Sie sie an die übergeben pool.map(), die sie durchläuft und Zeilen an die Worker-Funktion übermittelt. Map blockiert und gibt das gesamte Ergebnis zurück, wenn es fertig ist. Beachten Sie, dass dies ein stark vereinfachtes Beispiel ist und dass die pool.map()gesamte Datei auf einmal in den Speicher eingelesen wird, bevor die Arbeit verteilt wird. Wenn Sie große Dateien erwarten, denken Sie daran. Es gibt fortgeschrittenere Möglichkeiten, ein Produzenten- / Konsumenten-Setup zu entwerfen.

Manueller "Pool" mit Limit und Zeilensortierung

Dies ist ein manuelles Beispiel für die Pool.map . Anstatt jedoch eine gesamte Iterable auf einmal zu verbrauchen, können Sie eine Warteschlangengröße festlegen, sodass Sie sie nur Stück für Stück so schnell wie möglich füttern. Ich habe auch die Zeilennummern hinzugefügt, damit Sie sie später verfolgen und auf sie verweisen können, wenn Sie möchten.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)
jdi
quelle
1
Das ist gut, aber was ist, wenn die Verarbeitung E / A-gebunden ist? In diesem Fall kann Parallelität die Dinge verlangsamen, anstatt sie zu beschleunigen. Suchvorgänge innerhalb einer einzelnen Plattenspur sind viel schneller als Intertrack-Suchvorgänge, und paralleles Ausführen von E / A führt dazu, dass Intertrack-Suchvorgänge in einer ansonsten sequentiellen E / A-Last eingeführt werden. Um von parallelen E / A-Vorgängen zu profitieren, ist es manchmal sehr hilfreich, einen RAID-Spiegel zu verwenden.
user1277476
2
@ jwillis0720 - Sicher. (None,) * num_workersErstellt ein Tupel von None-Werten, das der Größe der Anzahl der Worker entspricht. Dies sind die Sentinel-Werte, die jedem Thread sagen, dass er beendet werden soll, da keine Arbeit mehr vorhanden ist. Mit dieser itertools.chainFunktion können Sie mehrere Sequenzen zu einer virtuellen Sequenz zusammenfügen, ohne etwas kopieren zu müssen. Wir erhalten also, dass zuerst die Zeilen in der Datei und dann die Werte None durchlaufen werden.
JDI
2
Das ist besser erklärt als mein Professor, sehr nett +1.
Lycuid
1
@ ℕʘʘḆḽḘ, ich habe meinen Text etwas bearbeitet, um klarer zu sein. Es wird nun erklärt, dass das mittlere Beispiel Ihre gesamten Dateidaten auf einmal in den Speicher schlürft. Dies kann ein Problem sein, wenn Ihre Datei größer ist als die derzeit verfügbare RAM-Menge. Dann zeige ich im 3. Beispiel, wie man Zeile für Zeile vorgeht, um nicht die gesamte Datei auf einmal zu verbrauchen.
JDI
1
@ ℕʘʘḆḽḘ Lesen Sie die Dokumente für pool.Map (). Es heißt, es werde das iterable in Stücke aufteilen und sie den Arbeitern vorlegen. So werden am Ende alle Zeilen in den Speicher verbraucht. Ja, das wiederholte Durchlaufen einer Zeile ist speichereffizient. Wenn Sie jedoch alle diese Zeilen im Speicher behalten, können Sie wieder die gesamte Datei lesen.
JDI
9

Hier ist ein wirklich dummes Beispiel, das ich mir ausgedacht habe:

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

Der schwierige Teil hier ist, sicherzustellen, dass wir die Datei in Zeilenumbrüche aufteilen, damit Sie keine Zeilen verpassen (oder nur Teilzeilen lesen). Dann liest jeder Prozess seinen Teil der Datei und gibt ein Objekt zurück, das vom Hauptthread in die Datenbank gestellt werden kann. Natürlich müssen Sie diesen Teil möglicherweise sogar in Blöcken ausführen, damit Sie nicht alle Informationen auf einmal im Speicher behalten müssen. (Dies ist ganz einfach zu erreichen - teilen Sie einfach die "Args" -Liste in X Chunks und rufen Sie an pool.map(wrapper,chunk) - siehe hier )

mgilson
quelle
-5

Teilen Sie die einzelne große Datei in mehrere kleinere Dateien auf und lassen Sie jede in separaten Threads verarbeiten.

Tanu
quelle
das ist nicht das OP will !! aber nur für eine Idee ... nicht schlecht.
DRPK