Sellerie parallel verteilte Aufgabe mit Mehrfachverarbeitung

79

Ich habe eine CPU-intensive Sellerie-Aufgabe. Ich möchte die gesamte Verarbeitungsleistung (Kerne) in vielen EC2-Instanzen nutzen, um diesen Job schneller zu erledigen (eine Sellerie-parallel verteilte Aufgabe mit Mehrfachverarbeitung - glaube ich ) .

Die Begriffe Threading , Multiprocessing , Distributed Computing und Distributed Parallel Processing sind alles Begriffe, die ich besser verstehen möchte.

Beispielaufgabe:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

Wie würde man unter Verwendung des obigen Codes (mit einem Beispiel, wenn möglich) diese Aufgabe mit Celery verteilen, indem man zulässt, dass diese eine Aufgabe unter Verwendung der gesamten Rechen-CPU-Leistung auf allen verfügbaren Computern in der Cloud aufgeteilt wird?

Prometheus
quelle
Ich dachte, MapReduce wurde für Ihre Art von Anwendung entwickelt: console.aws.amazon.com/elasticmapreduce/vnext/… :
AStopher

Antworten:

119

Ihre Ziele sind:

  1. Verteilen Sie Ihre Arbeit auf viele Maschinen (verteiltes Rechnen / verteilte Parallelverarbeitung)
  2. Verteilen Sie die Arbeit auf einem bestimmten Computer auf alle CPUs (Multiprocessing / Threading).

Sellerie kann beides ziemlich einfach für Sie erledigen. Das erste, was Sie verstehen müssen, ist, dass jeder Sellerie-Arbeiter standardmäßig so konfiguriert ist, dass er so viele Aufgaben ausführt, wie auf einem System CPU-Kerne verfügbar sind:

Parallelität ist die Anzahl der Prefork-Worker-Prozesse, die zur gleichzeitigen Verarbeitung Ihrer Aufgaben verwendet werden. Wenn alle diese Aufgaben ausgeführt werden, müssen neue Aufgaben warten, bis eine der Aufgaben abgeschlossen ist, bevor sie verarbeitet werden können.

Die Standard-Parallelitätsnummer ist die Anzahl der CPUs auf diesem Computer (einschließlich der Kerne) . Sie können eine benutzerdefinierte Nummer mit der Option -c angeben. Es gibt keinen empfohlenen Wert, da die optimale Anzahl von einer Reihe von Faktoren abhängt. Wenn Ihre Aufgaben jedoch hauptsächlich an E / A gebunden sind, können Sie versuchen, sie zu erhöhen. Experimente haben gezeigt, dass das Hinzufügen von mehr als der doppelten Anzahl von CPUs selten ist effektiv und wahrscheinlich stattdessen die Leistung verschlechtern.

Dies bedeutet, dass sich jede einzelne Aufgabe nicht um die Verwendung von Multiprocessing / Threading kümmern muss, um mehrere CPUs / Kerne zu verwenden. Stattdessen führt Sellerie gleichzeitig genügend Aufgaben aus, um jede verfügbare CPU zu nutzen.

Wenn dies nicht möglich ist, müssen Sie im nächsten Schritt eine Aufgabe erstellen, die die Verarbeitung einer Teilmenge Ihrer Daten übernimmt list_of_millions_of_ids. Hier haben Sie mehrere Möglichkeiten: Eine besteht darin, dass jede Aufgabe eine einzelne ID verarbeitet, sodass Sie N Aufgaben ausführen, wobei N == len(list_of_millions_of_ids). Dies garantiert, dass die Arbeit gleichmäßig auf alle Ihre Aufgaben verteilt wird, da es niemals einen Fall geben wird, in dem ein Mitarbeiter vorzeitig fertig wird und nur wartet. Wenn es Arbeit braucht, kann es eine ID aus der Warteschlange ziehen. Sie können dies (wie von John Doe erwähnt) mit dem Sellerie tun group.

task.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

Und um die Aufgaben auszuführen:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

Eine andere Möglichkeit besteht darin, die Liste in kleinere Teile aufzuteilen und die Teile an Ihre Mitarbeiter zu verteilen. Bei diesem Ansatz besteht die Gefahr, dass einige Zyklen verschwendet werden, da möglicherweise einige Mitarbeiter warten, während andere noch arbeiten. In der Selleriedokumentation wird jedoch darauf hingewiesen, dass dieses Anliegen häufig unbegründet ist:

Einige befürchten möglicherweise, dass das Aufteilen Ihrer Aufgaben zu einer Verschlechterung der Parallelität führt. Dies gilt jedoch selten für einen ausgelasteten Cluster. In der Praxis kann dies die Leistung erheblich steigern, da Sie den Aufwand für Messaging vermeiden.

Möglicherweise stellen Sie fest, dass das Aufteilen der Liste und das Verteilen der Teile auf die einzelnen Aufgaben aufgrund des geringeren Messaging-Overheads eine bessere Leistung erbringt. Auf diese Weise können Sie wahrscheinlich auch die Datenbank etwas entlasten, indem Sie jede ID berechnen, in einer Liste speichern und dann die gesamte Liste zur Datenbank hinzufügen, sobald Sie fertig sind, anstatt jeweils eine ID zu erstellen . Der Chunking-Ansatz würde ungefähr so ​​aussehen

task.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

Und um die Aufgaben zu starten:

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

Sie können ein wenig experimentieren, mit welcher Chunking-Größe Sie das beste Ergebnis erzielen. Sie möchten einen Sweet Spot finden, an dem Sie den Messaging-Aufwand reduzieren und gleichzeitig die Größe so klein halten, dass die Mitarbeiter ihren Block nicht viel schneller als ein anderer Mitarbeiter fertigstellen und dann einfach warten, ohne etwas zu tun.

dano
quelle
Der Teil, in dem ich eine "komplizierte CPU-schwere Aufgabe (3D-Rendering vielleicht)" erledige, wird automatisch parallel verarbeitet, dh 1 Aufgabe verbraucht so viel Rechenleistung, wie in allen Instanzen verfügbar ist - und das alles aus -die Kiste? Ja wirklich? Beeindruckend. PS gute Antwort, danke, dass du mir das besser erklärt hast.
Prometheus
3
@ Spike Nicht ganz. Die Aufgaben, wie sie derzeit geschrieben werden, können immer nur einen Kern verwenden. Damit eine einzelne Aufgabe mehr als einen Kern verwendet, müssen wir threadingoder einführen multiprocessing. Stattdessen lässt jeder Sellerie-Arbeiter so viele Aufgaben ausführen, wie auf der Maschine Kerne verfügbar sind (dies geschieht standardmäßig bei Sellerie). Das bedeutet, dass in Ihrem gesamten Cluster jeder Kern zur Verarbeitung Ihres Kerns verwendet werden kann list_of_million_ids, indem für jede Aufgabe ein einziger Kern verwendet wird. Anstatt dass eine einzelne Aufgabe viele Kerne verwendet, verwenden viele Aufgaben jeweils einen Kern. Ist das sinnvoll?
Dano
1
"Damit eine einzelne Aufgabe mehr als einen Kern verwendet, müssen wir einführen threadingoder multiprocessing". Angenommen, wir können diese schwere Aufgabe nicht in mehrere aufteilen. Wie würden Sie Threading oder Multiprocessing verwenden, um Sellerie dazu zu bringen, die Aufgabe auf mehrere Instanzen aufzuteilen? danke
Tristan
@Tristan Es hängt davon ab, was die Aufgabe tatsächlich tut. In den meisten Fällen würde ich jedoch sagen, dass es wahrscheinlich schwierig ist multiprocessing, die Arbeit innerhalb der Aufgabe selbst aufzuteilen, wenn Sie die Aufgabe selbst nicht in Unteraufgaben aufteilen können, da beide Ansätze letztendlich die Ausführung der Aufgabe erfordern Das Gleiche: Aufteilen einer Aufgabe in kleinere Aufgaben, die parallel ausgeführt werden können. Sie ändern wirklich nur den Punkt, an dem Sie die Aufteilung durchführen.
Dano
1
@PirateApp Dieses Problem besagt, dass Sie es nicht multiprocessing in einer Sellerie-Aufgabe verwenden können. Sellerie selbst verwendet billiard(eine multiprocessingGabelung), um Ihre Aufgaben in separaten Prozessen auszuführen. Du darfst sie dann einfach nicht in multiprocessingihnen benutzen .
Dano
12

In der Welt des Vertriebs gibt es nur eines, an das Sie sich vor allem erinnern sollten:

Vorzeitige Optimierung ist die Wurzel allen Übels. Von D. Knuth

Ich weiß, dass es offensichtlich klingt, aber bevor Sie Double Check verteilen, verwenden Sie den besten Algorithmus (falls vorhanden ...). Die Optimierung der Verteilung ist jedoch ein Spagat zwischen drei Dingen:

  1. Schreiben / Lesen von Daten von einem persistenten Medium,
  2. Verschieben von Daten von Medium A nach Medium B,
  3. Daten verarbeiten,

Computer werden so hergestellt, dass je näher Sie Ihrer Verarbeitungseinheit (3) kommen, desto schneller und effizienter (1) und (2) sind. Die Reihenfolge in einem klassischen Cluster lautet: Netzwerkfestplatte, lokale Festplatte, RAM, innerhalb des Gebiets der Verarbeitungseinheit ... Heutzutage werden Prozessoren so hoch entwickelt, dass sie als Ensemble unabhängiger Hardwareverarbeitungseinheiten betrachtet werden können, die üblicherweise als Kerne bezeichnet werden. Diese Kerne verarbeiten Daten (3) durch Gewinde (2). Stellen Sie sich vor, Ihr Kern ist so schnell, dass Sie beim Senden von Daten mit einem Thread 50% der Computerleistung verbrauchen. Wenn der Kern 2 Threads hat, verwenden Sie 100%. Zwei Threads pro Kern werden als Hyper-Threading bezeichnet, und Ihr Betriebssystem sieht 2 CPUs pro Hyper-Thread-Kern.

Das Verwalten von Threads in einem Prozessor wird üblicherweise als Multithreading bezeichnet. Das Verwalten von CPUs vom Betriebssystem aus wird allgemein als Multiverarbeitung bezeichnet. Das Verwalten gleichzeitiger Aufgaben in einem Cluster wird üblicherweise als parallele Programmierung bezeichnet. Das Verwalten abhängiger Aufgaben in einem Cluster wird üblicherweise als verteilte Programmierung bezeichnet.

Wo liegt also Ihr Engpass?

  • In (1): Versuchen Sie, auf der oberen Ebene zu bleiben und zu streamen (diejenige, die näher an Ihrer Verarbeitungseinheit liegt, z. B. wenn die Netzwerkfestplatte langsam ist, speichern Sie sie zuerst auf der lokalen Festplatte)
  • In (2): Dies ist die häufigste, versuchen Sie, Kommunikationspakete zu vermeiden, die für die Verteilung nicht benötigt werden, oder komprimieren Sie "on the fly" -Pakete (z. B. wenn die Festplatte langsam ist, speichern Sie nur eine "Batch-berechnete" Nachricht und behalten Sie die Zwischenergebnisse in RAM).
  • In (3): Du bist fertig! Sie nutzen die gesamte Rechenleistung, die Ihnen zur Verfügung steht.

Was ist mit Sellerie?

Sellerie ist ein Messaging-Framework für die verteilte Programmierung, das ein Broker-Modul für die Kommunikation (2) und ein Backend-Modul für die Persistenz (1) verwendet. Dies bedeutet, dass Sie die Konfiguration ändern können, um die meisten Engpässe (wenn möglich) zu vermeiden Ihr Netzwerk und nur in Ihrem Netzwerk. Profilieren Sie zuerst Ihren Code, um die beste Leistung auf einem einzelnen Computer zu erzielen. Verwenden Sie dann Sellerie in Ihrem Cluster mit der Standardkonfiguration und legen Sie Folgendes fest CELERY_RESULT_PERSISTENT=True:

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

Während der Ausführung öffnen Sie Ihre bevorzugten Überwachungstools. Ich verwende die Standardeinstellung für rabbitMQ und Flower für Sellerie und top für cpus. Ihre Ergebnisse werden in Ihrem Backend gespeichert. Ein Beispiel für einen Netzwerkengpass ist die wachsende Warteschlange für Aufgaben, die die Ausführung verzögert. Sie können die Modul- oder Selleriekonfiguration ändern, wenn sich Ihr Engpass nicht an einer anderen Stelle befindet.

tk.
quelle
9

Warum nicht groupSellerie Aufgabe dafür verwenden?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

Grundsätzlich sollten Sie sich idsin Blöcke (oder Bereiche) aufteilen und diese einer Reihe von Aufgaben in geben group.

Für etwas anspruchsvollere, wie das Zusammenfassen von Ergebnissen bestimmter Sellerie-Aufgaben, habe ich die chordAufgabe erfolgreich für ähnliche Zwecke verwendet:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Erhöhen Sie sich settings.CELERYD_CONCURRENCYauf eine Zahl, die angemessen ist und die Sie sich leisten können, dann führen diese Sellerie-Arbeiter Ihre Aufgaben in einer Gruppe oder einem Akkord weiter aus, bis sie erledigt sind.

Hinweis: Aufgrund eines Fehlers in kombuder Vergangenheit gab es Probleme bei der Wiederverwendung von Mitarbeitern für eine große Anzahl von Aufgaben. Ich weiß nicht, ob dies jetzt behoben ist. Vielleicht ist es das, aber wenn nicht, reduzieren Sie CELERYD_MAX_TASKS_PER_CHILD.

Beispiel basierend auf vereinfachtem und modifiziertem Code, den ich ausführe:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarizeerhält Ergebnisse aller single_batch_processorAufgaben. Jede Aufgabe läuft auf jedem Sellerie-Arbeiter, kombukoordiniert das.

Jetzt verstehe ich es: single_batch_processorund es müssen summarizeAUCH Sellerie-Aufgaben sein, keine regulären Funktionen - sonst wird es natürlich nicht parallelisiert (ich bin mir nicht einmal sicher, ob der Akkordkonstruktor es akzeptiert, wenn es keine Sellerie-Aufgabe ist).

LetMeSOThat4U
quelle
Nach meinem Verständnis würde dies die Aufgabe aufteilen, verwendet jedoch keine parallel verteilte Sellerie-Aufgabe mit Mehrfachverarbeitung. dh nur die gesamte freie CPU-Leistung auf allen Cloud-Computern nutzen.
Prometheus
Ich bin mir nicht sicher, warum dies passieren würde - Sellerie funktioniert so, als hätten Sie eine Menge Arbeiter, unabhängig davon, wo sie sich befinden, sie könnten sich sogar auf einer anderen Maschine befinden. Natürlich müssen Sie mehr als einen Arbeiter haben. chord(Wenn CELERYD_CONCURRENCY auf Dutzende von Workern == logische CPU- / Hardware-Threads eingestellt ist) verarbeite ich eine große Anzahl von Protokolldateistapeln parallel über mehrere Kerne.
LetMeSOThat4U
Dies ist ein wirklich schlechtes Beispiel für Code. Die Aufgabe do_matcheswird durch Warten auf den Akkord blockiert. Dies kann möglicherweise zu einem teilweisen oder vollständigen Stillstand führen, da viele / alle Mitarbeiter möglicherweise auf Unteraufgaben warten, von denen keine ausgeführt wird (da die Mitarbeiter auf Unteraufgaben warten, anstatt hart zu arbeiten).
Prisacari Dmitrii
@PrisacariDmitrii Also, was wäre dann die richtige Lösung?
LetMeSOThat4U
4

Das Hinzufügen weiterer Sellerie-Arbeiter wird die Ausführung der Aufgabe sicherlich beschleunigen. Möglicherweise haben Sie jedoch einen anderen Engpass: die Datenbank. Stellen Sie sicher, dass die gleichzeitigen Einfügungen / Aktualisierungen verarbeitet werden können.

Zu Ihrer Frage: Sie fügen Sellerie-Arbeiter hinzu, indem Sie Ihren EC2-Instanzen einen anderen Prozess als zuweisen celeryd. Je nachdem, wie viele Mitarbeiter Sie benötigen, möchten Sie möglicherweise noch mehr Instanzen hinzufügen.

Torsten Engelbrecht
quelle
> Wenn Sie mehr Sellerie-Arbeiter hinzufügen, wird die Ausführung der Aufgabe sicherlich beschleunigt. --- Macht es? Ihr Sprichwort Sellerie wird also diese eine Aufgabe auf alle meine Instanzen verteilen, ohne dass ich sie aufschlitzen muss?
Prometheus
Moment mal. Ich habe gerade Ihren Code noch einmal gelesen und da es nur eine Aufgabe ist, wird dies nicht helfen. Sie können eine Aufgabe pro ID (oder mehrere IDs) auslösen. Oder Sie folgen dem Rat von John Doe in der anderen Antwort. Dann können Sie von der Anzahl der Sellerie-Arbeiter profitieren. Und ja, in diesem Fall müssen Sie nicht viel tun. Stellen Sie einfach sicher, dass die Mitarbeiter dieselben Warteschlangen belegen.
Torsten Engelbrecht