Asynchroner Methodenaufruf in Python?

177

Ich habe mich gefragt, ob es in Python eine Bibliothek für asynchrone Methodenaufrufe gibt . Es wäre großartig, wenn Sie so etwas tun könnten

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

Oder um eine nicht asynchrone Routine asynchron aufzurufen

def longComputation()
    <code>

token = asynccall(longComputation())

Es wäre großartig, eine verfeinerte Strategie als Muttersprache im Sprachkern zu haben. Wurde dies berücksichtigt?

Stefano Borini
quelle
Ab Python 3.4: docs.python.org/3/library/asyncio.html (es gibt einen Backport für 3.3 und eine glänzende neue asyncund awaitSyntax von 3.5).
Jonrsharpe
Es gibt keinen Rückrufmechanismus, aber Sie können die Ergebnisse in einem Wörterbuch zusammenfassen und er basiert auf dem Multiprozessor-Modul von Python. Ich bin sicher, Sie können der dekorierten Funktion einen weiteren Parameter als Rückruf hinzufügen. github.com/alex-sherman/deco .
RajaRaviVarma
Um loszulegen. Offizielle Dokumentation - docs.python.org/3/library/concurrency.html
Adarsh ​​Madrecha

Antworten:

141

Sie können das in Python 2.6 hinzugefügte Multiprocessing-Modul verwenden . Sie können Prozesspools verwenden und dann asynchron Ergebnisse erhalten mit:

apply_async(func[, args[, kwds[, callback]]])

Z.B:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.

Dies ist nur eine Alternative. Dieses Modul bietet viele Möglichkeiten, um das zu erreichen, was Sie wollen. Es wird auch sehr einfach sein, daraus einen Dekorateur zu machen.

Lucas S.
quelle
5
Lucas S., dein Beispiel funktioniert leider nicht. Die Rückruffunktion wird nie aufgerufen.
DataGreed
6
Es ist wahrscheinlich zu bedenken, dass dies separate Prozesse erzeugt, anstatt einen separaten Thread innerhalb eines Prozesses. Dies könnte einige Implikationen haben.
user47741
11
Dies funktioniert: result = pool.apply_async (f, [10], callback = finish)
MJ
6
Um wirklich etwas asynchrones in Python zu tun, muss das Multiprozessor-Modul verwendet werden, um neue Prozesse zu erzeugen. Das bloße Erstellen neuer Threads ist immer noch der globalen Interpreter-Sperre ausgeliefert, die verhindert, dass ein Python-Prozess mehrere Dinge gleichzeitig ausführt.
Drahkar
2
Falls Sie mit dieser Lösung keinen neuen Prozess erzeugen möchten, ändern Sie den Import in from multiprocessing.dummy import Pool. multiprocessing.dummy hat genau das gleiche Verhalten über Threads anstelle von Prozessen implementiert
Almog Cohen
202

Etwas wie:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

Weitere Informationen finden Sie in der Dokumentation unter https://docs.python.org/library/threading.html .

Drakosha
quelle
1
Ja, wenn Sie nur asynchron arbeiten müssen, warum nicht einfach Thread verwenden? Immerhin ist der Faden leichter als der Prozess
kk1957
22
Wichtiger Hinweis: Die Standardimplementierung (CPython) von Threads hilft aufgrund der "Global Interpreter Lock" nicht bei rechnergebundenen Aufgaben. Siehe die Bibliothek doc: link
löslicher Fisch
3
Ist die Verwendung von thread.join () wirklich asynchron? Was ist, wenn Sie einen Thread (z. B. einen UI-Thread) nicht blockieren und nicht eine Menge Ressourcen verwenden möchten, die eine while-Schleife ausführen?
Mgamerz
1
@Mgamerz Join ist synchron. Sie können den Thread die Ergebnisse der Ausführung in eine Warteschlange stellen lassen oder / und einen Rückruf aufrufen. Ansonsten wissen Sie nicht, wann es fertig ist (wenn überhaupt).
Drakosha
1
Ist es möglich, eine Rückruffunktion am Ende der Thread-Ausführung
aufzurufen,
49

Ab Python 3.5 können Sie erweiterte Generatoren für asynchrone Funktionen verwenden.

import asyncio
import datetime

Erweiterte Generatorsyntax:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

Neue async/awaitSyntax:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
camabeh
quelle
7
@carnabeh, könnten Sie dieses Beispiel um die Funktion "def longComputation ()" des OP erweitern? Die meisten Beispiele verwenden "warte auf asyncio.sleep (1)", aber wenn longComputation () beispielsweise ein Double zurückgibt, können Sie nicht einfach "warte auf longComputation ()" verwenden.
Fab
Zehn Jahre in der Zukunft und dies sollte jetzt die akzeptierte Antwort sein. Wenn Sie in Python3.5 + über Async sprechen, sollten Sie an Asyncio und Async-Schlüsselwörter denken.
zeh
31

Es ist nicht im Sprachkern, aber eine sehr ausgereifte Bibliothek, die macht, was Sie wollen, ist Twisted . Es wird das verzögerte Objekt eingeführt, an das Sie Rückrufe oder Fehlerbehandlungsroutinen ("Errbacks") anhängen können. Ein Zurückgestelltes ist im Grunde ein "Versprechen", dass eine Funktion irgendwann ein Ergebnis haben wird.

Meredith L. Patterson
quelle
1
Schauen Sie sich insbesondere twisted.internet.defer ( twistedmatrix.com/documents/8.2.0/api/… ) an.
Nicholas Riley
21

Sie können einen Dekorator implementieren, um Ihre Funktionen asynchron zu machen, obwohl dies etwas schwierig ist. Das multiprocessingModul ist voll von kleinen Macken und scheinbar willkürlichen Einschränkungen - umso mehr Grund, es hinter einer benutzerfreundlichen Oberfläche zu kapseln.

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send

Der folgende Code veranschaulicht die Verwendung des Dekorateurs:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

In einem realen Fall würde ich etwas mehr auf den Dekorator eingehen und ihm eine Möglichkeit bieten, ihn für das Debuggen auszuschalten (während die zukünftige Benutzeroberfläche beibehalten wird), oder vielleicht eine Möglichkeit, mit Ausnahmen umzugehen. aber ich denke, das zeigt das Prinzip gut genug.

xperroni
quelle
Dies sollte die beste Antwort sein. Ich liebe es, wie es Wert zurückgeben kann. Nicht wie der Thread, der nur asynchron läuft.
Aminah Nuraini
16

Gerade

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
Antigluk
quelle
8

Sie könnten Eventlet verwenden. Sie können damit scheinbar synchronen Code schreiben, ihn jedoch asynchron über das Netzwerk arbeiten lassen.

Hier ist ein Beispiel für einen Super-Minimal-Crawler:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)
Raj
quelle
7

Meine Lösung ist:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

Und funktioniert genau wie gewünscht:

@Async
def fnc():
    pass
Nevyn
quelle
5

So etwas funktioniert bei mir, Sie können dann die Funktion aufrufen und sie wird sich auf einen neuen Thread versenden.

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return
Nicholas Hamilton
quelle
2

Gibt es einen Grund, keine Threads zu verwenden? Sie können die threadingKlasse verwenden. finished()Verwenden Sie anstelle der Funktion die isAlive(). Die result()Funktion könnte join()den Thread und das Ergebnis abrufen. Und, wenn Sie können, überschreiben die run()und __init__Funktionen die Funktion im Konstruktor angegeben aufrufen und den Wert irgendwo auf die Instanz der Klasse speichern.

ondra
quelle
2
Wenn es sich um ein rechenintensives Funktions-Threading handelt, erhalten Sie nichts (es wird wahrscheinlich die Dinge tatsächlich langsamer machen), da ein Python-Prozess aufgrund der GIL auf einen CPU-Kern beschränkt ist.
Kurt
2
@Kurt, obwohl das stimmt, hat das OP nicht erwähnt, dass die Leistung sein Anliegen ist. Es gibt andere Gründe für asynchrones Verhalten ...
Peter Hansen
Threads in Python sind nicht besonders gut, wenn Sie die Möglichkeit haben möchten, den asynchronen Methodenaufruf zu beenden, da nur der Hauptthread in Python Signale empfängt.
CivFan
2

Sie können concurrent.futures verwenden (hinzugefügt in Python 3.2).

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')
Großer Kürbis
quelle
Dies ist eine sehr gute Antwort, da es die einzige hier ist, die die Möglichkeit eines Threadpools mit Rückrufen
bietet
Dies leidet leider auch unter der "Global Interpreter Lock". Siehe den Bibliotheksdokument: Link . Getestet mit Python 3.7
Alex
0

Sie können process verwenden. Wenn Sie es für immer ausführen möchten, verwenden Sie während (wie beim Netzwerk) in Ihrer Funktion:

from multiprocessing import Process
def foo():
    while 1:
        # Do something

p = Process(target = foo)
p.start()

Wenn Sie es nur einmal ausführen möchten, gehen Sie folgendermaßen vor:

from multiprocessing import Process
def foo():
    # Do something

p = Process(target = foo)
p.start()
p.join()
Keivan
quelle