Richtige Methode zum Erstellen dynamischer Workflows in Airflow

95

Problem

Gibt es in Airflow eine Möglichkeit, einen Workflow so zu erstellen, dass die Anzahl der Aufgaben B * bis zum Abschluss von Aufgabe A unbekannt ist? Ich habe Subdags betrachtet, aber es sieht so aus, als ob es nur mit einer statischen Reihe von Aufgaben funktionieren kann, die bei der Dag-Erstellung festgelegt werden müssen.

Würden Dag-Trigger funktionieren? Und wenn ja, geben Sie bitte ein Beispiel.

Ich habe ein Problem, bei dem es unmöglich ist, die Anzahl der Aufgaben B zu ermitteln, die zur Berechnung von Aufgabe C erforderlich sind, bis Aufgabe A abgeschlossen ist. Die Berechnung jeder Aufgabe B. * dauert mehrere Stunden und kann nicht kombiniert werden.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idee # 1

Diese Lösung gefällt mir nicht, da ich einen blockierenden ExternalTaskSensor erstellen muss und alle Aufgaben B * zwischen 2 und 24 Stunden dauern. Daher halte ich dies nicht für eine praktikable Lösung. Sicher gibt es einen einfacheren Weg? Oder war Airflow nicht dafür ausgelegt?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Bearbeiten 1:

Bis jetzt hat diese Frage noch keine gute Antwort . Ich wurde von mehreren Personen kontaktiert, die nach einer Lösung suchten.

costrouc
quelle
Sind alle Aufgaben B * insofern ähnlich, als sie in einer Schleife erstellt werden können?
Daniel Lee
Ja, alle B. * -Aufgaben können nach Abschluss von Aufgabe A schnell in einer Schleife erstellt werden. Aufgabe A dauert ca. 2 Stunden.
Costrouc
Haben Sie eine Lösung für das Problem gefunden? Würde es Ihnen etwas ausmachen, es vielleicht zu veröffentlichen?
Daniel Dubovski
3
Eine nützliche Ressource für Idee Nr. 1: linkedin.com/pulse/…
Juan Riaza
1
Hier ist ein Artikel, den ich geschrieben habe und der erklärt, wie das geht. Linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine
Kyle Bridenstine

Antworten:

29

So habe ich es mit einer ähnlichen Anfrage ohne Subtags gemacht:

Erstellen Sie zunächst eine Methode, die die gewünschten Werte zurückgibt

def values_function():
     return values

Als nächstes erstellen Sie eine Methode, mit der die Jobs dynamisch generiert werden:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

Und dann kombinieren Sie sie:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
Oleg Yamin
quelle
Wo werden Werte definiert?
Mönch
10
Stattdessen for i in values_function()würde ich so etwas erwarten for i in push_func_output. Das Problem ist, dass ich keinen Weg finde, diese Ausgabe dynamisch zu erhalten. Die Ausgabe des PythonOperator befindet sich nach der Ausführung in Xcom, aber ich weiß nicht, ob ich sie aus der DAG-Definition referenzieren kann.
Ena
@Ena Hast du einen Weg gefunden, das zu erreichen?
Eldos
1
@eldos siehe meine Antwort unten
Ena
1
Was wäre, wenn wir eine Reihe von schrittabhängigen Schritten innerhalb der Schleife ausführen müssten? Würde es eine zweite Abhängigkeitskette innerhalb der groupFunktion geben?
CodingInCircles
12

Ich habe einen Weg gefunden, Workflows basierend auf dem Ergebnis früherer Aufgaben zu erstellen.
Grundsätzlich möchten Sie zwei Subtags mit den folgenden Elementen haben:

  1. Xcom verschiebt eine Liste (oder was auch immer Sie benötigen, um den dynamischen Workflow später zu erstellen) in den Subtag, der zuerst ausgeführt wird (siehe test1.py def return_list()).
  2. Übergeben Sie das Haupt-Dag-Objekt als Parameter an Ihr zweites Subdag
  3. Wenn Sie nun das Hauptobjekt dag haben, können Sie es verwenden, um eine Liste seiner Aufgabeninstanzen abzurufen. Aus dieser Liste von Aufgabeninstanzen können Sie eine Aufgabe des aktuellen Laufs herausfiltern, indem Sie parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]) verwenden. Hier könnten wahrscheinlich weitere Filter hinzugefügt werden.
  4. Mit dieser Task-Instanz können Sie xcom pull verwenden, um den gewünschten Wert zu erhalten, indem Sie die dag_id für den ersten Unterag angeben: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Verwenden Sie die Liste / den Wert, um Ihre Aufgaben dynamisch zu erstellen

Jetzt habe ich dies in meiner lokalen Luftstrominstallation getestet und es funktioniert einwandfrei. Ich weiß nicht, ob das xcom-Pull-Teil Probleme haben wird, wenn mehr als eine Instanz des Dags gleichzeitig ausgeführt wird, aber dann würden Sie wahrscheinlich entweder einen eindeutigen Schlüssel oder ähnliches verwenden, um das xcom eindeutig zu identifizieren Wert, den Sie wollen. Man könnte wahrscheinlich den 3. Schritt optimieren, um 100% sicher zu sein, dass eine bestimmte Aufgabe des aktuellen Haupttags erhalten wird, aber für meine Verwendung funktioniert dies gut genug. Ich denke, man benötigt nur ein task_instance-Objekt, um xcom_pull zu verwenden.

Außerdem bereinige ich die xcoms für den ersten Subtag vor jeder Ausführung, um sicherzustellen, dass ich nicht versehentlich einen falschen Wert erhalte.

Ich kann es ziemlich schlecht erklären, also hoffe ich, dass der folgende Code alles klar macht:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

und der Hauptworkflow:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
Christopher Beck
quelle
unter Airflow 1.9 wurden diese nicht geladen, als sie zum DAG-Ordner hinzugefügt wurden. Fehlt mir etwas?
Anthony Keane
@AnthonyKeane hast du test1.py und test2.py in einen Ordner namens subdags in deinem dag-Ordner gelegt?
Christopher Beck
Ich habe ja getan. Kopierte beide Dateien in Subdags und legte die Datei test.py im Ordner dag ab. Der Fehler wird weiterhin angezeigt. Defekte DAG: [/home/airflow/gcs/dags/test.py] Kein Modul mit dem Namen subdags.test1 Hinweis Ich verwende Google Cloud Composer (Googles verwalteter Airflow 1.9.0)
Anthony Keane
@AnthonyKeane ist dies der einzige Fehler, den Sie in den Protokollen sehen? Eine defekte DAG kann durch einen Kompilierungsfehler des Subtags verursacht werden.
Christopher Beck
3
Hallo @Christopher Beck, ich habe MEINEN Fehler gefunden, den ich _ _init_ _.pyzum Subdags-Ordner hinzufügen musste. Anfängerfehler
Anthony Keane
8

Ja, das ist möglich. Ich habe eine Beispiel-DAG erstellt, die dies demonstriert.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Bevor Sie die DAG ausführen, erstellen Sie diese drei Luftstromvariablen

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

Sie werden sehen, dass die DAG davon ausgeht

Geben Sie hier die Bildbeschreibung ein

Dazu, nachdem es gelaufen ist

Geben Sie hier die Bildbeschreibung ein

Weitere Informationen zu dieser DAG finden Sie in meinem Artikel zum Erstellen dynamischer Workflows in Airflow .

Kyle Bridenstine
quelle
1
Aber was passiert, wenn Sie mehrere DagRuns dieser DAG haben? Teilen sie alle die gleichen Variablen?
Mar-k
1
Ja, sie würden dieselbe Variable verwenden. Ich spreche dies in meinem Artikel ganz am Ende an. Sie müssten die Variable dynamisch erstellen und die dag run id im Variablennamen verwenden. Mein Beispiel ist einfach, nur um die dynamische Möglichkeit zu demonstrieren, aber Sie müssen es Produktionsqualität machen :)
Kyle Bridenstine
Sind die Brücken notwendig, um dynamische Aufgaben zu erstellen? Ich werde Ihren Artikel kurz vollständig lesen, wollte aber fragen. Ich habe gerade Probleme damit, eine dynamische Aufgabe basierend auf einer vorgelagerten Aufgabe zu erstellen, und beginne damit, herauszufinden, wo ich falsch gelaufen bin. Mein aktuelles Problem ist, dass ich aus irgendeinem Grund die DAG nicht dazu bringen kann, sich mit dem DAG-Bag zu synchronisieren. Meine DAG wurde synchronisiert, als ich eine statische Liste im Modul verwendete, wurde jedoch gestoppt, als ich diese statische Liste aus einer vorgelagerten Aufgabe herausstellte.
lucid_goose
6

OA: "Gibt es in Airflow eine Möglichkeit, einen Workflow so zu erstellen, dass die Anzahl der Aufgaben B * bis zum Abschluss von Aufgabe A unbekannt ist?"

Kurze Antwort ist nein. Der Luftstrom baut den DAG-Strom auf, bevor er ausgeführt wird.

Das heißt, wir sind zu einem einfachen Schluss gekommen, das heißt, wir haben keine solchen Bedürfnisse. Wenn Sie einige Arbeiten parallelisieren möchten, sollten Sie die verfügbaren Ressourcen und nicht die Anzahl der zu verarbeitenden Elemente bewerten.

Wir haben es so gemacht: Wir generieren dynamisch eine feste Anzahl von Aufgaben, z. B. 10, die den Job aufteilen. Wenn wir beispielsweise 100 Dateien verarbeiten müssen, verarbeitet jede Aufgabe 10 davon. Ich werde den Code später heute veröffentlichen.

Aktualisieren

Hier ist der Code, entschuldigen Sie die Verzögerung.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return DummyOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Code-Erklärung:

Hier haben wir eine einzelne Startaufgabe und eine einzelne Endaufgabe (beide Dummy).

Dann erstellen wir von der Startaufgabe mit der for-Schleife 10 Aufgaben mit demselben aufrufbaren Python. Die Aufgaben werden in der Funktion create_dynamic_task erstellt.

An jeden aufrufbaren Python übergeben wir als Argumente die Gesamtzahl der parallelen Aufgaben und den aktuellen Aufgabenindex.

Angenommen, Sie müssen 1000 Elemente ausarbeiten: Die erste Aufgabe erhält als Eingabe, dass sie den ersten von 10 Blöcken ausarbeiten soll. Es wird die 1000 Gegenstände in 10 Stücke teilen und den ersten ausarbeiten.

Ena
quelle
1
Dies ist eine gute Lösung, solange Sie keine bestimmte Aufgabe pro Element benötigen (wie Fortschritt, Ergebnis, Erfolg /
Misserfolg
@Ena parallelTaskist nicht definiert: Fehlt mir etwas?
Anthony Keane
2
@AnthonyKeane Dies ist die Python-Funktion, die Sie aufrufen sollten, um tatsächlich etwas zu tun. Wie im Code kommentiert, werden die Gesamtzahl und die aktuelle Zahl als Eingabe verwendet, um einen Teil der Gesamtelemente zu erstellen.
Ena
4

Ich denke, Sie suchen nach einer dynamischen DAG-Erstellung. Ich bin vor einigen Tagen auf diese Art von Situation gestoßen, nachdem ich diesen Blog nach einiger Suche gefunden habe .

Dynamische Aufgabengenerierung

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Einstellen des DAG-Workflows

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

So sieht unsere DAG nach dem Zusammenstellen des Codes aus Geben Sie hier die Bildbeschreibung ein

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

Es war sehr hilfreich, volle Hoffnung. Es wird auch jemand anderem helfen

Muhammad Bin Ali
quelle
Hast du es selbst geschafft? Ich bin müde Aber ich habe versagt.
Newt
Ja, es hat bei mir funktioniert. Vor welchem ​​Problem stehen Sie?
Muhammad Bin Ali
1
Ich habe es verstanden. Mein Problem wurde gelöst. Vielen Dank. Ich habe einfach nicht den richtigen Weg gefunden, um Umgebungsvariablen in Docker-Bildern zu lesen.
Newt
3

Ich glaube, ich habe unter https://github.com/mastak/airflow_multi_dagrun eine bessere Lösung dafür gefunden, bei der DagRuns einfach in die Warteschlange gestellt werden, indem mehrere Dagruns ausgelöst werden, ähnlich wie bei TriggerDagRuns . Die meisten Credits gehen an https://github.com/mastak , obwohl ich einige Details patchen musste , damit es mit dem neuesten Luftstrom funktioniert.

Die Lösung verwendet einen benutzerdefinierten Operator, der mehrere DagRuns auslöst :

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

Sie können dann mehrere Dagruns von der aufrufbaren Funktion in Ihrem PythonOperator senden, zum Beispiel:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

Ich habe eine Verzweigung mit dem Code unter https://github.com/flinz/airflow_multi_dagrun erstellt

flinz
quelle
3

Das Jobdiagramm wird nicht zur Laufzeit generiert. Vielmehr wird das Diagramm erstellt, wenn es von Airflow aus Ihrem Dags-Ordner aufgenommen wird. Daher wird es nicht wirklich möglich sein, bei jeder Ausführung ein anderes Diagramm für den Job zu erstellen. Sie können einen Job so konfigurieren, dass beim Laden ein Diagramm basierend auf einer Abfrage erstellt wird. Dieses Diagramm bleibt danach für jeden Lauf gleich, was wahrscheinlich nicht sehr nützlich ist.

Mithilfe eines Zweigoperators können Sie ein Diagramm entwerfen, das bei jedem Lauf unterschiedliche Aufgaben basierend auf den Abfrageergebnissen ausführt.

Was ich getan habe, ist, eine Reihe von Aufgaben vorkonfigurieren und dann die Abfrageergebnisse zu nehmen und sie auf die Aufgaben zu verteilen. Dies ist wahrscheinlich sowieso besser, denn wenn Ihre Abfrage viele Ergebnisse liefert, möchten Sie den Scheduler wahrscheinlich sowieso nicht mit vielen gleichzeitigen Aufgaben überfluten. Um noch sicherer zu sein, habe ich auch einen Pool verwendet, um sicherzustellen, dass meine Parallelität bei einer unerwartet großen Abfrage nicht außer Kontrolle gerät.

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
            doStuff(**op_kwargs)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    for task in tasks:
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
verfault
quelle
Beachten Sie, dass es möglicherweise möglich ist, Subdags als Ergebnis einer Aufgabe im laufenden Betrieb zu erstellen. In den meisten Dokumentationen zu Subtags, die ich gefunden habe, wird jedoch dringend empfohlen, sich von dieser Funktion fernzuhalten, da sie mehr Probleme verursacht als löst in den meisten Fällen. Ich habe Vorschläge gesehen, dass Subdags bald als integrierte Funktion entfernt werden könnten.
fauler
Beachten Sie auch, dass for tasks in tasksich in der Schleife in meinem Beispiel das Objekt lösche, über das ich iteriere. Das ist eine schlechte Idee. Holen Sie sich stattdessen eine Liste der Schlüssel und wiederholen Sie diese - oder überspringen Sie die Löschvorgänge. Wenn xcom_pull None zurückgibt (anstelle einer Liste oder einer leeren Liste), schlägt auch die for-Schleife fehl. Möglicherweise möchten Sie xcom_pull vor dem 'for' ausführen und dann prüfen, ob es None ist - oder sicherstellen, dass dort mindestens eine leere Liste vorhanden ist. YMMV. Viel Glück!
fauler
1
was ist in der open_order_task?
Alltej
Sie haben Recht, das ist ein Tippfehler in meinem Beispiel. Es sollte get_orders_task.set_upstream () sein. Ich werde es reparieren.
fauler
0

Verstehst du nicht was das Problem ist?

Hier ist ein Standardbeispiel. Nun , wenn in Funktion subdag ersetzt for i in range(5):mit for i in range(random.randint(0, 10)):dann alles funktionieren wird. Stellen Sie sich nun vor, der Operator 'start' fügt die Daten in eine Datei ein und anstelle eines zufälligen Werts liest die Funktion diese Daten. Dann beeinflusst der 'Start' des Bedieners die Anzahl der Aufgaben.

Das Problem tritt nur in der Anzeige in der Benutzeroberfläche auf, da beim Eingeben des Subtags die Anzahl der Aufgaben dem zuletzt aus der Datei / Datenbank / XCom gelesenen Wert entspricht. Dies gibt automatisch eine Beschränkung für mehrere Starts eines Tages gleichzeitig.

Denis Shcheglov
quelle
-1

Ich habe diesen mittleren Beitrag gefunden, der dieser Frage sehr ähnlich ist. Es ist jedoch voller Tippfehler und funktioniert nicht, als ich versuchte, es zu implementieren.

Meine Antwort auf das oben Gesagte lautet wie folgt:

Wenn Sie Aufgaben dynamisch erstellen, müssen Sie dies tun, indem Sie über etwas iterieren, das nicht von einer vorgelagerten Aufgabe erstellt wurde oder unabhängig von dieser Aufgabe definiert werden kann. Ich habe erfahren, dass Sie Ausführungsdaten oder andere Luftstromvariablen nicht an etwas außerhalb einer Vorlage (z. B. einer Aufgabe) übergeben können, wie viele andere bereits erwähnt haben. Siehe auch diesen Beitrag .

MarMat
quelle
Wenn Sie sich meinen Kommentar ansehen, werden Sie sehen, dass es tatsächlich möglich ist, Aufgaben basierend auf dem Ergebnis von Upstream-Aufgaben zu erstellen.
Christopher Beck