Wann man sie benutzt
Wenn Sie mehr als zwei Punkte für die Kommunikation benötigen, verwenden Sie a Queue()
.
Wenn Sie absolute Leistung benötigen, Pipe()
ist a viel schneller, weil Queue()
es darauf aufgebaut ist Pipe()
.
Leistungsbenchmarking
Angenommen, Sie möchten zwei Prozesse erzeugen und so schnell wie möglich Nachrichten zwischen ihnen senden. Dies sind die Timing-Ergebnisse eines Drag Race zwischen ähnlichen Tests mit Pipe()
und Queue()
... Dies ist auf einem ThinkpadT61 mit Ubuntu 11.10 und Python 2.7.2.
Zu Ihrer Information, ich habe Ergebnisse JoinableQueue()
als Bonus eingeworfen; JoinableQueue()
berücksichtigt Aufgaben, wenn sie queue.task_done()
aufgerufen werden (sie wissen nicht einmal über die spezifische Aufgabe Bescheid, sie zählen nur nicht abgeschlossene Aufgaben in der Warteschlange), sodass sie queue.join()
wissen , dass die Arbeit beendet ist.
Der Code für jeden am Ende dieser Antwort ...
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
Zusammenfassend Pipe()
ist etwa dreimal schneller als ein Queue()
. Denken Sie nicht einmal an das, es JoinableQueue()
sei denn, Sie müssen wirklich die Vorteile haben.
BONUSMATERIAL 2
Multiprocessing führt zu geringfügigen Änderungen im Informationsfluss, die das Debuggen erschweren, es sei denn, Sie kennen einige Verknüpfungen. Beispielsweise verfügen Sie möglicherweise über ein Skript, das bei der Indizierung durch ein Wörterbuch unter vielen Bedingungen einwandfrei funktioniert, bei bestimmten Eingaben jedoch selten fehlschlägt.
Normalerweise erhalten wir Hinweise auf den Fehler, wenn der gesamte Python-Prozess abstürzt. Es werden jedoch keine unerwünschten Absturzrückverfolgungen auf die Konsole gedruckt, wenn die Mehrfachverarbeitungsfunktion abstürzt. Unbekannte Multiprozessor-Abstürze aufzuspüren ist schwierig, ohne einen Hinweis darauf zu haben, was den Prozess zum Absturz gebracht hat.
Der einfachste Weg, Multiprozessor-Crash-Informationen aufzuspüren, besteht darin, die gesamte Multiprozessor-Funktion in ein try
/ zu packen und Folgendes zu except
verwenden traceback.print_exc()
:
import traceback
def run(self, args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
Wenn Sie jetzt einen Absturz finden, sehen Sie etwas wie:
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
Quellcode:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
Ein weiteres
Queue()
bemerkenswertes Merkmal ist der Feeder-Thread. In diesem Abschnitt wird Folgendes angegeben: "Wenn ein Prozess zum ersten Mal ein Element in die Warteschlange stellt, wird ein Feeder-Thread gestartet, der Objekte aus einem Puffer in die Pipe überträgt." Es können unendlich viele (oder maximale) Elemente eingefügt werden,Queue()
ohne dass Aufrufe zumqueue.put()
Blockieren erforderlich sind . Auf diese Weise können Sie mehrere Elemente in einem speichernQueue()
, bis Ihr Programm bereit ist, sie zu verarbeiten.Pipe()
Auf der anderen Seite ist eine begrenzte Menge an Speicherplatz für Elemente vorhanden, die an eine Verbindung gesendet, aber nicht von der anderen Verbindung empfangen wurden. Nachdem dieser Speicher aufgebraucht ist, werden Aufrufe vonconnection.send()
blockiert, bis Platz zum Schreiben des gesamten Elements vorhanden ist. Dadurch wird der Thread, der das Schreiben ausführt, blockiert, bis ein anderer Thread aus der Pipe liest.Connection
Objekte geben Ihnen Zugriff auf den zugrunde liegenden Dateideskriptor. Auf * nix-Systemen können Sieconnection.send()
mithilfe deros.set_blocking()
Funktion verhindern, dass Anrufe blockiert werden . Dies führt jedoch zu Problemen, wenn Sie versuchen, ein einzelnes Element zu senden, das nicht in die Pipe-Datei passt. In neueren Linux-Versionen können Sie die Größe einer Datei erhöhen. Die maximal zulässige Größe hängt jedoch von den Systemkonfigurationen ab. Sie sollten sich daher niemals darauf verlassenPipe()
, Daten zu puffern. Anrufe anconnection.send
könnte blockieren, bis Daten aus der Pipe gelesen werden.Zusammenfassend ist Queue eine bessere Wahl als Pipe, wenn Sie Daten puffern müssen. Auch wenn Sie nur zwischen zwei Punkten kommunizieren müssen.
quelle