Wie erstelle ich gleichzeitige Aufgaben richtig und führe sie mit dem Asyncio-Modul von Python aus?

76

Ich versuche, zwei gleichzeitig ausgeführte TaskObjekte mit dem relativ neuen asyncioModul von Python 3 richtig zu verstehen und zu implementieren .

Kurz gesagt, Asyncio scheint für asynchrone Prozesse und die gleichzeitige TaskAusführung über eine Ereignisschleife ausgelegt zu sein. Es fördert die Verwendung von await(in asynchronen Funktionen angewendet) als rückruffreie Möglichkeit, auf ein Ergebnis zu warten und es zu verwenden, ohne die Ereignisschleife zu blockieren. (Futures und Callbacks sind immer noch eine praktikable Alternative.)

Es bietet auch die asyncio.Task()Klasse, eine spezielle Unterklasse von Future Coroutinen zum Umwickeln. Vorzugsweise unter Verwendung der asyncio.ensure_future()Methode aufgerufen . Die beabsichtigte Verwendung von Asyncio-Aufgaben besteht darin, dass unabhängig ausgeführte Aufgaben "gleichzeitig" mit anderen Aufgaben innerhalb derselben Ereignisschleife ausgeführt werden können. Nach meinem Verständnis Taskssind diese mit der Ereignisschleife verbunden, die dann automatisch die Coroutine zwischen den awaitAnweisungen steuert .

Ich mag die Idee, gleichzeitige Aufgaben verwenden zu können, ohne eine der ExecutorKlassen verwenden zu müssen, aber ich habe nicht viel über die Implementierung herausgefunden.

So mache ich es gerade:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

Beim Versuch, zwei Schleifenaufgaben gleichzeitig auszuführen, ist mir aufgefallen, dass die Aufgabe, sofern awaitsie whilekeinen internen Ausdruck hat, in der Schleife hängen bleibt und andere Aufgaben effektiv daran hindert, ausgeführt zu werden (ähnlich wie bei einer normalen whileSchleife). Sobald die Aufgaben jedoch (a) warten müssen, scheinen sie ohne Probleme gleichzeitig ausgeführt zu werden.

Somit awaitscheinen die Anweisungen der Ereignisschleife einen Halt zu geben, um zwischen den Aufgaben hin und her zu wechseln, was den Effekt der Parallelität ergibt.

Beispielausgabe mit intern await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

Beispielausgabe ohne interne await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

Fragen

Gilt diese Implementierung als "richtiges" Beispiel für gleichzeitige Schleifenaufgaben asyncio?

Ist es richtig, dass dies nur funktioniert, wenn a Taskeinen Blockierungspunkt ( awaitAusdruck) bereitstellt , damit die Ereignisschleife mehrere Aufgaben jonglieren kann?

Songololo
quelle
4
Ja, die Aufgabe wird atomar von yield fromder nächsten ausgeführt yield from.
Andrew Svetlov

Antworten:

79

Ja, jede Coroutine, die in Ihrer Ereignisschleife ausgeführt wird, blockiert die Ausführung anderer Coroutinen und Aufgaben, sofern dies nicht der Fall ist

  1. Ruft eine andere Coroutine mit yield fromoder auf await(wenn Sie Python 3.5+ verwenden).
  2. Kehrt zurück.

Dies liegt daran, dass asyncioSingle-Threaded ist; Die einzige Möglichkeit, die Ereignisschleife auszuführen, besteht darin, dass keine andere Coroutine aktiv ausgeführt wird. Durch vorübergehende Verwendung yield from/ awaitUnterbrechung der Coroutine kann die Ereignisschleife ausgeführt werden.

Ihr Beispielcode ist in Ordnung, aber in vielen Fällen möchten Sie wahrscheinlich nicht, dass Code mit langer Laufzeit, der keine asynchrone E / A ausführt, zunächst in der Ereignisschleife ausgeführt wird. In diesen Fällen ist es häufig sinnvoller, asyncio.loop.run_in_executorden Code in einem Hintergrundthread oder -prozess auszuführen. ProcessPoolExecutorwäre die bessere Wahl, wenn Ihre Aufgabe CPU-gebunden ist, ThreadPoolExecutorwürde verwendet werden, wenn Sie eine E / A asyncioausführen müssen, die nicht freundlich ist.

Ihre beiden Schleifen sind beispielsweise vollständig CPU-gebunden und haben keinen gemeinsamen Status. Die beste Leistung ergibt sich daher aus der ProcessPoolExecutorparallelen Ausführung jeder Schleife zwischen CPUs:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = asyncio.create_task(loop.run_in_executor(executor, say_boo))
    baa = asyncio.create_task(loop.run_in_executor(executor, say_baa))

    loop.run_forever()
dano
quelle
Vielen Dank. Tolles Timing, da ich mich gerade über dieses Thema bezüglich der Verwendung von Executoren gewundert habe.
Songololo
Versuchen Sie den obigen Code und stellen Sie fest, dass die Boo-Task die Ausführung von Baa blockiert, es sei denn, ich füge die Ausbeute von asyncio.sleep (0) in jede der while-True-Schleifen ein?
Songololo
Außerdem wurden die run_in_executor-Bits wie folgt überarbeitet: loop.run_in_executor (executor, asyncio.Task (say_boo ()))
songololo
2
@shongololo Sorry, behoben. asyncio.asyncsollte anstelle des asyncio.TaskKonstruktors direkt verwendet werden. Wir wollen say_boound wollen say_baakeine Coroutinen sein, sie sollten nur gewöhnliche Funktionen sein, die außerhalb der Ereignisschleife ausgeführt werden. Sie sollten ihnen also keine yield fromAufrufe hinzufügen oder sie in eine einschließen asyncio.Task.
Dano
1
Sieht aus wie asyncio.async ist ein Alias, um_future sicherzustellen und ist jetzt veraltet
srobinson
14

Sie müssen nicht unbedingt yield from xdie Ereignisschleife steuern.

In Ihrem Beispiel denke ich, dass der richtige Weg darin besteht, ein yield Noneoder gleichwertiges einfaches yieldstatt eines yield from asyncio.sleep(0.001):

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

Coroutinen sind einfach alte Python-Generatoren. Intern zeichnet die asyncioEreignisschleife diese Generatoren auf und ruft gen.send()sie einzeln in einer Endlosschleife auf. Wann immer Sie yield, wird der Aufruf gen.send()abgeschlossen und die Schleife kann fortgesetzt werden. (Ich vereinfache es. Schauen Sie sich in https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 nach dem tatsächlichen Code um.)

Trotzdem würde ich immer noch den run_in_executorWeg gehen , wenn Sie CPU-intensive Berechnungen durchführen müssen, ohne Daten gemeinsam zu nutzen.

Jashandeep Sohi
quelle
Funktioniert in Python 3.4, scheint aber in Python 3.5 nicht zu funktionieren. Gibt es einen ähnlichen Ansatz für 3.5? ( Nonescheint eleganter zu sein als asyncio.sleep()überall zu verwenden ...)
Songololo
22
Seit Python 3.5 ist der richtige Weg, dies zu tun, mit a asyncio.sleep(0). Siehe diese Diskussion.
Jashandeep Sohi