Wie kann eine Airflow-Task nur ausgelöst werden, wenn neue Partitionen / Daten mit DAG in Python in der AWS-Athena-Tabelle verfügbar sind?

9

Ich habe ein Szenario wie das folgende:

  1. Lösen Sie a Task 1und Task 2nur aus, wenn neue Daten für sie in der Quelltabelle (Athena) verfügbar sind. Der Auslöser für Task1 und Task2 sollte bei einer neuen Datenparition an einem Tag erfolgen.
  2. Trigger Task 3erst nach Abschluss von Task 1undTask 2
  3. Löse Task 4nur den Abschluss von ausTask 3

Geben Sie hier die Bildbeschreibung ein

Mein Code

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

Was ist der beste optimale Weg, um dies zu erreichen?

pankaj
quelle
Haben Sie Probleme mit dieser Lösung?
Bernardo stearns reisen
@ Bernardostearnsreisen, Manchmal geht das Task1und Task2in Schleife. Für mich werden Daten in die Athena-Quellentabelle 10 Uhr MEZ geladen.
Pankaj
Sie meinen, der Luftstrom wiederholt Task1 und Task2 viele Male, bis er erfolgreich ist.
Bernardo stearns reisen
@Bernardostearnsreisen, yup genau
pankaj
1
@Bernardostearnsreisen, ich wusste nicht, wie man das Kopfgeld
vergibt

Antworten:

1

Ich glaube, Ihre Frage befasst sich mit zwei Hauptproblemen:

  1. Vergessen Sie, das schedule_intervalexplizit zu konfigurieren , damit @daily etwas einrichtet, das Sie nicht erwarten.
  2. So lösen Sie die Ausführung des Dags aus und versuchen es erneut, wenn Sie von einem externen Ereignis abhängig sind, um die Ausführung abzuschließen

Die kurze Antwort: Legen Sie Ihr Zeitplanintervall explizit mit einem Cron-Jobformat fest und überprüfen Sie es von Zeit zu Zeit mithilfe von Sensoroperatoren

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

Wo startimebeginnt Ihre tägliche Aufgabe, zu endtimewelcher letzten Tageszeit sollten Sie überprüfen, ob ein Ereignis ausgeführt wurde, bevor Sie als fehlgeschlagen gekennzeichnet werden, und in welchem poke_timeIntervall sensor_operatorüberprüfen Sie, ob das Ereignis aufgetreten ist.

So adressieren Sie den Cron-Job explizit, wenn Sie Ihren Tag so einstellen,@dailywie Sie es getan haben:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Aus den Dokumenten können Sie ersehen, dass Sie tatsächlich Folgendes tun: @daily - Run once a day at midnight

Was jetzt Sinn macht, warum Sie einen Timeout-Fehler erhalten und nach 5 Minuten fehlschlägt, weil Sie 'retries': 1und eingestellt haben 'retry_delay': timedelta(minutes=5). Also versucht es den Dag um Mitternacht laufen zu lassen, es schlägt fehl. versucht es 5 Minuten später erneut und schlägt erneut fehl, sodass es als fehlgeschlagen markiert wird.

Im Grunde genommen setzt @daily run einen impliziten Cron-Job von:

@daily -> Run once a day at midnight -> 0 0 * * *

Das Cron-Jobformat hat das folgende Format und Sie setzen den Wert auf, *wenn Sie "alle" sagen möchten.

Minute Hour Day_of_Month Month Day_of_Week

@Daily sagt also im Grunde, dass dies alle: Minute 0 Stunde 0 aller Tage des Monats aller Monate aller Tage der Woche ausgeführt wird

Ihr Fall wird also alle: Minute 0 Stunde 10 aller Tage_von_Monaten aller_Monate aller Tage_der_Woche ausgeführt. Dies übersetzt im Cron-Job-Format zu:

0 10 * * *

So lösen Sie die Ausführung des Dags aus und versuchen es erneut, wenn Sie von einem externen Ereignis abhängig sind, um die Ausführung abzuschließen

  1. Mit dem Befehl können Sie einen Tag im Luftstrom von einem externen Ereignis auslösen airflow trigger_dag. Dies wäre möglich, wenn Sie ein Lambda-Funktions- / Python-Skript auslösen könnten, um auf Ihre Luftstrominstanz abzuzielen.

  2. Wenn Sie den Tag nicht extern auslösen können, verwenden Sie einen Sensoroperator wie OP, setzen Sie eine poke_time darauf und legen Sie eine angemessen hohe Anzahl von Wiederholungsversuchen fest.

Bernardo stearns reisen
quelle
Danke dafür. Auch wenn ich die Aufgaben basierend auf dem Ereignis und nicht auf der Zeit auslösen möchte, dh sobald eine neue Datenpartition in der Quelle "AWS Athena Tables" verfügbar ist, sollte die nächste Aufgabe ausgelöst werden. Wie plane ich dann? Ist mein aktueller Code passend genug?
Pankaj
@pankaj, ich sehe nur zwei Alternativen. Ich weiß nicht viel über aws athena, aber Sie könnten mit dem Befehl einen Tag im Luftstrom von einem externen Ereignis auslösen airflow trigger_dag. Dies wäre möglich, wenn Sie ein Lambda-Funktions- / Python-Skript auslösen könnten, um auf Ihre Luftstrominstanz abzuzielen.
Bernardo Stearns Reisen
Die andere Alternative ist mehr oder weniger das, was Sie tun, da Sie keinen ereignisbasierten Trigger haben, müssen Sie regelmäßig überprüfen, ob dieses Ereignis aufgetreten ist. Wenn Sie diese aktuelle Lösung verwenden, wird ein Cron-Job für eine Reihe von Stunden festgelegt, in denen der Tag in einer hohen Häufigkeit von Minuten ausgeführt wird. Viele werden fehlschlagen, aber nach dem Ereignis können sie relativ schnell abfangen
Bernardo stearns reisen
@Bernado, ich habe das Paket in Airflow herausgefunden, das AwsGlueCatalogPartitionSensorzusammen mit dem Luftstrombefehl {{ds_nodash}}für die Partitionsausgänge aufgerufen wurde . Meine Frage dann, wie man das plant.
Pankaj
@ Benado, können Sie sich meinen Code ansehen, in dem ich die oben erwähnte Prüfung implementiert habe, und Ihre Eingaben machen
pankaj