Wie überprüfe ich den Aufgabenstatus in Sellerie?

90

Wie prüft man, ob eine Aufgabe in Sellerie ausgeführt wird (insbesondere verwende ich Sellerie-Django)?

Ich habe die Dokumentation gelesen und gegoogelt, kann aber keinen Anruf wie folgt sehen:

my_example_task.state() == RUNNING

Mein Anwendungsfall ist, dass ich einen externen (Java) Dienst zum Transcodieren habe. Wenn ich ein Dokument zur Transcodierung sende, möchte ich überprüfen, ob die Aufgabe, die diesen Dienst ausführt, ausgeführt wird, und wenn nicht, es (neu) starten.

Ich verwende die aktuellen stabilen Versionen - 2.4, glaube ich.

Marcin
quelle

Antworten:

96

Geben Sie die task_id (die von .delay () angegeben wird) zurück und fragen Sie anschließend die Sellerieinstanz nach dem Status:

x = method.delay(1,2)
print x.task_id

Wenn Sie fragen, erhalten Sie ein neues AsyncResult mit dieser task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Gregor
quelle
10
Danke, aber was ist, wenn ich keinen Zugang habe x?
Marcin
4
Wo reihen Sie Ihre Jobs in Sellerie ein? Dort müssen Sie die task_id zurückgeben, um den Job in Zukunft zu verfolgen.
Gregor
Im Gegensatz zu @ Marcin verwendet diese Antwort nicht die statische Methode Task.AsyncResult () als Factory des AsyncResult, die die Backend-Konfiguration hilfreich wiederverwendet. Andernfalls wird beim Versuch, das Ergebnis abzurufen, ein Fehler ausgegeben.
ArnauOrriols
2
@Chris Die Kontroverse mit @gregor Code ist in der Instanziierung von async_result. In Ihrem Anwendungsfall, in dem Sie die Instanz bereits haben, können Sie loslegen. Aber was passiert, wenn Sie nur die Task-ID haben und eine async_resultInstanz instanziieren müssen, um aufrufen zu können async_result.get()? Dies ist eine Instanz der AsyncResultKlasse, aber Sie können die Raw-Klasse nicht verwenden celery.result.AsyncResult. Sie müssen die Klasse aus der Funktion abrufen, die von umschlossen wird app.task(). In async_result = run_instance.AsyncResult('task-id')
Ihrem
1
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). - Ich denke, so sollte es eigentlich verwendet werden. Lesen Sie den Code: github.com/celery/celery/blob/…
nevelis
68

Das Erstellen eines AsyncResultObjekts aus der Aufgaben-ID wird in den häufig gestellten Fragen empfohlen , um den Aufgabenstatus zu erhalten, wenn Sie nur die Aufgaben-ID haben.

Ab Sellerie 3.x gibt es jedoch erhebliche Einschränkungen, die Menschen beißen können, wenn sie nicht auf sie achten. Es hängt wirklich vom spezifischen Anwendungsfall ab.

Standardmäßig zeichnet Sellerie keinen "laufenden" Zustand auf.

Damit Sellerie aufzeichnet, dass eine Aufgabe ausgeführt wird, müssen Sie festlegen task_track_started, dass True. Hier ist eine einfache Aufgabe, die dies testet:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Wann task_track_startedist False, was die Standardeinstellung ist , wird die Statusanzeige angezeigt, PENDINGobwohl die Aufgabe gestartet wurde. Wenn Sie setzen task_track_startedauf True, dann wird der Staat sein STARTED.

Der Staat PENDINGbedeutet "Ich weiß es nicht."

Ein AsyncResultmit dem Staat PENDINGbedeutet nichts weiter als dass Sellerie den Status der Aufgabe nicht kennt. Dies kann verschiedene Gründe haben.

Zum einen AsyncResultkann mit ungültigen Task-IDs erstellt werden. Solche "Aufgaben" werden von Sellerie als anhängig angesehen:

>>> task.AsyncResult("invalid").status
'PENDING'

Ok, also wird niemand offensichtlich ungültige IDs füttern AsyncResult. Fair genug, aber es hat auch Auswirkungen, AsyncResultdie auch eine Aufgabe berücksichtigen, die erfolgreich ausgeführt wurde, die Sellerie jedoch vergessen hat PENDING. Auch in einigen Anwendungsszenarien kann dies ein Problem sein. Ein Teil des Problems hängt davon ab, wie Sellerie so konfiguriert ist, dass die Ergebnisse von Aufgaben gespeichert werden, da dies von der Verfügbarkeit der "Grabsteine" im Ergebnis-Backend abhängt. ("Tombstones" ist der Begriff, der in der Sellerie-Dokumentation für die Datenblöcke verwendet wird, die aufzeichnen, wie die Aufgabe beendet wurde.) Die Verwendung AsyncResultfunktioniert überhaupt nicht, wenn dies der Fall task_ignore_resultist True. Ein ärgerlicheres Problem ist, dass Sellerie die Grabsteine ​​standardmäßig abläuft. Dasresult_expiresDie Standardeinstellung ist 24 Stunden. Wenn Sie also eine Aufgabe starten und die ID im Langzeitspeicher aufzeichnen und 24 Stunden später eine ID damit erstellen AsyncResult, lautet der Status PENDING.

Alle "echten Aufgaben" beginnen im PENDINGZustand. Das Einsteigen PENDINGin eine Aufgabe kann also bedeuten, dass die Aufgabe angefordert wurde, aber nie weiter fortgeschritten ist (aus welchem ​​Grund auch immer). Oder es könnte bedeuten, dass die Aufgabe ausgeführt wurde, aber Sellerie ihren Zustand vergessen hat.

Autsch! AsyncResultwird nicht für mich arbeiten. Was kann ich sonst noch tun?

Ich ziehe es vor, die Ziele im Auge zu behalten, als die Aufgaben selbst im Auge zu behalten . Ich behalte einige Aufgabeninformationen, aber es ist wirklich zweitrangig, die Ziele zu verfolgen. Die Ziele werden unabhängig von Sellerie gespeichert. Wenn eine Anforderung eine Berechnung durchführen muss, die davon abhängt, dass ein Ziel erreicht wurde, prüft sie, ob das Ziel bereits erreicht wurde. Wenn ja, verwendet sie dieses zwischengespeicherte Ziel. Andernfalls wird die Aufgabe gestartet, die das Ziel beeinflusst, und an gesendet Der Client, der die HTTP-Anforderung zu einer Antwort gemacht hat, die angibt, dass er auf ein Ergebnis warten soll.


Die obigen Variablennamen und Hyperlinks gelten für Sellerie 4.x. In 3.x die entsprechenden Variablen und Hyperlinks sind: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

Louis
quelle
Wenn ich also das Ergebnis später überprüfen möchte (vielleicht sogar innerhalb eines anderen Prozesses), bin ich mit meiner eigenen Implementierung besser dran? Das Ergebnis manuell in der Datenbank speichern?
Franklin Yu
Ja, ich würde die Verfolgung des "Ziels" von der Verfolgung der "Aufgaben" trennen. Ich schrieb "führe eine Berechnung durch, die von einem Ziel abhängt". Normalerweise ist das "Ziel" auch eine Berechnung. Wenn ich beispielsweise Artikel X einem Benutzer anzeigen möchte, muss ich ihn von XML in HTML konvertieren, aber zuvor muss ich alle bibliografischen Verweise aufgelöst haben. (X ist wie ein Zeitschriftenartikel.) Ich überprüfe, ob das Ziel "Artikel X mit allen aufgelösten Literaturangaben" vorhanden ist, und verwende dies, anstatt zu versuchen, den Aufgabenstatus einer Sellerie-Aufgabe zu überprüfen, die das gewünschte Ziel berechnet hätte.
Louis
Die Informationen "Artikel X mit allen aufgelösten Literaturangaben" werden in einem Speichercache und in einer eXist-db-Datenbank gespeichert.
Louis
61

Jedes TaskObjekt hat eine .requestEigenschaft, die es enthält AsyncRequest. Dementsprechend gibt die folgende Zeile den Status einer Aufgabe an task:

task.AsyncResult(task.request.id).state
Marcin
quelle
2
Gibt es eine Möglichkeit, den Prozentsatz des Fortschritts einer Aufgabe zu speichern?
Patrick
4
Wenn ich das mache, bekomme ich ein permanent PENDING AsyncResult, auch wenn ich lange genug warte, bis die Aufgabe beendet ist. Gibt es eine Möglichkeit, dies zu sehen, um Statusänderungen zu sehen? Ich glaube, mein Backend ist konfiguriert und ich habe versucht, CELERY_TRACK_STARTED = True ohne Erfolg zu setzen.
Dstromberg
1
@dstromberg Leider ist es 4 Jahre her, dass dies ein Problem für mich war, also kann ich nicht anders. Sie müssen mit ziemlicher Sicherheit Sellerie konfigurieren, um den Status zu verfolgen.
Marcin
16

Sie können auch benutzerdefinierte Status erstellen und die Ausführung der Aufgaben für den Wert aktualisieren. Dieses Beispiel stammt aus Dokumenten:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

msangel
quelle
10

Alte Frage, aber ich bin kürzlich auf dieses Problem gestoßen.

Wenn Sie versuchen, die task_id abzurufen, können Sie dies folgendermaßen tun:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Jetzt wissen Sie genau, was die task_id ist, und können sie jetzt verwenden, um das AsyncResult abzurufen:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
Cesar Rios
quelle
3
Es ist absolut nicht erforderlich, eine eigene Aufgaben-ID zu erstellen und an diese weiterzugeben apply_async. Das von zurückgegebene Objekt apply_async ist ein AsyncResultObjekt, das die ID der von Celery generierten Aufgabe enthält.
Louis
Korrigieren Sie mich, wenn ich falsch liege, aber ist es manchmal nicht nützlich, eine UUID basierend auf einigen Eingaben zu generieren, damit alle Anrufe, die dieselben Eingaben erhalten, dieselbe UUID erhalten? IOW, vielleicht ist es manchmal nützlich, Ihre task_id anzugeben.
Dstromberg
1
@dstromberg Die vom OP gestellte Frage lautet "Wie überprüfe ich den Aufgabenstatus?" und die Antwort hier lautet "Wenn Sie versuchen, die task_id zu erhalten ...". Weder überprüfen Sie den Aufgabenstatus, noch task_idmüssen Sie selbst eine Aufgaben-ID generieren . In Ihrem Kommentar haben Sie sich einen Grund vorgestellt, der über "Wie überprüfe ich den Aufgabenstatus?" Und "Wenn Sie versuchen, die task_id zu erhalten ..." hinausgeht. Großartig, wenn Sie diesen Bedarf haben, aber dies ist nicht der Fall hier. (Außerdem macht die Verwendung uuid()zum Generieren einer Aufgaben-ID absolut nichts anderes als das, was Sellerie standardmäßig tut.)
Louis
6

Verwenden Sie einfach diese API aus den Sellerie-FAQ

result = app.AsyncResult(task_id)

Das funktioniert gut.

David Ding
quelle
1

Antwort von 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
Adrian Garcia Moreno
quelle
0

Versuchen:

task.AsyncResult(task.request.id).state

Dadurch wird der Status der Sellerie-Aufgabe angezeigt. Wenn sich die Sellerie-Aufgabe bereits im Status FEHLER befindet , wird eine Ausnahme ausgelöst :

raised unexpected: KeyError('exc_type',)

Gogasca
quelle
0

Ich fand hilfreiche Informationen in der

Sellerieprojekt Arbeiterhandbuch Inspektionsarbeiter

In meinem Fall überprüfe ich, ob Sellerie läuft.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Sie können mit inspect spielen, um Ihre Bedürfnisse zu erfüllen.

Nullocog
quelle
0
  • Zuerst , in Ihrer Sellerie-App:

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • und als nächstes wechseln Sie in die Aufgabendatei , App aus Ihrem Sellerie-App-Modul importieren.

vi task / task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

Sie ZhengChuan
quelle
-1

Abgesehen von dem oben genannten programmatischen Ansatz ist der Status der Verwendung der Blumenaufgabe leicht zu erkennen.

Echtzeitüberwachung mit Sellerieereignissen. Flower ist ein webbasiertes Tool zur Überwachung und Verwaltung von Sellerie-Clustern.

  1. Aufgabenfortschritt und -verlauf
  2. Möglichkeit, Aufgabendetails anzuzeigen (Argumente, Startzeit, Laufzeit usw.)
  3. Grafiken und Statistiken

Offizielles Dokument: Blumen - Sellerie - Überwachungstool

Installation:

$ pip install flower

Verwendung:

http://localhost:5555
Roshan Bagdiya
quelle
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
Saurabh ich
quelle
2
Bitte posten Sie nicht nur Code als Antwort, sondern geben Sie auch eine Erklärung, was Ihr Code tut und wie er das Problem der Frage löst. Antworten mit einer Erklärung sind in der Regel hilfreicher und von besserer Qualität und ziehen eher positive Stimmen an.
Mark Rotteveel