from celery.app.control importInspect# Inspect all nodes.
i =Inspect()# Show the items that have an ETA or are scheduled for later processing
i.scheduled()# Show tasks that are currently active.
i.active()# Show tasks that have been claimed by workers
i.reserved()
Ich habe das versucht, aber es ist wirklich langsam (wie 1 Sek.). Ich verwende es synchron in einer Tornado-App, um den Fortschritt zu überwachen, also muss es schnell sein.
JulienFr
41
Dadurch wird keine Liste der Aufgaben in der Warteschlange zurückgegeben, die noch verarbeitet werden müssen.
Ed J
9
Verwenden Sie i.reserved()diese Option , um eine Liste der Aufgaben in der Warteschlange abzurufen.
Banane
4
Hat jemand erfahren, dass i.reserved () keine genaue Liste der aktiven Aufgaben hat? Ich habe Aufgaben ausgeführt, die nicht in der Liste angezeigt werden. Ich bin auf Django-Sellerie == 3.1.10
Seperman
6
Bei der Angabe des Arbeiters musste ich eine Liste als Argument verwenden : inspect(['celery@Flatty']). Riesige Geschwindigkeitsverbesserung vorbei inspect().
Adversus
42
Wenn Sie rabbitMQ verwenden, verwenden Sie dies im Terminal:
sudo rabbitmqctl list_queues
Es wird eine Liste der Warteschlangen mit der Anzahl der ausstehenden Aufgaben gedruckt. beispielsweise:
Ich bin damit vertraut, wenn ich über Sudo-Berechtigungen verfüge, aber ich möchte, dass ein nicht privilegierter Systembenutzer dies überprüfen kann - irgendwelche Vorschläge?
Salbei
Darüber hinaus können Sie dies durchleiten grep -e "^celery\s" | cut -f2, um zu extrahieren, 166ob Sie diese Nummer später verarbeiten möchten, z. B. für Statistiken.
Jamesc
21
Wenn Sie keine priorisierten Aufgaben verwenden, ist dies ziemlich einfach, wenn Sie Redis verwenden. So erhalten Sie die Anzahl der Aufgaben:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Priorisierte Aufgaben verwenden jedoch einen anderen Schlüssel in Redis , sodass das Gesamtbild etwas komplizierter ist. Das vollständige Bild ist, dass Sie Redis für jede Priorität der Aufgabe abfragen müssen. In Python (und aus dem Flower-Projekt) sieht dies folgendermaßen aus:
PRIORITY_SEP ='\x06\x16'
DEFAULT_PRIORITY_STEPS =[0,3,6,9]def make_queue_name_for_pri(queue, pri):"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""if pri notin DEFAULT_PRIORITY_STEPS:raiseValueError('Priority not in priority steps')return'{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri)if pri else(queue,'','')))def get_queue_length(queue_name='celery'):"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names =[make_queue_name_for_pri(queue_name, pri)for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)return sum([r.llen(x)for x in priority_names])
Wenn Sie eine tatsächliche Aufgabe erhalten möchten, können Sie Folgendes verwenden:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0-1
Von dort aus müssen Sie die zurückgegebene Liste deserialisieren. In meinem Fall konnte ich dies mit etwas erreichen wie:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)
l = r.lrange('celery',0,-1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Seien Sie nur gewarnt, dass die Deserialisierung einen Moment dauern kann und Sie die obigen Befehle anpassen müssen, um mit verschiedenen Prioritäten zu arbeiten.
Ich habe das oben Gesagte aktualisiert, um priorisierte Aufgaben zu erledigen. Fortschritt!
mlissner
1
Nur um die Dinge zu formulieren, DATABASE_NUMBERwird standardmäßig verwendet 0und das QUEUE_NAMEist celery, also redis-cli -n 0 llen celerywird die Anzahl der Nachrichten in der Warteschlange zurückgegeben.
Vineet Bansal
Für meinen Sellerie ist der Name der Warteschlange '{{{0}}}{1}{2}'anstelle von '{0}{1}{2}'. Davon abgesehen funktioniert das perfekt!
Zupo
12
Verwenden Sie diese Option, um Aufgaben aus dem Backend abzurufen
Wenn Sie Sellerie + Django verwenden , können Sie Aufgaben am einfachsten mithilfe von Befehlen direkt von Ihrem Terminal in Ihrer virtuellen Umgebung oder mithilfe eines vollständigen Pfads zum Sellerie überprüfen :
Wenn Sie ein definiertes Projekt haben, können Sie verwendencelery -A my_proj inspect reserved
sashaboulouds
6
Eine Copy-Paste-Lösung für Redis mit JSON-Serialisierung:
def get_celery_queue_items(queue_name):import base64
import json
# Get a configured instance of a celery app:from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True)as conn:
tasks = conn.default_channel.client.lrange(queue_name,0,-1)
decoded_tasks =[]for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)return decoded_tasks
Es funktioniert mit Django. Vergiss nur nicht, dich zu ändern yourproject.celery.
Wenn Sie den Pickle-Serializer verwenden, können Sie die body =Zeile in ändern body = pickle.loads(base64.b64decode(j['body'])).
Jim Hunziker
4
Das Sellerie-Inspektionsmodul scheint die Aufgaben nur aus Sicht der Arbeitnehmer zu kennen. Wenn Sie die Nachrichten anzeigen möchten, die sich in der Warteschlange befinden (noch von den Mitarbeitern abgerufen werden müssen ), empfehle ich die Verwendung von Pyrabbit , das mit der rabbitmq http-API verbunden werden kann, um alle Arten von Informationen aus der Warteschlange abzurufen.
Ich denke, die einzige Möglichkeit, die wartenden Aufgaben zu erhalten, besteht darin, eine Liste der von Ihnen gestarteten Aufgaben zu führen und die Aufgabe beim Starten von der Liste entfernen zu lassen.
in meinem Fall jobsist immer Null ... irgendeine Idee?
Daveoncode
@daveoncode Ich glaube nicht, dass das genug Informationen sind, um hilfreich zu antworten. Sie könnten Ihre eigene Frage öffnen. Ich denke nicht, dass es ein Duplikat von diesem wäre, wenn Sie angeben, dass Sie die Informationen in Python abrufen möchten. Ich würde zurück zu stackoverflow.com/a/19465670/9843399 gehen , worauf ich meine Antwort gestützt habe, und sicherstellen, dass dies zuerst funktioniert.
Caleb Syring
@CalebSyring Dies ist der erste Ansatz, der mir die Aufgaben in der Warteschlange wirklich zeigt. Sehr schön. Das einzige Problem für mich ist, dass das Anhängen der Liste nicht zu funktionieren scheint. Irgendwelche Ideen, wie ich die Rückruffunktion dazu bringen kann, in die Liste zu schreiben?
Varlor
@Varlor Es tut mir leid, jemand hat meine Antwort falsch bearbeitet. Sie können im Bearbeitungsverlauf nach der ursprünglichen Antwort suchen, die höchstwahrscheinlich für Sie funktioniert. Ich arbeite daran, dies zu beheben. (BEARBEITEN: Ich bin gerade reingegangen und habe die Bearbeitung abgelehnt, bei der ein offensichtlicher Python-Fehler aufgetreten ist. Lassen Sie mich wissen, ob dies Ihr Problem behoben hat oder nicht.)
Caleb Syring
@CalebSyring Ich habe jetzt Ihren Code in einer Klasse verwendet, wobei die Liste als Klassenattribut funktioniert!
Varlor
2
Soweit ich weiß, gibt Celery keine API zum Untersuchen von Aufgaben an, die in der Warteschlange warten. Dies ist maklerspezifisch. Wenn Sie Redis als Broker für ein Beispiel verwenden, ist das Untersuchen von Aufgaben, die in der celery(Standard-) Warteschlange warten, so einfach wie:
Stellen Sie eine Verbindung zur Broker-Datenbank her
Listenelemente in der celeryListe (LRANGE-Befehl für ein Beispiel)
Denken Sie daran, dass dies Aufgaben sind, die darauf warten, von verfügbaren Mitarbeitern ausgewählt zu werden. In Ihrem Cluster werden möglicherweise einige Aufgaben ausgeführt. Diese werden nicht in dieser Liste aufgeführt, da sie bereits ausgewählt wurden.
Ich bin zu dem Schluss gekommen, dass der beste Weg, um die Anzahl der Jobs in einer Warteschlange zu ermitteln, die Verwendung ist, rabbitmqctlwie hier mehrmals vorgeschlagen wurde. Damit jeder ausgewählte Benutzer den Befehl mit ausführen kann, habe sudoich die Anweisungen hier befolgt (ich habe die Bearbeitung des Profilteils übersprungen, da es mir nichts ausmacht, vor dem Befehl sudo einzugeben.)
Ich schnappte mir auch Jamesc's grepund cutSnippet und wickelte es in Subprozessaufrufe ein.
from subprocess importPopen, PIPE
p1 =Popen(["sudo","rabbitmqctl","list_queues","-p","[name of your virtula host"], stdout=PIPE)
p2 =Popen(["grep","-e","^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 =Popen(["cut","-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()print("number of jobs on queue: %i"% int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):return bool([Truefor i in l if k in i.values()])def check_task(task_id):
task_value_dict = inspect().active().values()for task_list in task_value_dict:if self.key_in_list(task_id, task_list):returnTruereturnFalse
Wenn Sie den Code der Aufgaben steuern, können Sie das Problem umgehen, indem Sie eine Aufgabe bei der ersten Ausführung einen trivialen Wiederholungsversuch auslösen lassen und dann überprüfen inspect().reserved(). Der Wiederholungsversuch registriert die Aufgabe mit dem Ergebnis-Backend, und Sellerie kann das sehen. Die Aufgabe muss selfoder contextals erster Parameter akzeptieren, damit wir auf die Anzahl der Wiederholungen zugreifen können.
Diese Lösung ist maklerunabhängig, dh. Sie müssen sich keine Gedanken darüber machen, ob Sie RabbitMQ oder Redis zum Speichern der Aufgaben verwenden.
EDIT: Nach dem Testen habe ich festgestellt, dass dies nur eine Teillösung ist. Die Größe von reserviert ist auf die Prefetch-Einstellung für den Worker beschränkt.
Antworten:
BEARBEITEN: Weitere Antworten zum Abrufen einer Liste der Aufgaben in der Warteschlange finden Sie unter Andere Antworten.
Sie sollten hier nachsehen: Sellerie-Leitfaden - Inspektion von Arbeitern
Grundsätzlich ist dies:
Je nachdem was du willst
quelle
i.reserved()
diese Option , um eine Liste der Aufgaben in der Warteschlange abzurufen.inspect(['celery@Flatty'])
. Riesige Geschwindigkeitsverbesserung vorbeiinspect()
.Wenn Sie rabbitMQ verwenden, verwenden Sie dies im Terminal:
Es wird eine Liste der Warteschlangen mit der Anzahl der ausstehenden Aufgaben gedruckt. beispielsweise:
Die Nummer in der rechten Spalte gibt die Anzahl der Aufgaben in der Warteschlange an. oben hat die Sellerie-Warteschlange 166 ausstehende Aufgaben.
quelle
grep -e "^celery\s" | cut -f2
, um zu extrahieren,166
ob Sie diese Nummer später verarbeiten möchten, z. B. für Statistiken.Wenn Sie keine priorisierten Aufgaben verwenden, ist dies ziemlich einfach, wenn Sie Redis verwenden. So erhalten Sie die Anzahl der Aufgaben:
Priorisierte Aufgaben verwenden jedoch einen anderen Schlüssel in Redis , sodass das Gesamtbild etwas komplizierter ist. Das vollständige Bild ist, dass Sie Redis für jede Priorität der Aufgabe abfragen müssen. In Python (und aus dem Flower-Projekt) sieht dies folgendermaßen aus:
Wenn Sie eine tatsächliche Aufgabe erhalten möchten, können Sie Folgendes verwenden:
Von dort aus müssen Sie die zurückgegebene Liste deserialisieren. In meinem Fall konnte ich dies mit etwas erreichen wie:
Seien Sie nur gewarnt, dass die Deserialisierung einen Moment dauern kann und Sie die obigen Befehle anpassen müssen, um mit verschiedenen Prioritäten zu arbeiten.
quelle
DATABASE_NUMBER
wird standardmäßig verwendet0
und dasQUEUE_NAME
istcelery
, alsoredis-cli -n 0 llen celery
wird die Anzahl der Nachrichten in der Warteschlange zurückgegeben.'{{{0}}}{1}{2}'
anstelle von'{0}{1}{2}'
. Davon abgesehen funktioniert das perfekt!Verwenden Sie diese Option, um Aufgaben aus dem Backend abzurufen
quelle
Wenn Sie Sellerie + Django verwenden , können Sie Aufgaben am einfachsten mithilfe von Befehlen direkt von Ihrem Terminal in Ihrer virtuellen Umgebung oder mithilfe eines vollständigen Pfads zum Sellerie überprüfen :
Doc : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
Auch wenn Sie Sellerie + RabbitMQ verwenden , können Sie die Liste der Warteschlangen mit dem folgenden Befehl überprüfen :
Weitere Informationen : https://linux.die.net/man/1/rabbitmqctl
quelle
celery -A my_proj inspect reserved
Eine Copy-Paste-Lösung für Redis mit JSON-Serialisierung:
Es funktioniert mit Django. Vergiss nur nicht, dich zu ändern
yourproject.celery
.quelle
body =
Zeile in ändernbody = pickle.loads(base64.b64decode(j['body']))
.Das Sellerie-Inspektionsmodul scheint die Aufgaben nur aus Sicht der Arbeitnehmer zu kennen. Wenn Sie die Nachrichten anzeigen möchten, die sich in der Warteschlange befinden (noch von den Mitarbeitern abgerufen werden müssen ), empfehle ich die Verwendung von Pyrabbit , das mit der rabbitmq http-API verbunden werden kann, um alle Arten von Informationen aus der Warteschlange abzurufen.
Ein Beispiel finden Sie hier: Abrufen der Warteschlangenlänge mit Sellerie (RabbitMQ, Django)
quelle
Ich denke, die einzige Möglichkeit, die wartenden Aufgaben zu erhalten, besteht darin, eine Liste der von Ihnen gestarteten Aufgaben zu führen und die Aufgabe beim Starten von der Liste entfernen zu lassen.
Mit rabbitmqctl und list_queues erhalten Sie einen Überblick darüber, wie viele Aufgaben warten, nicht jedoch die Aufgaben selbst: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Wenn die gewünschte Aufgabe enthalten ist, aber noch nicht abgeschlossen ist, können Sie eine Liste Ihrer Aufgaben führen und deren Status überprüfen:
Oder Sie lassen Sellerie die Ergebnisse mit CELERY_RESULT_BACKEND speichern und überprüfen, welche Ihrer Aufgaben dort nicht enthalten sind.
quelle
Dies hat bei mir in meiner Bewerbung funktioniert:
active_jobs
wird eine Liste von Zeichenfolgen sein, die Aufgaben in der Warteschlange entsprechen.Vergessen Sie nicht, CELERY_APP_INSTANCE gegen Ihr eigenes auszutauschen.
Vielen Dank an @ashish, der mich mit seiner Antwort hier in die richtige Richtung gelenkt hat: https://stackoverflow.com/a/19465670/9843399
quelle
jobs
ist immer Null ... irgendeine Idee?Soweit ich weiß, gibt Celery keine API zum Untersuchen von Aufgaben an, die in der Warteschlange warten. Dies ist maklerspezifisch. Wenn Sie Redis als Broker für ein Beispiel verwenden, ist das Untersuchen von Aufgaben, die in der
celery
(Standard-) Warteschlange warten, so einfach wie:celery
Liste (LRANGE-Befehl für ein Beispiel)Denken Sie daran, dass dies Aufgaben sind, die darauf warten, von verfügbaren Mitarbeitern ausgewählt zu werden. In Ihrem Cluster werden möglicherweise einige Aufgaben ausgeführt. Diese werden nicht in dieser Liste aufgeführt, da sie bereits ausgewählt wurden.
quelle
Ich bin zu dem Schluss gekommen, dass der beste Weg, um die Anzahl der Jobs in einer Warteschlange zu ermitteln, die Verwendung ist,
rabbitmqctl
wie hier mehrmals vorgeschlagen wurde. Damit jeder ausgewählte Benutzer den Befehl mit ausführen kann, habesudo
ich die Anweisungen hier befolgt (ich habe die Bearbeitung des Profilteils übersprungen, da es mir nichts ausmacht, vor dem Befehl sudo einzugeben.)Ich schnappte mir auch Jamesc's
grep
undcut
Snippet und wickelte es in Subprozessaufrufe ein.quelle
quelle
Wenn Sie den Code der Aufgaben steuern, können Sie das Problem umgehen, indem Sie eine Aufgabe bei der ersten Ausführung einen trivialen Wiederholungsversuch auslösen lassen und dann überprüfen
inspect().reserved()
. Der Wiederholungsversuch registriert die Aufgabe mit dem Ergebnis-Backend, und Sellerie kann das sehen. Die Aufgabe mussself
odercontext
als erster Parameter akzeptieren, damit wir auf die Anzahl der Wiederholungen zugreifen können.Diese Lösung ist maklerunabhängig, dh. Sie müssen sich keine Gedanken darüber machen, ob Sie RabbitMQ oder Redis zum Speichern der Aufgaben verwenden.
EDIT: Nach dem Testen habe ich festgestellt, dass dies nur eine Teillösung ist. Die Größe von reserviert ist auf die Prefetch-Einstellung für den Worker beschränkt.
quelle
Mit
subprocess.run
:Achten Sie darauf,
my_proj
mit zu ändernyour_proj
quelle