"Feuer und vergiss" Python async / warten

114

Manchmal muss ein unkritischer asynchroner Vorgang ausgeführt werden, aber ich möchte nicht warten, bis er abgeschlossen ist. In der Coroutine-Implementierung von Tornado können Sie eine asynchrone Funktion "auslösen und vergessen", indem Sie einfach das yieldSchlüsselwort weglassen .

Ich habe versucht , zu , um herauszufinden , wie „Feuer & Forget“ mit der neuen async/ awaitSyntax in Python 3.5 freigegeben. ZB ein vereinfachtes Code-Snippet:

async def async_foo():
    print("Do some stuff asynchronously here...")

def bar():
    async_foo()  # fire and forget "async_foo()"

bar()

Was jedoch passiert, ist, dass es bar()niemals ausgeführt wird und stattdessen eine Laufzeitwarnung angezeigt wird:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"
Mike N.
quelle
Verbunden? stackoverflow.com/q/32808893/1639625 Tatsächlich denke ich, dass es ein Duplikat ist, aber ich möchte es nicht sofort betrügen. Kann jemand bestätigen?
tobias_k
3
@tobias_k, ich glaube nicht, dass es doppelt ist. Die Antwort unter dem Link ist zu weit gefasst, um auf diese Frage beantwortet zu werden.
Mikhail Gerasimov
2
Läuft (1) Ihr "Haupt" -Prozess für immer weiter? Oder (2) möchten Sie zulassen, dass Ihr Prozess stirbt, aber dass vergessene Aufgaben ihre Arbeit fortsetzen? Oder (3) bevorzugen Sie es, dass Ihr Hauptprozess kurz vor dem Ende auf vergessene Aufgaben wartet?
Julien Palard

Antworten:

168

Aktualisieren:

Ersetzen Sie asyncio.ensure_futuredurch asyncio.create_tasküberall, wenn Sie Python verwenden> = 3.7 Es ist eine neuere, schönere Möglichkeit , Aufgaben zu erzeugen .


asyncio.Task zu "feuern und vergessen"

Laut Python-Dokumenten asyncio.Taskist es möglich, eine Coroutine zu starten, um "im Hintergrund" auszuführen . Die von der asyncio.ensure_future Funktion erstellte Aufgabe blockiert die Ausführung nicht (daher kehrt die Funktion sofort zurück!). Dies scheint eine Möglichkeit zu sein, wie gewünscht zu „feuern und zu vergessen“.

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Ausgabe:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

Was ist, wenn Aufgaben nach Abschluss der Ereignisschleife ausgeführt werden?

Beachten Sie, dass Asyncio erwartet, dass die Aufgabe zum Zeitpunkt des Abschlusses der Ereignisschleife abgeschlossen ist. Wenn Sie also wechseln main()zu:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

Sie erhalten diese Warnung, nachdem das Programm beendet wurde:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

Um zu verhindern, dass Sie nach Abschluss der Ereignisschleife einfach auf alle ausstehenden Aufgaben warten können :

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

Töte Aufgaben, anstatt sie abzuwarten

Manchmal möchten Sie nicht darauf warten, dass Aufgaben erledigt werden (einige Aufgaben können beispielsweise so erstellt werden, dass sie für immer ausgeführt werden). In diesem Fall können Sie sie einfach abbrechen (), anstatt sie abzuwarten:

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

Ausgabe:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo
Mikhail Gerasimov
quelle
Ich habe den ersten Block kopiert und hinter mir gelassen und ihn einfach an meinem Ende ausgeführt. Aus irgendeinem Grund habe ich Folgendes erhalten: Zeile 4 async def async_foo (): ^ Als ob es einen Syntaxfehler mit der Funktionsdefinition in Zeile 4 gibt: "async def async_foo ( ): "Vermisse ich etwas?
Gil Allen
3
@ GilAllen Diese Syntax funktioniert nur in Python 3.5+. Python 3.4 benötigt eine alte Syntax (siehe docs.python.org/3.4/library/asyncio-task.html ). Python 3.3 und niedriger unterstützt Asyncio überhaupt nicht.
Mikhail Gerasimov
Wie würden Sie die Aufgaben in einem Thread beenden?… ̣Ich habe einen Thread, der einige Aufgaben erstellt, und ich möchte alle ausstehenden Aufgaben beenden, wenn der Thread in seiner stop()Methode stirbt .
Sardathrion - gegen SE Missbrauch
@Sardathrion Ich bin mir nicht sicher, ob die Aufgabe irgendwo auf den Thread verweist, in dem sie erstellt wurde, aber nichts hindert Sie daran, sie manuell zu verfolgen: Fügen Sie beispielsweise einfach alle im Thread erstellten Aufgaben zu einer Liste hinzu und brechen Sie sie zu gegebener Zeit ab über.
Mikhail Gerasimov
2
Beachten Sie, dass "Task.all_tasks () seit Python 3.7 veraltet ist. Verwenden Sie stattdessen asyncio.all_tasks ()"
Alexis
13

Vielen Dank, Sergey, für die kurze Antwort. Hier ist die dekorierte Version davon.

import asyncio
import time

def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

Produziert

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

Hinweis: Überprüfen Sie meine andere Antwort, die dasselbe tut, mit einfachen Threads.

nehem
quelle
Nachdem ich diesen Ansatz verwendet hatte, kam es zu einer erheblichen Verlangsamung, wodurch ~ 5 kleine Feuer-und-Vergessen-Aufgaben pro Sekunde erstellt wurden. Verwenden Sie dies nicht in der Produktion für eine langfristige Aufgabe. Es wird Ihre CPU und Speicher verbrauchen!
Pir
10

Dies ist keine vollständig asynchrone Ausführung, aber möglicherweise ist run_in_executor () für Sie geeignet.

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        return loop.run_in_executor(None, task, *args, **kwargs)
    else:    
        raise TypeError('Task must be a callable')

def foo():
    #asynchronous stuff here


fire_and_forget(foo)
Sergey Gornostaev
quelle
3
Schöne prägnante Antwort. Es ist erwähnenswert, dass der executorWille standardmäßig anruft concurrent.futures.ThreadPoolExecutor.submit(). Ich erwähne, weil das Erstellen von Threads nicht kostenlos ist. 1000-mal pro Sekunde Feuer und Vergessen wird wahrscheinlich das Thread-Management stark belasten
Brad Solomon
Ja. Ich habe Ihre Warnung nicht beachtet und eine erhebliche Verlangsamung festgestellt, nachdem ich diesen Ansatz verwendet habe, um ~ 5 kleine Feuer-und-Vergessen-Aufgaben pro Sekunde zu erstellen. Verwenden Sie dies nicht in der Produktion für eine langfristige Aufgabe. Es wird Ihre CPU und Speicher verbrauchen!
Pir
3

Wenn Sie aus irgendeinem Grund nicht verwenden können, finden Sie asynciohier die Implementierung mit einfachen Threads. Überprüfen Sie meine anderen Antworten und auch die Antwort von Sergey.

import threading

def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")
nehem
quelle
Wenn wir nur diese fire_and_forget-Funktionalität und nichts anderes von asyncio benötigen, wäre es immer noch besser, asyncio zu verwenden? Was sind die Vorteile?
Pir