Ich habe ein Szenario wie das folgende:
- Lösen Sie a
Task 1
undTask 2
nur aus, wenn neue Daten für sie in der Quelltabelle (Athena) verfügbar sind. Der Auslöser für Task1 und Task2 sollte bei einer neuen Datenparition an einem Tag erfolgen. - Trigger
Task 3
erst nach Abschluss vonTask 1
undTask 2
- Löse
Task 4
nur den Abschluss von ausTask 3
Mein Code
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
Was ist der beste optimale Weg, um dies zu erreichen?
Task1
undTask2
in Schleife. Für mich werden Daten in die Athena-Quellentabelle 10 Uhr MEZ geladen.Antworten:
Ich glaube, Ihre Frage befasst sich mit zwei Hauptproblemen:
schedule_interval
explizit zu konfigurieren , damit @daily etwas einrichtet, das Sie nicht erwarten.Die kurze Antwort: Legen Sie Ihr Zeitplanintervall explizit mit einem Cron-Jobformat fest und überprüfen Sie es von Zeit zu Zeit mithilfe von Sensoroperatoren
Wo
startime
beginnt Ihre tägliche Aufgabe, zuendtime
welcher letzten Tageszeit sollten Sie überprüfen, ob ein Ereignis ausgeführt wurde, bevor Sie als fehlgeschlagen gekennzeichnet werden, und in welchempoke_time
Intervallsensor_operator
überprüfen Sie, ob das Ereignis aufgetreten ist.So adressieren Sie den Cron-Job explizit, wenn Sie Ihren Tag so einstellen,
@daily
wie Sie es getan haben:Aus den Dokumenten können Sie ersehen, dass Sie tatsächlich Folgendes tun:
@daily - Run once a day at midnight
Was jetzt Sinn macht, warum Sie einen Timeout-Fehler erhalten und nach 5 Minuten fehlschlägt, weil Sie
'retries': 1
und eingestellt haben'retry_delay': timedelta(minutes=5)
. Also versucht es den Dag um Mitternacht laufen zu lassen, es schlägt fehl. versucht es 5 Minuten später erneut und schlägt erneut fehl, sodass es als fehlgeschlagen markiert wird.Im Grunde genommen setzt @daily run einen impliziten Cron-Job von:
Das Cron-Jobformat hat das folgende Format und Sie setzen den Wert auf,
*
wenn Sie "alle" sagen möchten.Minute Hour Day_of_Month Month Day_of_Week
@Daily sagt also im Grunde, dass dies alle: Minute 0 Stunde 0 aller Tage des Monats aller Monate aller Tage der Woche ausgeführt wird
Ihr Fall wird also alle: Minute 0 Stunde 10 aller Tage_von_Monaten aller_Monate aller Tage_der_Woche ausgeführt. Dies übersetzt im Cron-Job-Format zu:
So lösen Sie die Ausführung des Dags aus und versuchen es erneut, wenn Sie von einem externen Ereignis abhängig sind, um die Ausführung abzuschließen
Mit dem Befehl können Sie einen Tag im Luftstrom von einem externen Ereignis auslösen
airflow trigger_dag
. Dies wäre möglich, wenn Sie ein Lambda-Funktions- / Python-Skript auslösen könnten, um auf Ihre Luftstrominstanz abzuzielen.Wenn Sie den Tag nicht extern auslösen können, verwenden Sie einen Sensoroperator wie OP, setzen Sie eine poke_time darauf und legen Sie eine angemessen hohe Anzahl von Wiederholungsversuchen fest.
quelle
airflow trigger_dag
. Dies wäre möglich, wenn Sie ein Lambda-Funktions- / Python-Skript auslösen könnten, um auf Ihre Luftstrominstanz abzuzielen.AwsGlueCatalogPartitionSensor
zusammen mit dem Luftstrombefehl{{ds_nodash}}
für die Partitionsausgänge aufgerufen wurde . Meine Frage dann, wie man das plant.