meharji arumilli
08/02/2023, 9:12 AMclass MyTemplatedConfigLoader(TemplatedConfigLoader):
def __init__(self, conf_source, env, runtime_params):
os.environ["model_version"] = datetime.now().strftime('%Y%m%d-%H%M%S')
self.params = os.environ
super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)
CONFIG_LOADER_CLASS = MyTemplatedConfigLoader
This generates a unique model_version when the project is run in kedro. This model_version
is used in the filepaths in Catalog to save the outputs from different nodes.
However, when this kedro project is packaged and run in airflow, each node is generating
a new model_version which causes the subsequent nodes to fail as it expects the output (file path with model_version)
from previous node as input.
Does anyone working with kedro and airflow offer a hack for this to keep the model_version unique across all nodes or tasks in airflow?model_version
is sent to KedroOperator by params={"model_version": model_version}
. This can be accessed in Kedro catalog.yml configuration file in Jinja template style {{ params.model_version }} . I hope this helps and saves time for soemone.
with DAG(
"test-fi",
start_date=datetime(2023, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(days=30),
default_args=default_args,
catchup=False
) as dag:
tasks = {}
def create_model_version(**kwargs):
model_version = datetime.now().strftime('%Y%m%d-%H%M%S')
kwargs['ti'].xcom_push(key='model_version', value=model_version)
def define_project_parameters_task(**kwargs):
ti = kwargs['ti']
model_version = ti.xcom_pull(task_ids='create_model_version', key='model_version')
print(f'Model version {model_version}')
# Use the model_version in your KedroOperator configuration
define_project_parameters_task = KedroOperator(
task_id="define-project-parameters",
package_name=package_name,
pipeline_name=pipeline_name,
node_name="define_project_parameters",
project_path=project_path,
env=env,
params={"model_version": model_version} # Pass the model_version as a parameter
)
Nok Lam Chan
08/03/2023, 4:39 PMmeharji arumilli
08/03/2023, 9:45 PMclass KedroOperator(BaseOperator):
@apply_defaults
def __init__(
self,
package_name: str,
pipeline_name: str,
node_name: str,
project_path: str,
env: str,
extra_params: dict,
*args, **kwargs
) -> None:
super().__init__(*args, **kwargs)
self.package_name = package_name
self.pipeline_name = pipeline_name
self.node_name = node_name
self.project_path = project_path
self.env = env
self.extra_params = extra_params
def execute(self, context):
configure_project(self.package_name)
with KedroSession.create(self.package_name,
self.project_path,
env=self.env, extra_params=self.extra_params) as session:
session.run(self.pipeline_name, node_names=[self.node_name])
.....
.....
tasks = {}
def create_model_version(**kwargs):
model_version = datetime.now().strftime('%Y%m%d-%H%M%S')
kwargs['ti'].xcom_push(key='model_version', value=model_version)
def define_project_parameters_task(**kwargs):
ti = kwargs['ti']
define_project_parameters_task = KedroOperator(
task_id="define-project-parameters",
package_name=package_name,
pipeline_name=pipeline_name,
node_name="define_project_parameters",
project_path=project_path,
env=env,
extra_params={"model_version": "{{ ti.xcom_pull(task_ids='create_model_version', key='model_version') }}"} # Pass the model_version as a parameter
)
define_project_parameters_task.execute(context=kwargs)
I applied the above where extra_params=self.extra_params
dict with mode_version item is sent to KedroSession.create
. Yet it fails File "/home/airflow/.local/lib/python3.8/site-packages/kedro/config/templated_config.py", line 222, in _format_string
raise ValueError(
ValueError: Failed to format pattern '${model_version}': no config value found, no default provided
2023-08-04 08:43:30,700 - airflow.task.task_runner.standard_task_runner.StandardTaskRunner - ERROR - Failed to execute job 841 for task define_project_parameters (Failed to format pattern '${model_version}': no config value found, no default provided; 63)
.class MyTemplatedConfigLoader(TemplatedConfigLoader):
def __init__(self, conf_source, env, runtime_params):
self.params = os.environ
super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)
I think, the model_version
should be available in self.params global dictionary in `TemplatedConfigLoader`class. Instead, the extra_params=self.extra_params
sent in KedroSession.create
is used as runtime_params (if im correct). Seems getting closer, does this give any hint to achieve this?Nok Lam Chan
08/04/2023, 10:25 AMFazil B. Topal
08/04/2023, 10:28 AMmeharji arumilli
08/04/2023, 1:10 PM