Please find the code below: we construct the DAG, but the dynamically created tasks end up in the state "removed".
In the code below we get a list of blobs and then, based on that list, dynamically create one Dataflow-template-caller task operator per blob, passing the input file as a parameter. So each dynamic task (a Dataflow template caller) caters to exactly one input file, as sketched just below.
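Reduced to a minimal sketch, the pattern we are attempting looks like this (the file names and bucket here are hypothetical placeholders; DataflowTemplateOperator, start and end are the same ones used in the full DAG further down):

    # Minimal sketch of the intended pattern, not the real file list:
    for i, filename in enumerate(['part-0000.csv', 'part-0001.csv']):  # hypothetical files
        task = DataflowTemplateOperator(
            task_id='dataflow_dynamic_{}'.format(i),
            template='gs://mystoragebucket/template/dataflowTemplate',
            parameters={'input_file': 'gs://my-raw-bucket/' + filename},  # hypothetical bucket
        )
        start >> task >> end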
The DAG code:
# Imports assumed (Airflow 1.10.x-style, as on Cloud Composer); default_args is defined elsewhere.
import datetime
import json
import os
from datetime import date

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.operators import dummy_operator
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage

list_of_blobs = []

def read_text_file(config_file_path):
    # Runs at DAG parse time: read back the comma-separated blob list
    # that read_config wrote on a previous run.
    if os.path.exists(config_file_path):
        try:
            with open(config_file_path, 'r', encoding='utf-8') as f:
                configuration = f.read()
            global list_of_blobs
            for c in configuration.split(','):
                if c.strip():
                    list_of_blobs.append(c.strip())
        except IOError as e:
            print(e)
def read_config(**kwargs):
    today = date.today()
    storage_client = storage.Client()
    bucket = storage_client.get_bucket("mystoragebucket")
    blob = bucket.blob("config/configuration_r2c.json")
    data = json.loads(blob.download_as_string().decode("utf-8"))
    # Push every config value to XCom so the Dataflow tasks can template it in.
    for key in ('input_file', 'delimiter', 'cols', 'p_output_file',
                'b_output_file', 'Cleansed_Bucket_FolderPath', 'GCSSourceFolder',
                'File_name', 'Cleansed_Bucket_Name', 'Source_Raw_Bucket_Name',
                'BigQueryTargetTableName'):
        kwargs['ti'].xcom_push(key=key, value=data[key])
    source_bucket = storage_client.get_bucket(data['Source_Raw_Bucket_Name'])
    folder = 'processing/{}/{}/{}/{}/{}'.format(
        data['GCSSourceFolder'], data['File_name'],
        today.year, today.month, today.day)
    blobs = source_bucket.list_blobs(prefix=folder)
    # Keep only data files and join their names with commas.
    names = [b.name for b in blobs if b.name.endswith(('.csv', '.dat', '.gz'))]
    blob_list = ','.join(names)
    try:
        # This file is read back at DAG parse time to build the dynamic tasks.
        with open("/home/airflow/gcs/data/blob_list.txt", "wt") as fout:
            fout.write(blob_list)
    except Exception as e:
        print(e)
    return data
with models.DAG(
        # The id you will see in the Airflow DAGs page
        "raw_to_cleansed_example_4feb",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start = dummy_operator.DummyOperator(
        task_id='start',
        trigger_rule='all_success'
    )

    end = dummy_operator.DummyOperator(
        task_id='end',
        trigger_rule='all_success'
    )
    def dataflow_call(filename, i):
        # One DataflowTemplateOperator per input file; i keeps the task id unique.
        return DataflowTemplateOperator(
            task_id='dataflow_dynamic_{}'.format(i),
            template="gs://mystoragebucket/template/dataflowTemplate",
            parameters={
                "input_file": 'gs://' + "{{ task_instance.xcom_pull(task_ids='get_file_name', key='Source_Raw_Bucket_Name') }}" + '/' + filename,
                "delimiter": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='delimiter') }}",
                "no_of_cols": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='cols') }}",
                "p_output_file": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='p_output_file') }}",
                "b_output_file": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='b_output_file') }}",
            },
        )
    py_op = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=read_config,
    )

    start >> py_op

    # Read the blob list at DAG parse time and create one Dataflow task per blob.
    read_text_file('/home/airflow/gcs/data/blob_list.txt')

    for i, b in enumerate(list_of_blobs):
        py_op >> dataflow_call(b, i) >> end
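For context, after one successful run of get_file_name the blob list file looks roughly like this (file names are hypothetical), and each comma-separated entry should map to one dynamic task at the next parse:

    # Hypothetical contents of /home/airflow/gcs/data/blob_list.txt:
    #   processing/myfolder/myfile/2021/2/4/part-0000.csv,processing/myfolder/myfile/2021/2/4/part-0001.csv
    #
    # Expected tasks after the next DAG parse:
    #   dataflow_dynamic_0  ->  input_file = gs://<raw bucket>/processing/.../part-0000.csv
    #   dataflow_dynamic_1  ->  input_file = gs://<raw bucket>/processing/.../part-0001.csv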
question from:
https://stackoverflow.com/questions/66058240/tasks-instances-dynamically-created-are-being-marked-as-removedwhen-i-am-dynamic