Mohamed El Guendouz
01/31/2025, 1:55 PM

Hall
01/31/2025, 1:55 PM

Dmitry Sorokin
01/31/2025, 2:51 PM
You can use the --group-in-memory option with the kedro airflow create command to group in-memory nodes together. However, we are actively working on a more advanced approach that will allow nodes to be grouped automatically based on their namespace. We expect to deliver this feature in February.
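For reference, that option is passed when generating the DAG; a minimal invocation might look like this (the pipeline name here is just a placeholder, and -p/--pipeline is the standard kedro-airflow selector):

```shell
# Generate an Airflow DAG from a Kedro pipeline, grouping in-memory
# nodes together with their downstream tasks.
# "my_pipeline" is an illustrative pipeline name.
kedro airflow create --pipeline my_pipeline --group-in-memory
```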
Hope this helps!

Mohamed El Guendouz
01/31/2025, 3:25 PM
from datetime import timedelta

# Imports used by this snippet; dag_id, project_id, cluster_name, region,
# cluster_config, whl_file, script_path and config_archive are defined
# earlier in the template.
from airflow import DAG
from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

# Default Args DAG
default_args = dict(
    owner="{{ owner | default('airflow') }}",
    depends_on_past={{ depends_on_past | default(False) }},
    email_on_failure={{ email_on_failure | default(False) }},
    email_on_retry={{ email_on_retry | default(False) }},
    retries={{ retries | default(1) }},
    retry_delay=timedelta(minutes={{ retry_delay | default(5) }}),
)

with DAG(
    dag_id=dag_id,
    schedule=CronDataIntervalTimetable("0 1 * * *", timezone="{{ timezone }}"),
    default_args=default_args,
) as dag:
    # Create cluster
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=project_id,
        cluster_name=cluster_name,
        region=region,
        cluster_config=cluster_config,
        gcp_conn_id=".....",
    )

    # One Dataproc job per Kedro node
    tasks = {
        {% for node_name, node_list in nodes.items() %}
        "{{ node_name | safe }}": DataprocSubmitJobOperator(
            task_id="{{ node_name | safe }}",
            project_id=project_id,
            region=region,
            job={
                "reference": {
                    "job_id": "{{ node_name | safe | slugify }}-job-{{ '{{' }} ts_nodash {{ '}}' }}",
                },
                "placement": {"cluster_name": cluster_name},
                "pyspark_job": {
                    "python_file_uris": [whl_file],
                    "main_python_file_uri": script_path,
                    "args": [
                        "--pipeline={{ pipeline_name }}",
                        "--from-nodes={{ node_name | safe }}",
                        "--to-nodes={{ node_name | safe }}",
                        "--params process_date={{ '{{' }} data_interval_end | ds {{ '}}' }}",
                        "--async",
                    ],
                    "archive_uris": [f"{config_archive}#config"],
                },
            },
            gcp_conn_id="{{ .... }}",
        ),
        {% endfor %}
    }

    # Delete cluster
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=project_id,
        cluster_name=cluster_name,
        region=region,
        trigger_rule="all_done",
        gcp_conn_id="{{ .... }}",
    )

    # Dependencies
    create_cluster >> list(tasks.values()) >> delete_cluster
    {% for parent_node, child_nodes in dependencies.items() %}
    {% for child in child_nodes %}
    tasks["{{ parent_node | safe }}"] >> tasks["{{ child | safe }}"]
    {% endfor %}
    {% endfor %}
As you can see, I define my tasks dynamically within a dictionary, and my dependencies are managed at the end of the template. My goal is to group certain nodes into TaskGroups, but I don't see any parameters automatically passed to the template that would allow me to do this.
I structured my DAG this way because I didn't know how to do it differently...
Would the upcoming feature you're working on (grouping nodes based on namespace) allow this kind of grouping within the Jinja2 template? Or do you have any suggestions on how I could implement it manually in the meantime?
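Until that feature lands, one manual route is to compute the groups before rendering the template and emit one Airflow TaskGroup per namespace. A minimal sketch of the grouping step (plain Python, not the kedro-airflow API; it assumes Kedro's convention that namespaced node names are prefixed with the namespace and a dot, e.g. preprocessing.clean_data):

```python
from collections import defaultdict


def group_by_namespace(node_names):
    """Group node names by their namespace prefix.

    Nodes without a namespace (no '.' in the name) fall into a
    catch-all "default" group, which can be rendered as plain
    top-level tasks instead of a TaskGroup.
    """
    groups = defaultdict(list)
    for name in node_names:
        namespace, _, _ = name.rpartition(".")
        groups[namespace or "default"].append(name)
    return dict(groups)
```

Each resulting group could then be rendered in the Jinja2 template inside a "with TaskGroup(group_id=...)" block (airflow.utils.task_group.TaskGroup) wrapping that namespace's DataprocSubmitJobOperator tasks, with dependencies still wired at the end as before.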
Thanks again for your help!

Mohamed El Guendouz
01/31/2025, 3:35 PM

Mohamed El Guendouz
01/31/2025, 4:00 PM
Instead of using --from-nodes and --to-nodes, I believe that the solution would be to run kedro run -ns <namespace> for each task, which should resolve my problem.

Mohamed El Guendouz
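Concretely, the per-task args in the template above would then switch from node-level to namespace-level runs. A small sketch of the argument list (the helper name and signature are illustrative, not part of Kedro; --namespace is the long form of -ns):

```python
def kedro_args_for_namespace(pipeline_name, namespace, process_date):
    """Build kedro CLI args for running one namespace of a pipeline,
    replacing the per-node --from-nodes/--to-nodes pair."""
    return [
        f"--pipeline={pipeline_name}",
        f"--namespace={namespace}",  # long form of -ns
        f"--params=process_date={process_date}",
        "--async",
    ]
```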
01/31/2025, 4:01 PM

Mohamed El Guendouz
01/31/2025, 7:04 PM

Dmitry Sorokin
02/03/2025, 7:48 AM

Mohamed El Guendouz
02/03/2025, 9:43 AM