Multiprocessing verwenden. Prozess mit maximaler Anzahl gleichzeitiger Prozesse

76

Ich habe den PythonCode:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

das läuft gut. Ist MAX_PROCESSESjedoch variabel und kann ein beliebiger Wert zwischen 1und sein 512. Da ich diesen Code nur auf einem Computer mit 8Kernen ausführe, muss ich herausfinden, ob es möglich ist, die Anzahl der Prozesse zu begrenzen, die gleichzeitig ausgeführt werden dürfen. Ich habe nachgesehen multiprocessing.Queue, aber es sieht nicht so aus, wie ich es brauche - oder ich interpretiere die Dokumente falsch.

Gibt es eine Möglichkeit, die Anzahl der gleichzeitig ausgeführten multiprocessing.Processs zu begrenzen ?

Brett
quelle
für i im Bereich (0, min (MAX_PROCESSES, 8)):
Jacob
@Jacob Ich möchte trotzdem, dass alle MAX_PROCESSES ausgeführt werden. Der obige Code ist der Einfachheit halber abgeschnitten, aber die Hauptfunktion wird bis zu 512 Mal aufgerufen (daher die Schleife). Ich frage mich also, ob es eine Möglichkeit gibt, Prozesse in die Warteschlange zu stellen.
Brett
2
Sie möchten also ein Master / Worker-Setup und die Anzahl der Worker begrenzen?
Jacob
@ Jacob Ja, das könnte eine bessere Art sein, es zu formulieren.
Brett

Antworten:

103

Es ist möglicherweise am sinnvollsten, multiprocessing.Pooleinen Pool von Arbeitsprozessen zu verwenden, der auf der maximalen Anzahl der auf Ihrem System verfügbaren Kerne basiert, und dann im Grunde genommen Aufgaben einzugeben, sobald die Kerne verfügbar werden.

Das Beispiel aus den Standarddokumenten ( http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers ) zeigt, dass Sie die Anzahl der Kerne auch manuell festlegen können:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

Und es ist auch praktisch zu wissen, dass es die multiprocessing.cpu_count()Methode gibt, die Anzahl der Kerne auf einem bestimmten System zu zählen, falls dies in Ihrem Code erforderlich ist.

Bearbeiten: Hier ist ein Code-Entwurf, der für Ihren speziellen Fall zu funktionieren scheint:

import multiprocessing

def f(name):
    print 'hello', name

if __name__ == '__main__':
    pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
Treddy
quelle
Okay, ich habe eine Version entworfen, die für Ihren speziellen Fall gut zu funktionieren scheint und dem obigen Beitrag hinzugefügt.
Treddy
49
multiprocessing.cpu_count()-1 or 1Dies kann eine nützliche Heuristik sein, um zu entscheiden, wie viele Prozesse parallel ausgeführt werden sollen: Mit -1 wird vermieden, dass das System durch Monopolisierung aller Kerne blockiert wird. Wenn jedoch nur eine CPU verfügbar ist, kann der orSingle-Core-Betrieb problemlos zurückgreifen.
Andybuckley
Was ist, wenn meine Funktion viel Arbeit und wenig Verarbeitung hat? Wird die Verwendung von 10 Threads auf einem 4-Kern-Computer das Programm in irgendeiner Weise beeinflussen?
Abhidemon
3
Beachten Sie, dass dies multiprocessing.cpu_count()nicht die Anzahl der Kerne ist, sondern die Anzahl der Threads (im Sinne von Hyperthreading).
Grismar
1
Ich konnte die Bearbeitungszeit für das Back-End bei nächtlich geplanten Aufgaben in meiner App von ~ 20 Minuten auf ~ 8 Minuten reduzieren, indem ich das oben beschriebene verwendete. Danke @treddy!
Fergus
10

Ich denke, Semaphor ist das, wonach Sie suchen. Es blockiert den Hauptprozess, nachdem es auf 0 heruntergezählt hat. Beispielcode:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other 
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more 
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

Der folgende Code ist strukturierter, da er semain derselben Funktion erfasst und freigegeben wird . Es wird jedoch zu viel Ressourcen verbrauchen, wenn total_task_numes sehr groß ist:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making code more readable,
    # but may lead to problem.
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block after 20 processes
        # have been created and running, instead it will carry 
        # on until all 1000 processes are created.
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

Der obige Code erstellt total_task_numProzesse, aber nur concurrencyProzesse werden ausgeführt, während andere Prozesse blockiert sind, wodurch wertvolle Systemressourcen verbraucht werden.

makiko_fly
quelle
Das ist toll!
Behebt
Ich bin nicht sicher, ob dies etwas ist, was ich falsch mache, aber meine sema.release () tritt nie auf, wenn ich den ersten Codeblock mit der Freigabe in der Funktion f verwende, sondern im Haupt erhalte. Hat jemand jemals dieses Problem? Dummer Fehler?
user1983682
4

allgemeiner könnte dies auch so aussehen:

import multiprocessing
def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4


if __name__ == '__main__':
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i,param))
        jobs.append(p)
    for i in chunks(jobs,numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()

Natürlich ist dieser Weg ziemlich grausam (da er auf jeden Prozess in einem Junk wartet, bis er mit dem nächsten Chunk fortfährt). Trotzdem funktioniert es gut für ungefähr gleiche Laufzeiten der Funktionsaufrufe.

Baedsch
quelle