Ich habe den Python
Code:
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
for i in range(0, MAX_PROCESSES):
p = Process(target=f, args=(i,))
p.start()
das läuft gut. Ist MAX_PROCESSES
jedoch variabel und kann ein beliebiger Wert zwischen 1
und sein 512
. Da ich diesen Code nur auf einem Computer mit 8
Kernen ausführe, muss ich herausfinden, ob es möglich ist, die Anzahl der Prozesse zu begrenzen, die gleichzeitig ausgeführt werden dürfen. Ich habe nachgesehen multiprocessing.Queue
, aber es sieht nicht so aus, wie ich es brauche - oder ich interpretiere die Dokumente falsch.
Gibt es eine Möglichkeit, die Anzahl der gleichzeitig ausgeführten multiprocessing.Process
s zu begrenzen ?
Antworten:
Es ist möglicherweise am sinnvollsten,
multiprocessing.Pool
einen Pool von Arbeitsprozessen zu verwenden, der auf der maximalen Anzahl der auf Ihrem System verfügbaren Kerne basiert, und dann im Grunde genommen Aufgaben einzugeben, sobald die Kerne verfügbar werden.Das Beispiel aus den Standarddokumenten ( http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers ) zeigt, dass Sie die Anzahl der Kerne auch manuell festlegen können:
from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': pool = Pool(processes=4) # start 4 worker processes result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously print result.get(timeout=1) # prints "100" unless your computer is *very* slow print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
Und es ist auch praktisch zu wissen, dass es die
multiprocessing.cpu_count()
Methode gibt, die Anzahl der Kerne auf einem bestimmten System zu zählen, falls dies in Ihrem Code erforderlich ist.Bearbeiten: Hier ist ein Code-Entwurf, der für Ihren speziellen Fall zu funktionieren scheint:
import multiprocessing def f(name): print 'hello', name if __name__ == '__main__': pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument for i in xrange(0, 512): pool.apply_async(f, args=(i,)) pool.close() pool.join()
quelle
multiprocessing.cpu_count()-1 or 1
Dies kann eine nützliche Heuristik sein, um zu entscheiden, wie viele Prozesse parallel ausgeführt werden sollen: Mit -1 wird vermieden, dass das System durch Monopolisierung aller Kerne blockiert wird. Wenn jedoch nur eine CPU verfügbar ist, kann deror
Single-Core-Betrieb problemlos zurückgreifen.multiprocessing.cpu_count()
nicht die Anzahl der Kerne ist, sondern die Anzahl der Threads (im Sinne von Hyperthreading).Ich denke, Semaphor ist das, wonach Sie suchen. Es blockiert den Hauptprozess, nachdem es auf 0 heruntergezählt hat. Beispielcode:
from multiprocessing import Process from multiprocessing import Semaphore import time def f(name, sema): print('process {} starting doing business'.format(name)) # simulate a time-consuming task by sleeping time.sleep(5) # `release` will add 1 to `sema`, allowing other # processes blocked on it to continue sema.release() if __name__ == '__main__': concurrency = 20 total_task_num = 1000 sema = Semaphore(concurrency) all_processes = [] for i in range(total_task_num): # once 20 processes are running, the following `acquire` call # will block the main process since `sema` has been reduced # to 0. This loop will continue only after one or more # previously created processes complete. sema.acquire() p = Process(target=f, args=(i, sema)) all_processes.append(p) p.start() # inside main process, wait for all processes to finish for p in all_processes: p.join()
Der folgende Code ist strukturierter, da er
sema
in derselben Funktion erfasst und freigegeben wird . Es wird jedoch zu viel Ressourcen verbrauchen, wenntotal_task_num
es sehr groß ist:from multiprocessing import Process from multiprocessing import Semaphore import time def f(name, sema): print('process {} starting doing business'.format(name)) # `sema` is acquired and released in the same # block of code here, making code more readable, # but may lead to problem. sema.acquire() time.sleep(5) sema.release() if __name__ == '__main__': concurrency = 20 total_task_num = 1000 sema = Semaphore(concurrency) all_processes = [] for i in range(total_task_num): p = Process(target=f, args=(i, sema)) all_processes.append(p) # the following line won't block after 20 processes # have been created and running, instead it will carry # on until all 1000 processes are created. p.start() # inside main process, wait for all processes to finish for p in all_processes: p.join()
Der obige Code erstellt
total_task_num
Prozesse, aber nurconcurrency
Prozesse werden ausgeführt, während andere Prozesse blockiert sind, wodurch wertvolle Systemressourcen verbraucht werden.quelle
allgemeiner könnte dies auch so aussehen:
import multiprocessing def chunks(l, n): for i in range(0, len(l), n): yield l[i:i + n] numberOfThreads = 4 if __name__ == '__main__': jobs = [] for i, param in enumerate(params): p = multiprocessing.Process(target=f, args=(i,param)) jobs.append(p) for i in chunks(jobs,numberOfThreads): for j in i: j.start() for j in i: j.join()
Natürlich ist dieser Weg ziemlich grausam (da er auf jeden Prozess in einem Junk wartet, bis er mit dem nächsten Chunk fortfährt). Trotzdem funktioniert es gut für ungefähr gleiche Laufzeiten der Funktionsaufrufe.
quelle