Wie kann ich eine Funktion mit Asyncio regelmäßig ausführen?

69

Ich migriere von tornadonach asynciound kann das asyncioÄquivalent von tornado's nicht finden PeriodicCallback. (A PeriodicCallbackbenötigt zwei Argumente: die auszuführende Funktion und die Anzahl der Millisekunden zwischen den Aufrufen.)

  • Gibt es so ein Äquivalent in asyncio?
  • Wenn nicht, was wäre der sauberste Weg, dies zu implementieren, ohne das Risiko einzugehen, RecursionErrornach einer Weile eine zu bekommen?
2Cubed
quelle
Warum müssen Sie sich vom Tornado entfernen? Sie können zusammenarbeiten, nein? tornadoweb.org/en/stable/asyncio.html
OneCricketeer
Fügen await asyncio.sleep(time)Sie einfach Ihre Funktion hinzu.
Songololo
Gleiches gilt für Twisted, keine LoopingCallImplementierung.
Zgoda

Antworten:

60

Für Python-Versionen unter 3.5:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

Für Python 3.5 und höher:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
A. Jesse Jiryu Davis
quelle
5
Selbst in Tornado würde ich eine solche Schleife anstelle einer PeriodicCallbackfür Anwendungen empfehlen , die Coroutinen verwenden.
Ben Darnell
8
Nur eine kurze Anmerkung: Erstellen Sie keine TaskInstanzen direkt . Verwenden Sie die ensure_future()Funktion oder die AbstractEventLoop.create_task()Methode. Aus der Asyncio-Dokumentation .
Torkel Bjørnson-Langen
Anstelle der stopFunktion kann auch ein Lambda verwendet werden . Dh:loop.call_later(5, lambda: task.cancel())
Torkel Bjørnson-Langen
22
Oder Sie können es einfach so nennen loop.call_later(5, task.cancel).
Schreiben Sie
Nur ein Hinweis für Python 3.7: Aus dem Asyncio-Dokument sollten wir das High-Level verwenden asyncio.create_task(), um Tasks zu erstellen .
Mhchia
29

Wenn Sie der Meinung sind, dass etwas "im Hintergrund" Ihres Asyncio-Programms passieren sollte, ist dies asyncio.Taskmöglicherweise eine gute Möglichkeit. In diesem Beitrag erfahren Sie, wie Sie mit Aufgaben arbeiten.

Hier ist eine mögliche Implementierung einer Klasse, die einige Funktionen regelmäßig ausführt:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

Testen wir es:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Ausgabe:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

Wie Sie sehen, startstarten wir einfach eine Aufgabe, die einige Funktionen aufruft und einige Zeit in einer Endlosschleife schläft. Auf stopbrechen wir diese Aufgabe einfach ab. Beachten Sie, dass diese Aufgabe zum Zeitpunkt des Programmabschlusses gestoppt werden sollte.

Eine weitere wichtige Sache ist, dass die Ausführung Ihres Rückrufs nicht lange dauern sollte (da sonst Ihre Ereignisschleife einfriert). Wenn Sie vorhaben, etwas Langfristiges aufzurufen func, müssen Sie es möglicherweise in Executor ausführen .

Mikhail Gerasimov
quelle
Die bisher vollständigste und klarste Antwort! Vielen Dank. Ist es eine gute Idee, zu verlangen func, dass das eine Coroutine ist, damit wir: await self.func()in der _runMethode?
Sergey Belash
1
@ SergeyBelash, sicher, es wird in Ordnung sein. Beachten Sie nur, dass Ihre Funktion auch zur zufälligen Zeit abgebrochen werden kann, da wir die Aufgabe zur zufälligen Zeit abbrechen. Dies bedeutet, dass jede Wartezeile in Ihrer Funktion möglicherweise CancelledError auslösen kann. Aber es ist für jede asynchrone Funktion überhaupt aktuell (genau wie KeyboardInterrupt in normalem nicht-asynchronem Code zufällig ausgelöst werden kann).
Mikhail Gerasimov
Ich mache mir damit (und anderen Antworten) Sorgen, dass die Wiederholungsrate nicht genau dem Zeitwert entspricht. Wenn die Ausführung von func eine nennenswerte Zeit in Anspruch nimmt, wird sie nicht einmal in der Nähe sein, und über einen langen Zeitraum wird sie driften, selbst wenn func vernachlässigbare Zeit benötigt.
Ian Goldby
Genau genommen start()muss es nicht sein async.
Fgiraldeau
Dies kann aktualisiert werden, um sowohl normale als auch asynchrone Funktionen zu unterstützen: `` `async def _run (self): while True: Warten auf asyncio.sleep (self.time) # Unterstützung von normalen und asynchronen Funktionen res = self.func () bei Inspektion. isawaitable (res): warte auf res `` `
Airstriker
23

Es gibt keine integrierte Unterstützung für regelmäßige Anrufe, nein.

Erstellen Sie einfach Ihre eigene Scheduler-Schleife, die alle geplanten Aufgaben in den Ruhezustand versetzt und ausführt:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        await for task in scheduled_tasks(time.time()):
            await task()

Der scheduled_tasks()Iterator sollte Aufgaben erstellen, die zum angegebenen Zeitpunkt ausgeführt werden können. Beachten Sie, dass das Erstellen des Zeitplans und das Starten aller Aufgaben theoretisch länger als 1 Sekunde dauern kann. Die Idee dabei ist, dass der Scheduler alle Aufgaben liefert, die seit der letzten Überprüfung hätten gestartet werden sollen.

Martijn Pieters
quelle
Die asyncioEreignisschleife verfügt über eine time()Methode, die anstelle des timeModuls verwendet werden kann.
krs013
3
@ krs013: Das ist eine andere Uhr ; Es gibt Ihnen nicht unbedingt Echtzeit (es hängt von der Implementierung der Ereignisschleife ab und kann CPU-Zeit-Ticks oder ein anderes monoton ansteigendes Taktmaß messen). Da nicht garantiert wird, dass ein Maß in Sekunden angegeben wird, sollte es hier nicht verwendet werden.
Martijn Pieters
Oh, guter Punkt, danke. Ich dachte mir, dass es gut genug für das Intervall-Timing wäre, aber es sieht so aus, als ob keine Garantie für die Genauigkeit in schlafenden Fäden gegeben wird. Die Implementierungen, die ich gesehen habe, scheinen nur die Verfügbarkeit der Maschinen in Nanosekunden zu nutzen, aber ja, Sie haben Recht. Ich glaube, ich muss jetzt einen Code reparieren ...
krs013
In der Dokumentation der loop.timeMethode heißt es: "Dies ist ein Gleitkommawert, der seit einer Epoche in Sekunden ausgedrückt wird. Epoche, Präzision, Genauigkeit und Drift sind jedoch nicht angegeben und können je nach Ereignisschleife unterschiedlich sein." Hier interpretiere ich dies als "SI Sekunden seit einer Epoche", daher tickt die CPU-Zeit oder andere nicht "einheitliche" Uhren gelten nicht als gültig für loop.time(). Da das OP nur alle x Millisekunden um einen regelmäßigen Rückruf gebeten hat, scheint es mir loop.time()für diesen Zweck angemessen zu sein.
Stefano M
@StefanoM: Ja, es mag angemessen sein, ist jedoch von der Implementierung der Ereignisschleife abhängig und die Dokumentzeichenfolge gibt den Implementierungen viel Spielraum. Es mag gut genug sein, um Aufgaben zu wiederholen, aber meine Antwort beschreibt einen Planer , der oft Cron-ähnliche Dinge tun muss (z. B. Aufgaben zu bestimmten realen Zeiten ausführen).
Martijn Pieters
12

Eine Variante, die hilfreich sein kann: Wenn Sie möchten, dass Ihr wiederkehrender Aufruf alle n Sekunden statt n Sekunden zwischen dem Ende der letzten Ausführung und dem Beginn der nächsten ausgeführt wird und Sie nicht möchten, dass sich die Aufrufe zeitlich überschneiden, gehen Sie wie folgt vor ist einfacher:

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

Und ein Beispiel für die Verwendung einiger Aufgaben im Hintergrund:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Fred Ross
quelle
Vielen Dank! Ich hatte dieses Problem, während mein Server stark ausgelastet war, und bei vielen Wiederholungen kam es zu einem Zeitversatz. Dies löst es elegant.
Christian Oudard
Warum verwenden Sie sure_future in main ()? Warum nicht einfach await repeat(3, f)und await repeat(2, g)?
Marcoc88
8

Alternative Version mit Dekorator für Python 3.7

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))
Fernando José Esteves de Souza
quelle
4

Basierend auf @A. Die Antwort von Jesse Jiryu Davis (mit Kommentaren von @Torkel Bjørnson-Langen und @ReWrite) ist eine Verbesserung, die Drift vermeidet.

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
Wojciech Migda
quelle
2
periodicsollte wahrscheinlich loop.time()bevorzugt verwendet werden, time.time()da loop.time()die Zeitreferenz intern von verwendet wird asyncio.sleep(). Gibt die loop.time()monotone Zeit zurück, während time.time()die Wanduhrzeit zurückgegeben wird. Die beiden unterscheiden sich beispielsweise, wenn ein Systemadministrator das Datum auf dem System ändert oder wenn NTP die Wanduhrzeit anpasst.
user4815162342