Erstellen einer asynchronen Aufgabe in Flask

94

Ich schreibe eine Anwendung in Flask, die sehr gut funktioniert, außer dass sie WSGIsynchron und blockierend ist. Ich habe insbesondere eine Aufgabe, die eine Drittanbieter-API aufruft, und diese Aufgabe kann einige Minuten dauern. Ich möchte diesen Anruf tätigen (es ist eigentlich eine Reihe von Anrufen) und ihn laufen lassen. während die Kontrolle an Flask zurückgegeben wird.

Meine Ansicht sieht aus wie:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Was ich jetzt tun möchte, ist die Leitung zu haben

final_file = audio_class.render_audio()

Führen Sie einen Rückruf aus und stellen Sie ihn bereit, der ausgeführt werden soll, wenn die Methode zurückgegeben wird, während Flask weiterhin Anforderungen verarbeiten kann. Dies ist die einzige Aufgabe, die Flask asynchron ausführen muss, und ich möchte einige Ratschläge, wie dies am besten implementiert werden kann.

Ich habe mir Twisted und Klein angesehen, bin mir aber nicht sicher, ob sie übertrieben sind, da Threading vielleicht ausreichen würde. Oder ist Sellerie dafür eine gute Wahl?

Darwin Tech
quelle
Ich benutze normalerweise Sellerie dafür ... es könnte übertrieben sein, aber afaik Threading funktioniert nicht gut in Webumgebungen (iirc ...)
Joran Beasley
Richtig. Ja - ich habe gerade Sellerie untersucht. Es könnte ein guter Ansatz sein. Einfach mit Flask zu implementieren?
Darwin Tech
heh ich neige dazu, auch einen Socket-Server zu verwenden (flask-socketio) und ja, ich fand es ziemlich einfach ... das Schwierigste war, alles zu installieren
Joran Beasley
3
Ich würde empfehlen, dies zu überprüfen . Dieser Typ schreibt großartige Tutorials für Kolben im Allgemeinen, und diese ist großartig, um zu verstehen, wie asynchrone Aufgaben in eine Kolben-App integriert werden.
Atlspin

Antworten:

98

Ich würde Sellerie verwenden , um die asynchrone Aufgabe für Sie zu erledigen. Sie müssen einen Broker installieren, der als Aufgabenwarteschlange dient (RabbitMQ und Redis werden empfohlen).

app.py::

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Führen Sie Ihre Flask-App aus und starten Sie einen weiteren Prozess, um Ihren Sellerie-Arbeiter auszuführen.

$ celery worker -A app.celery --loglevel=debug

Ich verweise auch auf Miguel Gringberg die bis zu schreiben für eine mehr in die Tiefe Anleitung zur Verwendung von Sellerie mit Flask.

Connie
quelle
33

Einfädeln ist eine weitere mögliche Lösung. Obwohl die auf Sellerie basierende Lösung für skalierte Anwendungen besser geeignet ist, ist Threading eine praktikable Alternative, wenn Sie nicht zu viel Datenverkehr auf dem betreffenden Endpunkt erwarten.

Diese Lösung basiert auf Miguel Grinbergs PyCon 2016 Flask at Scale-Präsentation , insbesondere Folie 41 in seinem Dia-Deck. Sein Code ist auch auf Github für diejenigen verfügbar, die an der Originalquelle interessiert sind.

Aus Anwendersicht funktioniert der Code wie folgt:

  1. Sie rufen den Endpunkt auf, der die lange laufende Aufgabe ausführt.
  2. Dieser Endpunkt gibt 202 Akzeptiert mit einem Link zurück, um den Aufgabenstatus zu überprüfen.
  3. Aufrufe der Statusverbindung geben 202 zurück, während die Takes noch ausgeführt werden, und geben 200 (und das Ergebnis) zurück, wenn die Aufgabe abgeschlossen ist.

Um einen API-Aufruf in eine Hintergrundaufgabe zu konvertieren, fügen Sie einfach den @ async_api-Dekorator hinzu.

Hier ist ein vollständig enthaltenes Beispiel:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)
Jürgen Strydom
quelle
8

Sie können auch versuchen, multiprocessing.Processmit zu verwenden daemon=True; Die process.start()Methode blockiert nicht und Sie können eine Antwort / einen Status sofort an den Aufrufer zurückgeben, während Ihre teure Funktion im Hintergrund ausgeführt wird.

Ich hatte ein ähnliches Problem, als ich mit dem Falcon Framework arbeitete und den daemonProzess half.

Sie müssten Folgendes tun:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

Sie sollten sofort eine Antwort erhalten und nach 10 Sekunden sollte eine gedruckte Nachricht in der Konsole angezeigt werden.

HINWEIS: Beachten Sie, dass daemonicProzesse keine untergeordneten Prozesse erzeugen dürfen.

Tomasz Bartkowiak
quelle
Asynchron ist eine bestimmte Art von Parallelität, die weder Threading noch Multiprocessing ist. Threading ist jedoch viel näher als asynchrone Aufgabe,
Tortal
3
Ich verstehe deinen Standpunkt nicht. Der Autor spricht von einer asynchronen Aufgabe, bei der es sich um die Aufgabe handelt, die "im Hintergrund" ausgeführt wird, sodass der Anrufer nicht blockiert, bis er eine Antwort erhält. Das Laichen eines Deamon-Prozesses ist ein Beispiel dafür, wo eine solche Asynchronität erreicht werden kann.
Tomasz Bartkowiak
Was ist, wenn der /render/<id>Endpunkt etwas erwartet my_func()?
Will Gu
Sie können beispielsweise eine my_funcAntwort / einen Herzschlag an einen anderen Endpunkt senden. Oder Sie können eine Nachrichtenwarteschlange einrichten und freigeben, über die Sie kommunizieren könnenmy_func
Tomasz Bartkowiak