Problem
Gibt es in Airflow eine Möglichkeit, einen Workflow so zu erstellen, dass die Anzahl der Aufgaben B * bis zum Abschluss von Aufgabe A unbekannt ist? Ich habe Subdags betrachtet, aber es sieht so aus, als ob es nur mit einer statischen Reihe von Aufgaben funktionieren kann, die bei der Dag-Erstellung festgelegt werden müssen.
Würden Dag-Trigger funktionieren? Und wenn ja, geben Sie bitte ein Beispiel.
Ich habe ein Problem, bei dem es unmöglich ist, die Anzahl der Aufgaben B zu ermitteln, die zur Berechnung von Aufgabe C erforderlich sind, bis Aufgabe A abgeschlossen ist. Die Berechnung jeder Aufgabe B. * dauert mehrere Stunden und kann nicht kombiniert werden.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Idee # 1
Diese Lösung gefällt mir nicht, da ich einen blockierenden ExternalTaskSensor erstellen muss und alle Aufgaben B * zwischen 2 und 24 Stunden dauern. Daher halte ich dies nicht für eine praktikable Lösung. Sicher gibt es einen einfacheren Weg? Oder war Airflow nicht dafür ausgelegt?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Bearbeiten 1:
Bis jetzt hat diese Frage noch keine gute Antwort . Ich wurde von mehreren Personen kontaktiert, die nach einer Lösung suchten.
Antworten:
So habe ich es mit einer ähnlichen Anfrage ohne Subtags gemacht:
Erstellen Sie zunächst eine Methode, die die gewünschten Werte zurückgibt
Als nächstes erstellen Sie eine Methode, mit der die Jobs dynamisch generiert werden:
Und dann kombinieren Sie sie:
quelle
for i in values_function()
würde ich so etwas erwartenfor i in push_func_output
. Das Problem ist, dass ich keinen Weg finde, diese Ausgabe dynamisch zu erhalten. Die Ausgabe des PythonOperator befindet sich nach der Ausführung in Xcom, aber ich weiß nicht, ob ich sie aus der DAG-Definition referenzieren kann.group
Funktion geben?Ich habe einen Weg gefunden, Workflows basierend auf dem Ergebnis früherer Aufgaben zu erstellen.
Grundsätzlich möchten Sie zwei Subtags mit den folgenden Elementen haben:
def return_list()
).parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
) verwenden. Hier könnten wahrscheinlich weitere Filter hinzugefügt werden.dag_id='%s.%s' % (parent_dag_name, 'test1')
Jetzt habe ich dies in meiner lokalen Luftstrominstallation getestet und es funktioniert einwandfrei. Ich weiß nicht, ob das xcom-Pull-Teil Probleme haben wird, wenn mehr als eine Instanz des Dags gleichzeitig ausgeführt wird, aber dann würden Sie wahrscheinlich entweder einen eindeutigen Schlüssel oder ähnliches verwenden, um das xcom eindeutig zu identifizieren Wert, den Sie wollen. Man könnte wahrscheinlich den 3. Schritt optimieren, um 100% sicher zu sein, dass eine bestimmte Aufgabe des aktuellen Haupttags erhalten wird, aber für meine Verwendung funktioniert dies gut genug. Ich denke, man benötigt nur ein task_instance-Objekt, um xcom_pull zu verwenden.
Außerdem bereinige ich die xcoms für den ersten Subtag vor jeder Ausführung, um sicherzustellen, dass ich nicht versehentlich einen falschen Wert erhalte.
Ich kann es ziemlich schlecht erklären, also hoffe ich, dass der folgende Code alles klar macht:
test1.py
test2.py
und der Hauptworkflow:
test.py
quelle
_ _init_ _.py
zum Subdags-Ordner hinzufügen musste. AnfängerfehlerJa, das ist möglich. Ich habe eine Beispiel-DAG erstellt, die dies demonstriert.
Bevor Sie die DAG ausführen, erstellen Sie diese drei Luftstromvariablen
Sie werden sehen, dass die DAG davon ausgeht
Dazu, nachdem es gelaufen ist
Weitere Informationen zu dieser DAG finden Sie in meinem Artikel zum Erstellen dynamischer Workflows in Airflow .
quelle
OA: "Gibt es in Airflow eine Möglichkeit, einen Workflow so zu erstellen, dass die Anzahl der Aufgaben B * bis zum Abschluss von Aufgabe A unbekannt ist?"
Kurze Antwort ist nein. Der Luftstrom baut den DAG-Strom auf, bevor er ausgeführt wird.
Das heißt, wir sind zu einem einfachen Schluss gekommen, das heißt, wir haben keine solchen Bedürfnisse. Wenn Sie einige Arbeiten parallelisieren möchten, sollten Sie die verfügbaren Ressourcen und nicht die Anzahl der zu verarbeitenden Elemente bewerten.
Wir haben es so gemacht: Wir generieren dynamisch eine feste Anzahl von Aufgaben, z. B. 10, die den Job aufteilen. Wenn wir beispielsweise 100 Dateien verarbeiten müssen, verarbeitet jede Aufgabe 10 davon. Ich werde den Code später heute veröffentlichen.
Aktualisieren
Hier ist der Code, entschuldigen Sie die Verzögerung.
Code-Erklärung:
Hier haben wir eine einzelne Startaufgabe und eine einzelne Endaufgabe (beide Dummy).
Dann erstellen wir von der Startaufgabe mit der for-Schleife 10 Aufgaben mit demselben aufrufbaren Python. Die Aufgaben werden in der Funktion create_dynamic_task erstellt.
An jeden aufrufbaren Python übergeben wir als Argumente die Gesamtzahl der parallelen Aufgaben und den aktuellen Aufgabenindex.
Angenommen, Sie müssen 1000 Elemente ausarbeiten: Die erste Aufgabe erhält als Eingabe, dass sie den ersten von 10 Blöcken ausarbeiten soll. Es wird die 1000 Gegenstände in 10 Stücke teilen und den ersten ausarbeiten.
quelle
parallelTask
ist nicht definiert: Fehlt mir etwas?Ich denke, Sie suchen nach einer dynamischen DAG-Erstellung. Ich bin vor einigen Tagen auf diese Art von Situation gestoßen, nachdem ich diesen Blog nach einiger Suche gefunden habe .
Dynamische Aufgabengenerierung
Einstellen des DAG-Workflows
So sieht unsere DAG nach dem Zusammenstellen des Codes aus
Es war sehr hilfreich, volle Hoffnung. Es wird auch jemand anderem helfen
quelle
Ich glaube, ich habe unter https://github.com/mastak/airflow_multi_dagrun eine bessere Lösung dafür gefunden, bei der DagRuns einfach in die Warteschlange gestellt werden, indem mehrere Dagruns ausgelöst werden, ähnlich wie bei TriggerDagRuns . Die meisten Credits gehen an https://github.com/mastak , obwohl ich einige Details patchen musste , damit es mit dem neuesten Luftstrom funktioniert.
Die Lösung verwendet einen benutzerdefinierten Operator, der mehrere DagRuns auslöst :
Sie können dann mehrere Dagruns von der aufrufbaren Funktion in Ihrem PythonOperator senden, zum Beispiel:
Ich habe eine Verzweigung mit dem Code unter https://github.com/flinz/airflow_multi_dagrun erstellt
quelle
Das Jobdiagramm wird nicht zur Laufzeit generiert. Vielmehr wird das Diagramm erstellt, wenn es von Airflow aus Ihrem Dags-Ordner aufgenommen wird. Daher wird es nicht wirklich möglich sein, bei jeder Ausführung ein anderes Diagramm für den Job zu erstellen. Sie können einen Job so konfigurieren, dass beim Laden ein Diagramm basierend auf einer Abfrage erstellt wird. Dieses Diagramm bleibt danach für jeden Lauf gleich, was wahrscheinlich nicht sehr nützlich ist.
Mithilfe eines Zweigoperators können Sie ein Diagramm entwerfen, das bei jedem Lauf unterschiedliche Aufgaben basierend auf den Abfrageergebnissen ausführt.
Was ich getan habe, ist, eine Reihe von Aufgaben vorkonfigurieren und dann die Abfrageergebnisse zu nehmen und sie auf die Aufgaben zu verteilen. Dies ist wahrscheinlich sowieso besser, denn wenn Ihre Abfrage viele Ergebnisse liefert, möchten Sie den Scheduler wahrscheinlich sowieso nicht mit vielen gleichzeitigen Aufgaben überfluten. Um noch sicherer zu sein, habe ich auch einen Pool verwendet, um sicherzustellen, dass meine Parallelität bei einer unerwartet großen Abfrage nicht außer Kontrolle gerät.
quelle
for tasks in tasks
ich in der Schleife in meinem Beispiel das Objekt lösche, über das ich iteriere. Das ist eine schlechte Idee. Holen Sie sich stattdessen eine Liste der Schlüssel und wiederholen Sie diese - oder überspringen Sie die Löschvorgänge. Wenn xcom_pull None zurückgibt (anstelle einer Liste oder einer leeren Liste), schlägt auch die for-Schleife fehl. Möglicherweise möchten Sie xcom_pull vor dem 'for' ausführen und dann prüfen, ob es None ist - oder sicherstellen, dass dort mindestens eine leere Liste vorhanden ist. YMMV. Viel Glück!open_order_task
?Verstehst du nicht was das Problem ist?
Hier ist ein Standardbeispiel. Nun , wenn in Funktion subdag ersetzt
for i in range(5):
mitfor i in range(random.randint(0, 10)):
dann alles funktionieren wird. Stellen Sie sich nun vor, der Operator 'start' fügt die Daten in eine Datei ein und anstelle eines zufälligen Werts liest die Funktion diese Daten. Dann beeinflusst der 'Start' des Bedieners die Anzahl der Aufgaben.Das Problem tritt nur in der Anzeige in der Benutzeroberfläche auf, da beim Eingeben des Subtags die Anzahl der Aufgaben dem zuletzt aus der Datei / Datenbank / XCom gelesenen Wert entspricht. Dies gibt automatisch eine Beschränkung für mehrere Starts eines Tages gleichzeitig.
quelle
Ich habe diesen mittleren Beitrag gefunden, der dieser Frage sehr ähnlich ist. Es ist jedoch voller Tippfehler und funktioniert nicht, als ich versuchte, es zu implementieren.
Meine Antwort auf das oben Gesagte lautet wie folgt:
Wenn Sie Aufgaben dynamisch erstellen, müssen Sie dies tun, indem Sie über etwas iterieren, das nicht von einer vorgelagerten Aufgabe erstellt wurde oder unabhängig von dieser Aufgabe definiert werden kann. Ich habe erfahren, dass Sie Ausführungsdaten oder andere Luftstromvariablen nicht an etwas außerhalb einer Vorlage (z. B. einer Aufgabe) übergeben können, wie viele andere bereits erwähnt haben. Siehe auch diesen Beitrag .
quelle