Löschen aller anstehenden Aufgaben in Sellerie / Kaninchen

199

Wie kann ich alle ausstehenden Aufgaben löschen, ohne die task_idfür jede Aufgabe zu kennen?

Nabizan
quelle

Antworten:

296

Aus den Dokumenten :

$ celery -A proj purge

oder

from proj.celery import app
app.control.purge()

(BEARBEITEN: Mit der aktuellen Methode aktualisiert.)

Philip Southam
quelle
56
Oder von Django für Sellerie 3.0+: manage.py celery purge( celeryctlist jetzt veraltet und wird in 3.1 weg sein).
Henrik Heimbuerger
3
Ich fand diese Antwort auf der Suche nach einer Möglichkeit, dies mit einem Redis-Backend zu tun. Die beste Methode, die ich gefunden habe, war redis-cli KEYS "celery*" | xargs redis-cli DELdie, die für mich funktioniert hat. Dadurch werden alle Aufgaben gelöscht, die im von Ihnen verwendeten Redis-Backend gespeichert sind.
Melignus
1
Wie kann ich das in Sellerie 3.0 machen?
luistm
2
Für mich war es einfach celery purge(innerhalb der relevanten virtuellen Umgebung). Hoppla - es gibt eine Antwort mit der gleichen unten ..... stackoverflow.com/a/20404976/1213425
Erve1879
Für Sellerie 4.0+ in Kombination mit Django ist es wieder dieser Befehl, bei dem das Argument -Adie Django-App ist, in der celery.pysich die befindet.
Gitaarik
120

Für Sellerie 3.0+:

$ celery purge

So löschen Sie eine bestimmte Warteschlange:

$ celery -Q queue_name purge
ToonAlfrink
quelle
9
Wenn Sie Verbindungsfehler erhalten, stellen Sie sicher, dass Sie die App angeben, z celery -A proj purge.
Kamil Sindi
25

Für Sellerie 2.x und 3.x:

Wenn Sie beispielsweise einen Worker mit dem Parameter -Q verwenden, um Warteschlangen zu definieren

celery worker -Q queue1,queue2,queue3

dann celery purgefunktioniert es nicht, da Sie die Warteschlangenparameter nicht an ihn übergeben können. Es wird nur die Standardwarteschlange gelöscht. Die Lösung besteht darin, Ihre Mitarbeiter mit folgenden --purgeParametern zu starten :

celery worker -Q queue1,queue2,queue3 --purge

Dadurch wird jedoch der Worker ausgeführt.

Eine andere Möglichkeit ist die Verwendung des Unterbefehls amqp von Sellerie

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
Smido
quelle
Ja, dies gilt für ältere (2.x und möglicherweise 3.x) Versionen von Sellerie. Ich kann die Antwort nicht bearbeiten
smido
9

Ich habe festgestellt, dass celery purgedies für meine komplexere Selleriekonfiguration nicht funktioniert. Ich verwende mehrere benannte Warteschlangen für verschiedene Zwecke:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

Die erste Spalte ist der Warteschlangenname, die zweite die Anzahl der in der Warteschlange wartenden Nachrichten und die dritte die Anzahl der Listener für diese Warteschlange. Die Warteschlangen sind:

  • Sellerie - Warteschlange für standardmäßige, idempotente Sellerie-Aufgaben
  • apns - Warteschlange für Apple Push Notification Service-Aufgaben, nicht ganz so idempotent
  • Analytics - Warteschlange für lang laufende nächtliche Analysen
  • * .pidbox - Warteschlange für Worker-Befehle wie Herunterfahren und Zurücksetzen, einer pro Worker (2 Sellerie-Worker, ein Apns-Worker, ein Analytics-Worker)
  • bcast. * - Broadcast-Warteschlangen zum Senden von Nachrichten an alle Mitarbeiter, die eine Warteschlange abhören (und nicht nur die ersten, die sie abrufen)
  • celeryev. * - Sellerie-Ereigniswarteschlangen für die Berichterstellung von Aufgabenanalysen

Die Analyseaufgabe ist eine Brute-Force-Aufgabe, die bei kleinen Datenmengen hervorragend funktioniert hat. Die Verarbeitung dauert jetzt jedoch mehr als 24 Stunden. Gelegentlich geht etwas schief und es bleibt beim Warten auf die Datenbank hängen. Es muss neu geschrieben werden, aber bis dahin, wenn es stecken bleibt, töte ich die Aufgabe, entleere die Warteschlange und versuche es erneut. Ich erkenne "Stuckness", indem ich mir die Anzahl der Nachrichten für die Analytics-Warteschlange ansehe, die 0 (fertige Analytics) oder 1 (Warten auf das Ende der Analytics der letzten Nacht) sein sollte. 2 oder höher ist schlecht und ich bekomme eine E-Mail.

celery purge bietet an, Aufgaben aus einer der Broadcast-Warteschlangen zu löschen, und ich sehe keine Option zum Auswählen einer anderen benannten Warteschlange.

Hier ist mein Prozess:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
jwhitlock
quelle
Keine Antwort, oder? Sehr informativ!
Amn
4
celeryctl purgefunktionierte nicht mit benannten Warteschlangen. python manage.py celery amqp queue.purge <queue_name>tat. Ich denke, der Kontext ist nützlich für diejenigen mit komplexen Setups, damit sie herausfinden können, was sie tun müssen, wenn celeryctl purgedies für sie fehlschlägt.
Jwhitlock
Ich kann manage.pyin meinem Sellerie 3.1.17 nicht finden , wurde die Datei entfernt oder nur neu verprügelt? Ich habe jedoch gefunden, wie die entsprechende Schnittstelle ( queue.purge) aussieht */bin/amqp.py. Aber nachdem ich versucht habe, den Inhalt der Datei mit der Dokumentation zu korrelieren, muss ich bedauerlicherweise zugeben, dass Sellerie absolut undokumentiert und auch eine sehr verworrene Arbeit ist, zumindest gemessen am Quellcode.
amn
manage.pyist das Django-Verwaltungsskript und manage.py celeryführt Sellerie aus, nachdem die Konfiguration aus den Django-Einstellungen geladen wurde. Ich habe Sellerie außerhalb von Django nicht verwendet, aber der enthaltene celeryBefehl könnte genau das sein, wonach Sie suchen: celery.readthedocs.org/en/latest/userguide/monitoring.html
jwhitlock
5

In Sellerie 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Benannte Warteschlange löschen:

 celery -A proj amqp queue.purge <queue name>

Konfigurierte Warteschlange löschen

celery -A proj purge

Ich habe Nachrichten gelöscht, aber es sind noch Nachrichten in der Warteschlange? Antwort: Aufgaben werden bestätigt (aus der Warteschlange entfernt), sobald sie tatsächlich ausgeführt werden. Nachdem der Mitarbeiter eine Aufgabe erhalten hat, dauert es einige Zeit, bis sie tatsächlich ausgeführt wird, insbesondere wenn bereits viele Aufgaben auf die Ausführung warten. Nicht bestätigte Nachrichten werden vom Worker beibehalten, bis die Verbindung zum Broker (AMQP-Server) geschlossen wird. Wenn diese Verbindung geschlossen wird (z. B. weil der Worker gestoppt wurde), werden die Aufgaben vom Broker an den nächsten verfügbaren Worker (oder an denselben Worker nach dem Neustart) erneut gesendet, um die Warteschlange für wartende Aufgaben ordnungsgemäß zu leeren müssen alle Arbeiter stoppen und dann die Aufgaben mit celery.control.purge () löschen.

Um die gesamte Warteschlange zu löschen, müssen die Mitarbeiter gestoppt werden.

oneklc
quelle
5

Wenn Sie alle ausstehenden Aufgaben sowie die aktiven und reservierten Aufgaben entfernen möchten, um Sellerie vollständig zu stoppen, hat dies für mich funktioniert:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
Kahlo
quelle
2

1. Um die Warteschlange ordnungsgemäß von wartenden Aufgaben zu leeren, müssen Sie alle Mitarbeiter stoppen ( http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are- Standbildnachrichten in der Warteschlange ):

$ sudo rabbitmqctl stop

oder (falls RabbitMQ / Message Broker vom Supervisor verwaltet wird):

$ sudo supervisorctl stop all

2. ... und löschen Sie dann die Aufgaben aus einer bestimmten Warteschlange:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Starten Sie RabbitMQ:

$ sudo rabbitmqctl start

oder (falls RabbitMQ vom Supervisor verwaltet wird):

$ sudo supervisorctl start all
Ukr
quelle
2

Sellerie 4+ Befehl zum Löschen von Sellerie zum Löschen aller konfigurierten Aufgabenwarteschlangen

celery -A *APPNAME* purge

programmatisch:

from proj.celery import app
app.control.purge()

Alle anstehenden Aufgaben werden gelöscht. Referenz: celerydoc

Roshan Bagdiya
quelle