Ich habe eine CPU-intensive Sellerie-Aufgabe. Ich möchte die gesamte Verarbeitungsleistung (Kerne) in vielen EC2-Instanzen nutzen, um diesen Job schneller zu erledigen (eine Sellerie-parallel verteilte Aufgabe mit Mehrfachverarbeitung - glaube ich ) .
Die Begriffe Threading , Multiprocessing , Distributed Computing und Distributed Parallel Processing sind alles Begriffe, die ich besser verstehen möchte.
Beispielaufgabe:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
Wie würde man unter Verwendung des obigen Codes (mit einem Beispiel, wenn möglich) diese Aufgabe mit Celery verteilen, indem man zulässt, dass diese eine Aufgabe unter Verwendung der gesamten Rechen-CPU-Leistung auf allen verfügbaren Computern in der Cloud aufgeteilt wird?
python
django
multithreading
multiprocessing
celery
Prometheus
quelle
quelle
Antworten:
Ihre Ziele sind:
Sellerie kann beides ziemlich einfach für Sie erledigen. Das erste, was Sie verstehen müssen, ist, dass jeder Sellerie-Arbeiter standardmäßig so konfiguriert ist, dass er so viele Aufgaben ausführt, wie auf einem System CPU-Kerne verfügbar sind:
Dies bedeutet, dass sich jede einzelne Aufgabe nicht um die Verwendung von Multiprocessing / Threading kümmern muss, um mehrere CPUs / Kerne zu verwenden. Stattdessen führt Sellerie gleichzeitig genügend Aufgaben aus, um jede verfügbare CPU zu nutzen.
Wenn dies nicht möglich ist, müssen Sie im nächsten Schritt eine Aufgabe erstellen, die die Verarbeitung einer Teilmenge Ihrer Daten übernimmt
list_of_millions_of_ids
. Hier haben Sie mehrere Möglichkeiten: Eine besteht darin, dass jede Aufgabe eine einzelne ID verarbeitet, sodass Sie N Aufgaben ausführen, wobeiN == len(list_of_millions_of_ids)
. Dies garantiert, dass die Arbeit gleichmäßig auf alle Ihre Aufgaben verteilt wird, da es niemals einen Fall geben wird, in dem ein Mitarbeiter vorzeitig fertig wird und nur wartet. Wenn es Arbeit braucht, kann es eine ID aus der Warteschlange ziehen. Sie können dies (wie von John Doe erwähnt) mit dem Sellerie tungroup
.task.py:
@app.task def process_id(item): id = item #long complicated equation here database.objects(newid=id).save()
Und um die Aufgaben auszuführen:
from celery import group from tasks import process_id jobs = group(process_id.s(item) for item in list_of_millions_of_ids) result = jobs.apply_async()
Eine andere Möglichkeit besteht darin, die Liste in kleinere Teile aufzuteilen und die Teile an Ihre Mitarbeiter zu verteilen. Bei diesem Ansatz besteht die Gefahr, dass einige Zyklen verschwendet werden, da möglicherweise einige Mitarbeiter warten, während andere noch arbeiten. In der Selleriedokumentation wird jedoch darauf hingewiesen, dass dieses Anliegen häufig unbegründet ist:
Möglicherweise stellen Sie fest, dass das Aufteilen der Liste und das Verteilen der Teile auf die einzelnen Aufgaben aufgrund des geringeren Messaging-Overheads eine bessere Leistung erbringt. Auf diese Weise können Sie wahrscheinlich auch die Datenbank etwas entlasten, indem Sie jede ID berechnen, in einer Liste speichern und dann die gesamte Liste zur Datenbank hinzufügen, sobald Sie fertig sind, anstatt jeweils eine ID zu erstellen . Der Chunking-Ansatz würde ungefähr so aussehen
task.py:
@app.task def process_ids(items): for item in items: id = item #long complicated equation here database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
Und um die Aufgaben zu starten:
from tasks import process_ids jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here. jobs.apply_async()
Sie können ein wenig experimentieren, mit welcher Chunking-Größe Sie das beste Ergebnis erzielen. Sie möchten einen Sweet Spot finden, an dem Sie den Messaging-Aufwand reduzieren und gleichzeitig die Größe so klein halten, dass die Mitarbeiter ihren Block nicht viel schneller als ein anderer Mitarbeiter fertigstellen und dann einfach warten, ohne etwas zu tun.
quelle
threading
oder einführenmultiprocessing
. Stattdessen lässt jeder Sellerie-Arbeiter so viele Aufgaben ausführen, wie auf der Maschine Kerne verfügbar sind (dies geschieht standardmäßig bei Sellerie). Das bedeutet, dass in Ihrem gesamten Cluster jeder Kern zur Verarbeitung Ihres Kerns verwendet werden kannlist_of_million_ids
, indem für jede Aufgabe ein einziger Kern verwendet wird. Anstatt dass eine einzelne Aufgabe viele Kerne verwendet, verwenden viele Aufgaben jeweils einen Kern. Ist das sinnvoll?threading
odermultiprocessing
". Angenommen, wir können diese schwere Aufgabe nicht in mehrere aufteilen. Wie würden Sie Threading oder Multiprocessing verwenden, um Sellerie dazu zu bringen, die Aufgabe auf mehrere Instanzen aufzuteilen? dankemultiprocessing
, die Arbeit innerhalb der Aufgabe selbst aufzuteilen, wenn Sie die Aufgabe selbst nicht in Unteraufgaben aufteilen können, da beide Ansätze letztendlich die Ausführung der Aufgabe erfordern Das Gleiche: Aufteilen einer Aufgabe in kleinere Aufgaben, die parallel ausgeführt werden können. Sie ändern wirklich nur den Punkt, an dem Sie die Aufteilung durchführen.multiprocessing
in einer Sellerie-Aufgabe verwenden können. Sellerie selbst verwendetbilliard
(einemultiprocessing
Gabelung), um Ihre Aufgaben in separaten Prozessen auszuführen. Du darfst sie dann einfach nicht inmultiprocessing
ihnen benutzen .In der Welt des Vertriebs gibt es nur eines, an das Sie sich vor allem erinnern sollten:
Ich weiß, dass es offensichtlich klingt, aber bevor Sie Double Check verteilen, verwenden Sie den besten Algorithmus (falls vorhanden ...). Die Optimierung der Verteilung ist jedoch ein Spagat zwischen drei Dingen:
Computer werden so hergestellt, dass je näher Sie Ihrer Verarbeitungseinheit (3) kommen, desto schneller und effizienter (1) und (2) sind. Die Reihenfolge in einem klassischen Cluster lautet: Netzwerkfestplatte, lokale Festplatte, RAM, innerhalb des Gebiets der Verarbeitungseinheit ... Heutzutage werden Prozessoren so hoch entwickelt, dass sie als Ensemble unabhängiger Hardwareverarbeitungseinheiten betrachtet werden können, die üblicherweise als Kerne bezeichnet werden. Diese Kerne verarbeiten Daten (3) durch Gewinde (2). Stellen Sie sich vor, Ihr Kern ist so schnell, dass Sie beim Senden von Daten mit einem Thread 50% der Computerleistung verbrauchen. Wenn der Kern 2 Threads hat, verwenden Sie 100%. Zwei Threads pro Kern werden als Hyper-Threading bezeichnet, und Ihr Betriebssystem sieht 2 CPUs pro Hyper-Thread-Kern.
Das Verwalten von Threads in einem Prozessor wird üblicherweise als Multithreading bezeichnet. Das Verwalten von CPUs vom Betriebssystem aus wird allgemein als Multiverarbeitung bezeichnet. Das Verwalten gleichzeitiger Aufgaben in einem Cluster wird üblicherweise als parallele Programmierung bezeichnet. Das Verwalten abhängiger Aufgaben in einem Cluster wird üblicherweise als verteilte Programmierung bezeichnet.
Wo liegt also Ihr Engpass?
Was ist mit Sellerie?
Sellerie ist ein Messaging-Framework für die verteilte Programmierung, das ein Broker-Modul für die Kommunikation (2) und ein Backend-Modul für die Persistenz (1) verwendet. Dies bedeutet, dass Sie die Konfiguration ändern können, um die meisten Engpässe (wenn möglich) zu vermeiden Ihr Netzwerk und nur in Ihrem Netzwerk. Profilieren Sie zuerst Ihren Code, um die beste Leistung auf einem einzelnen Computer zu erzielen. Verwenden Sie dann Sellerie in Ihrem Cluster mit der Standardkonfiguration und legen Sie Folgendes fest
CELERY_RESULT_PERSISTENT=True
:from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//', backend='redis://localhost') @app.task def process_id(all_the_data_parameters_needed_to_process_in_this_computer): #code that does stuff return result
Während der Ausführung öffnen Sie Ihre bevorzugten Überwachungstools. Ich verwende die Standardeinstellung für rabbitMQ und Flower für Sellerie und top für cpus. Ihre Ergebnisse werden in Ihrem Backend gespeichert. Ein Beispiel für einen Netzwerkengpass ist die wachsende Warteschlange für Aufgaben, die die Ausführung verzögert. Sie können die Modul- oder Selleriekonfiguration ändern, wenn sich Ihr Engpass nicht an einer anderen Stelle befindet.
quelle
Warum nicht
group
Sellerie Aufgabe dafür verwenden?http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups
Grundsätzlich sollten Sie sich
ids
in Blöcke (oder Bereiche) aufteilen und diese einer Reihe von Aufgaben in gebengroup
.Für etwas anspruchsvollere, wie das Zusammenfassen von Ergebnissen bestimmter Sellerie-Aufgaben, habe ich die
chord
Aufgabe erfolgreich für ähnliche Zwecke verwendet:http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords
Erhöhen Sie sich
settings.CELERYD_CONCURRENCY
auf eine Zahl, die angemessen ist und die Sie sich leisten können, dann führen diese Sellerie-Arbeiter Ihre Aufgaben in einer Gruppe oder einem Akkord weiter aus, bis sie erledigt sind.Hinweis: Aufgrund eines Fehlers in
kombu
der Vergangenheit gab es Probleme bei der Wiederverwendung von Mitarbeitern für eine große Anzahl von Aufgaben. Ich weiß nicht, ob dies jetzt behoben ist. Vielleicht ist es das, aber wenn nicht, reduzieren Sie CELERYD_MAX_TASKS_PER_CHILD.Beispiel basierend auf vereinfachtem und modifiziertem Code, den ich ausführe:
@app.task def do_matches(): match_data = ... result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())
summarize
erhält Ergebnisse allersingle_batch_processor
Aufgaben. Jede Aufgabe läuft auf jedem Sellerie-Arbeiter,kombu
koordiniert das.Jetzt verstehe ich es:
single_batch_processor
und es müssensummarize
AUCH Sellerie-Aufgaben sein, keine regulären Funktionen - sonst wird es natürlich nicht parallelisiert (ich bin mir nicht einmal sicher, ob der Akkordkonstruktor es akzeptiert, wenn es keine Sellerie-Aufgabe ist).quelle
chord
(Wenn CELERYD_CONCURRENCY auf Dutzende von Workern == logische CPU- / Hardware-Threads eingestellt ist) verarbeite ich eine große Anzahl von Protokolldateistapeln parallel über mehrere Kerne.do_matches
wird durch Warten auf den Akkord blockiert. Dies kann möglicherweise zu einem teilweisen oder vollständigen Stillstand führen, da viele / alle Mitarbeiter möglicherweise auf Unteraufgaben warten, von denen keine ausgeführt wird (da die Mitarbeiter auf Unteraufgaben warten, anstatt hart zu arbeiten).Das Hinzufügen weiterer Sellerie-Arbeiter wird die Ausführung der Aufgabe sicherlich beschleunigen. Möglicherweise haben Sie jedoch einen anderen Engpass: die Datenbank. Stellen Sie sicher, dass die gleichzeitigen Einfügungen / Aktualisierungen verarbeitet werden können.
Zu Ihrer Frage: Sie fügen Sellerie-Arbeiter hinzu, indem Sie Ihren EC2-Instanzen einen anderen Prozess als zuweisen
celeryd
. Je nachdem, wie viele Mitarbeiter Sie benötigen, möchten Sie möglicherweise noch mehr Instanzen hinzufügen.quelle