Ich versuche, s3-Dateien mithilfe des Luftstroms aus einem "nicht löschenden" Bucket (dh ich kann die Dateien nicht löschen) in GCS zu verschieben. Ich kann nicht garantieren, dass jeden Tag neue Dateien vorhanden sind, aber ich muss jeden Tag nach neuen Dateien suchen.
Mein Problem ist die dynamische Erstellung von Subtags. Wenn es Dateien gibt, brauche ich Subtags. Wenn es KEINE Dateien gibt, brauche ich keine Subtags. Mein Problem sind die Upstream / Downstream-Einstellungen. In meinem Code erkennt es zwar Dateien, startet aber die Subtags nicht so, wie sie sollen. Ich vermisse etwas.
Hier ist mein Code:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
quelle
quelle
spark
Jobs oder einpython
Skript und wie verwenden Sie sie, um sie auszuführen,livy
oder eine andere Methodefiles
eine leere Liste?Antworten:
Im Folgenden finden Sie die empfohlene Methode zum Erstellen einer dynamischen DAG oder Sub-DAG im Luftstrom. Es gibt jedoch auch andere Möglichkeiten, aber ich denke, dies ist weitgehend auf Ihr Problem anwendbar.
Erstellen Sie zunächst eine Datei,
(yaml/csv)
die die Liste allers3
Dateien und Speicherorte enthält. In Ihrem Fall haben Sie eine Funktion zum Speichern in einer Liste geschrieben. Ich würde sagen, speichern Sie sie in einer separatenyaml
Datei und laden Sie sie zur Laufzeit in airflow env und erstellen Sie sie dann DAGs.Unten ist eine Beispieldatei
yaml
:dynamicDagConfigFile.yaml
Sie können Ihre
Check_For_Files
Funktion ändern , um sie in eineryaml
Datei zu speichern .Jetzt können wir mit der dynamischen Dag-Erstellung fortfahren:
Definieren Sie zunächst zwei Aufgaben mit Dummy-Operatoren, die Start- und die Endaufgabe. Solche Aufgaben sind diejenigen, auf denen wir aufbauen werden, indem wir
DAG
dynamisch Aufgaben zwischen ihnen erstellen:Dynamische DAG: Wir werden
PythonOperators
im Luftstrom verwenden. Die Funktion sollte als Argumente die Task-ID erhalten. eine auszuführende Python-Funktion, dh python_callable für den Python-Operator; und eine Reihe von Argumenten, die während der Ausführung verwendet werden sollen.Fügen Sie ein Argument hinzu, das
task id
. So können wir Daten zwischen Aufgaben austauschen, die auf dynamische Weise generiert wurden, zXCOM
. B. über .Sie können Ihre Operationsfunktion innerhalb dieses dynamischen Tages festlegen
s3_to_gcs_op
.Basierend auf dem Speicherort in der Yaml-Datei können Sie schließlich dynamische Dags erstellen. Lesen Sie zuerst die
yaml
Datei wie folgt und erstellen Sie dynamische Dags :Endgültige DAG-Definition:
Die Idee ist das
Vollständiger Luftstromcode in der Reihenfolge:
quelle
upload_s3_toGCS
nicht existiert und Fehler im Luftstrom auftreten.yaml
Datei entfernen, sobald alle diese Dateien in GCS hochgeladen wurden. Auf diese Weise sind nur neue Dateien in deryaml
Datei vorhanden. Und falls keine neuen Dateien vorhanden sind, ist dieyaml
Datei leer und es wird kein dynamischer Tag erstellt. Aus diesem Grund ist eineyaml
Datei eine viel bessere Option als das Speichern von Dateien in einer Liste.yaml
Datei hilft auch dabei, die Protokollierung der s3-Dateien auf eine Weise aufrechtzuerhalten. Wenn ein Teil der s3-Datei nicht in GCS hochgeladen werden kann, können Sie auch ein dieser Datei entsprechendes Flag verwalten und diese beim nächsten DAG-Lauf erneut versuchen.if
der DAG eine Bedingung vorlegen, die nach neuen Dateien inyaml
Dateien sucht, wenn neue Dateien vorhanden sind. Führen Sie sie aus, andernfalls überspringen Sie sie.