Threading-Pool ähnlich dem Multiprocessing-Pool?

347

Gibt es eine Pool-Klasse für Worker- Threads , ähnlich der Pool-Klasse des Multiprocessing-Moduls ?

Ich mag zum Beispiel die einfache Möglichkeit, eine Kartenfunktion zu parallelisieren

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

Ich möchte dies jedoch ohne den Aufwand für die Erstellung neuer Prozesse tun.

Ich weiß von der GIL. In meinem Anwendungsfall ist die Funktion jedoch eine E / A-gebundene C-Funktion, für die der Python-Wrapper die GIL vor dem eigentlichen Funktionsaufruf freigibt.

Muss ich meinen eigenen Threading-Pool schreiben?

Martin
quelle
Hier ist etwas, das im Python-Kochbuch vielversprechend aussieht: Rezept 576519: Thread-Pool mit derselben API wie (Multi) Processing.Pool (Python)
otherchirps
1
Heutzutage ist es eingebaut : from multiprocessing.pool import ThreadPool.
Martineau
Können Sie das näher erläutern I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call.?
Mrgloom
@mrgloom stackoverflow.com/questions/1294382
Wächter der Dunkelheit

Antworten:

448

Ich habe gerade herausgefunden , dass es tatsächlich ist ein Thread-basierte Pool - Schnittstelle im multiprocessingModul, aber es ist etwas versteckt und nicht richtig dokumentiert.

Es kann über importiert werden

from multiprocessing.pool import ThreadPool

Es wird mithilfe einer Dummy-Prozessklasse implementiert, die einen Python-Thread umschließt. Diese threadbasierte Prozessklasse befindet sich multiprocessing.dummyin der Dokumentation, die in den Dokumenten kurz erwähnt wird . Dieses Dummy-Modul soll die gesamte Multiprozessor-Schnittstelle basierend auf Threads bereitstellen.

Martin
quelle
5
Das ist großartig. Ich hatte ein Problem beim Erstellen von ThreadPools außerhalb des Hauptthreads. Sie können sie jedoch aus einem untergeordneten Thread verwenden, der einmal erstellt wurde. Ich habe ein Problem dafür gemeldet
Olson
82
Ich verstehe nicht, warum diese Klasse keine Dokumentation hat. Solche Helferklassen sind heutzutage so wichtig.
Wernight
18
@Wernight: Es ist nicht in erster Linie öffentlich, weil niemand einen Patch angeboten hat, der es (oder ähnliches) als Threading bereitstellt. ThreadPool, einschließlich Dokumentation und Tests. Es wäre in der Tat eine gute Batterie, sie in die Standardbibliothek aufzunehmen, aber es wird nicht passieren, wenn niemand sie schreibt. Ein schöner Vorteil dieser vorhandenen Implementierung in Multiprocessing ist, dass es das Schreiben eines solchen Threading-Patches viel einfacher machen sollte ( docs.python.org/devguide )
ncoghlan
3
@ daniel.gindi: multiprocessing.dummy.Pool/ multiprocessing.pool.ThreadPoolsind dasselbe und beide Thread-Pools. Sie ahmen die Schnittstelle eines Prozesspools nach, sind jedoch vollständig in Bezug auf das Threading implementiert. Lesen Sie die Dokumente erneut, Sie haben sie rückwärts erhalten.
ShadowRanger
9
@ daniel.gindi: Lesen Sie weiter : " multiprocessing.dummyRepliziert die API von multiprocessing, ist aber nicht mehr als ein Wrapper um das threadingModul." multiprocessingIm Allgemeinen geht es um Prozesse, aber um das Wechseln zwischen Prozessen und Threads zu ermöglichen, haben sie (meistens) die multiprocessingAPI in multiprocessing.dummyThreads repliziert , jedoch nicht mit Prozessen. Ziel ist es, Ihnen zu import multiprocessing.dummy as multiprocessingermöglichen, prozessbasierten Code in threadbasierten Code zu ändern.
ShadowRanger
236

In Python 3 können Sie Folgendes verwenden concurrent.futures.ThreadPoolExecutor:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Weitere Informationen und Beispiele finden Sie in den Dokumenten .

Adrian Adamiak
quelle
6
um die zurückportiert Futures zu verwenden Modul laufensudo pip install futures
yair
Es ist der effizienteste und schnellste Weg für die Mehrfachverarbeitung
Haritsinh Gohil
2
Was ist der Unterschied zwischen ThreadPoolExecutorund multiprocessing.dummy.Pool?
Jay
2
aus concurrent.futures importieren ThreadPoolExecutor
stackOverlord
63

Ja, und es scheint (mehr oder weniger) dieselbe API zu haben.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
Warfares
quelle
9
Der Importpfad für ThreadPoolunterscheidet sich von Pool. Richtiger Import ist from multiprocessing.pool import ThreadPool.
Ringelblume
2
Seltsamerweise ist dies keine dokumentierte API, und multiprocessing.pool wird nur kurz als Bereitstellung von AsyncResult erwähnt. Es ist jedoch in 2.x und 3.x verfügbar.
Marvin
2
Das habe ich gesucht. Es ist nur eine einzelne Importzeile und eine kleine Änderung an meiner vorhandenen Poollinie und es funktioniert perfekt.
Danegraphics
39

Für etwas sehr Einfaches und Leichtes (von hier leicht modifiziert ):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

Um Rückrufe nach Abschluss der Aufgabe zu unterstützen, können Sie den Rückruf einfach zum Aufgabentupel hinzufügen.

dgorissen
quelle
Wie können sich die Threads jemals verbinden, wenn sie bedingungslos eine Endlosschleife bilden?
Joseph Garvin
@JosephGarvin Ich habe es getestet und die Threads blockieren in einer leeren Warteschlange (da der Aufruf von Queue.get()blockiert), bis das Programm endet. Danach werden sie automatisch beendet.
Forumulator
@ JosephGarvin, gute Frage. Queue.join()wird tatsächlich in die Task-Warteschlange aufgenommen, nicht in Worker-Threads. Wenn die Warteschlange leer ist, wird wait_completionzurückgegeben, das Programm wird beendet und Threads werden vom Betriebssystem geerntet.
Randomir
Wenn der gesamte Code in eine ordentliche Funktion eingepackt ist, scheint er Threads nicht zu stoppen, selbst wenn die Warteschlange leer ist und pool.wait_completion()zurückkehrt. Das Ergebnis ist, dass Threads einfach weiter aufgebaut werden.
Ubiquibacon
17

Hallo, um den Thread-Pool in Python zu verwenden, können Sie diese Bibliothek verwenden:

from multiprocessing.dummy import Pool as ThreadPool

und dann für die Verwendung, diese Bibliothek wie folgt:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

Die Threads geben die Anzahl der gewünschten Threads an, und Aufgaben sind eine Liste der Aufgaben, die dem Dienst am häufigsten zugeordnet sind.

Manochehr Rasouli
quelle
Danke, das ist ein toller Vorschlag! Aus den Dokumenten: multiprocessing.dummy repliziert die API für Multiprocessing, ist jedoch nur ein Wrapper um das Threading-Modul. Eine Korrektur - ich denke, Sie möchten sagen, dass die Pool-API (Funktion, iterierbar) ist
Layser
2
Wir haben die .close()und .join()-Aufrufe verpasst und das führt .map()dazu , dass alle Threads beendet werden. Nur eine Warnung.
Anatoly Scherbakov
8

Hier ist das Ergebnis, das ich schließlich verwendet habe. Es ist eine modifizierte Version der Klassen von dgorissen oben.

Datei: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

Den Pool benutzen

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
Forumulator
quelle
Anmerkung für andere Leser: Dieser Code ist Python 3 (shebang #!/usr/bin/python3)
Daniel Marschall
Warum verwenden Sie for i, d in enumerate(delays):den iWert und ignorieren ihn dann ?
Martineau
@martineau - wahrscheinlich nur ein Relikt aus der Entwicklung, wo sie wahrscheinlich iwährend eines Laufs drucken wollten .
n1k31t4
Warum gibt create_taskes? Wofür ist das?
MrR
Ich kann es nicht glauben und antworte mit 4 Stimmen auf SO ist der Weg, ThreadPooling in Python zu machen. Der Threadpool in der offiziellen Python-Distribution ist noch kaputt? Was vermisse ich?
MrR
2

Der Aufwand für die Erstellung der neuen Prozesse ist minimal, insbesondere wenn es sich nur um vier handelt. Ich bezweifle, dass dies ein Leistungs-Hotspot Ihrer Anwendung ist. Halten Sie es einfach, optimieren Sie, wo Sie müssen und wohin die Profilerstellungsergebnisse zeigen.

unbeli
quelle
5
Wenn sich der Fragesteller unter Windows befindet (was er meiner Meinung nach nicht angegeben hat), kann das Hochfahren des Prozesses meiner Meinung nach einen erheblichen Aufwand bedeuten. Zumindest geht es um die Projekte, die ich kürzlich gemacht habe. :-)
Brandon Rhodes
1

Es ist kein Thread-basierter Pool integriert. Es kann jedoch sehr schnell sein, eine Produzenten- / Konsumentenwarteschlange mit der QueueKlasse zu implementieren .

Von: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
Yann Ramin
quelle
3
Dies ist beim concurrent.futuresModul nicht mehr der Fall .
Thanatos
11
Ich glaube nicht, dass das mehr stimmt. from multiprocessing.pool import ThreadPool
Randall Hunt