# questions
m
Hi, I have my config loader as below. It mainly assigns the model_version variable to self.params. The model_version is generated from a timestamp.
import os
from datetime import datetime
from kedro.config import TemplatedConfigLoader

class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        # A new timestamp is taken every time this loader is created
        os.environ["model_version"] = datetime.now().strftime('%Y%m%d-%H%M%S')
        self.params = os.environ
        super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)

CONFIG_LOADER_CLASS = MyTemplatedConfigLoader  # in settings.py
This generates a unique model_version when the project is run with Kedro directly. The model_version is used in the catalog filepaths to save the outputs of different nodes. However, when the Kedro project is packaged and run in Airflow, each node generates a new model_version (presumably because each task creates its own session, so the loader runs again). This makes the subsequent nodes fail, because each one expects as input the output of the previous node, whose filepath contains the previous model_version. Can anyone working with Kedro and Airflow suggest a workaround to keep model_version the same across all nodes/tasks in Airflow?
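A minimal stdlib sketch of the difference, assuming tasks of the same DAG run start at different times: a version taken from each task's own clock versus one derived from a single timestamp fixed for the whole run. The helper names and the times used are hypothetical; in Airflow the shared value would have to come from something every task of the run can see, such as an XCom value pushed once (the approach taken later in this thread).

```python
from datetime import datetime, timedelta

FMT = "%Y%m%d-%H%M%S"


def version_from_task_clock(task_start: datetime) -> str:
    # Mirrors the original loader: the version is whatever the clock says
    # when this particular task's config loader is created.
    return task_start.strftime(FMT)


def version_from_run(run_timestamp: datetime) -> str:
    # Hypothetical alternative: a timestamp fixed once for the whole DAG run,
    # so every task computes the same string.
    return run_timestamp.strftime(FMT)


run_timestamp = datetime(2023, 8, 3, 23, 0, 0)
# Three tasks of the same run, each starting two minutes after the previous one.
task_starts = [run_timestamp + timedelta(minutes=2 * i) for i in range(3)]

per_task = [version_from_task_clock(t) for t in task_starts]
per_run = [version_from_run(run_timestamp) for _ in task_starts]

print(per_task)  # three different versions, so downstream paths don't match
print(per_run)   # one shared version, so the paths line up
```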
Hello, I have found a solution. It uses XCom in the DAG. Below is an example: one task creates a model version and pushes it to XCom, and the subsequent tasks pull it from XCom as shown below. The variable `model_version` is then sent to KedroOperator via `params={"model_version": model_version}`. This can be accessed in the Kedro catalog.yml configuration file in Jinja template style, `{{ params.model_version }}`. I hope this helps and saves someone time.
with DAG(
        "test-fi",
        start_date=datetime(2023, 1, 1),
        max_active_runs=3,
        schedule_interval=timedelta(days=30),  
        default_args=default_args,
        catchup=False 
) as dag:
    tasks = {}


    def create_model_version(**kwargs):
        model_version = datetime.now().strftime('%Y%m%d-%H%M%S')
        kwargs['ti'].xcom_push(key='model_version', value=model_version)


    def define_project_parameters_task(**kwargs):
        ti = kwargs['ti']
        model_version = ti.xcom_pull(task_ids='create_model_version', key='model_version')
        print(f'Model version {model_version}')
        # Use the model_version in your KedroOperator configuration
        define_project_parameters_task = KedroOperator(
            task_id="define-project-parameters",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="define_project_parameters",
            project_path=project_path,
            env=env,
            params={"model_version": model_version}  # Pass the model_version as a parameter
        )
@Nok Lam Chan @Fazil B. Topal just tagging you for awareness, in case it helps! This is more about Airflow configuration, but it helps when working with Kedro.
n
This is fantastic work, and thank you for sharing your solution back.
I think there are some things we can improve, but it is not the current priority. I opened an issue to document this solution in case anyone needs it: https://github.com/kedro-org/kedro-plugins/issues/293
m
I was eager to say that it worked, but there is an issue: the model_version is not passed to catalog.yml. I used {{ params.model_version }} in the catalog path, and it actually saved to {{ params.model_version }}/data.txt instead of using the actual value, like 20230803-230000/data.txt. Since that literal path is the same in every task, the Airflow pipeline ran. But passing the model_version variable to the Kedro catalog configuration through KedroOperator is not achieved 😞
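The literal path is consistent with `TemplatedConfigLoader` filling `${...}` placeholders from its globals rather than Jinja `{{ ... }}` expressions (the traceback further down shows the `${model_version}` form). Python's `string.Template` happens to use the same `${name}` syntax, so it serves as a runnable analogy here; it is not Kedro's implementation, and the `data/...` paths are hypothetical.

```python
from string import Template

# Stand-in for the globals the custom loader passes as globals_dict.
globals_dict = {"model_version": "20230803-230000"}

jinja_style = "data/{{ params.model_version }}/data.txt"
dollar_style = "data/${model_version}/data.txt"

# string.Template only recognises $name / ${name}; the Jinja braces are
# ordinary text to it, which mirrors the literal path seen in the thread.
rendered_jinja = Template(jinja_style).safe_substitute(globals_dict)
rendered_dollar = Template(dollar_style).substitute(globals_dict)

print(rendered_jinja)   # unchanged: data/{{ params.model_version }}/data.txt
print(rendered_dollar)  # data/20230803-230000/data.txt
```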
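from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession


class KedroOperator(BaseOperator):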

    @apply_defaults
    def __init__(
            self,
            package_name: str,
            pipeline_name: str,
            node_name: str,
            project_path: str,
            env: str,
            extra_params: dict,
            *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env
        self.extra_params = extra_params

    def execute(self, context):
        configure_project(self.package_name)
        with KedroSession.create(self.package_name,
                                 self.project_path,
                                 env=self.env, extra_params=self.extra_params) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])

.....
.....
    tasks = {}


    def create_model_version(**kwargs):
        model_version = datetime.now().strftime('%Y%m%d-%H%M%S')
        kwargs['ti'].xcom_push(key='model_version', value=model_version)


    def define_project_parameters_task(**kwargs):
        ti = kwargs['ti']
        define_project_parameters_task = KedroOperator(
            task_id="define-project-parameters",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="define_project_parameters",
            project_path=project_path,
            env=env,
            extra_params={"model_version": "{{ ti.xcom_pull(task_ids='create_model_version', key='model_version') }}"}  # Pass the model_version as a parameter
        )

        define_project_parameters_task.execute(context=kwargs)
I applied the above, where the `extra_params=self.extra_params` dict with the `model_version` item is passed to `KedroSession.create`. Yet it fails:
File "/home/airflow/.local/lib/python3.8/site-packages/kedro/config/templated_config.py", line 222, in _format_string
raise ValueError(
ValueError: Failed to format pattern '${model_version}': no config value found, no default provided
2023-08-04 08:43:30,700 - airflow.task.task_runner.standard_task_runner.StandardTaskRunner - ERROR - Failed to execute job 841 for task define_project_parameters (Failed to format pattern '${model_version}': no config value found, no default provided; 63)
class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        self.params = os.environ
        super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)
I think `model_version` needs to be in the `self.params` globals dictionary of the `TemplatedConfigLoader` class. Instead, the `extra_params=self.extra_params` passed to `KedroSession.create` is used as `runtime_params` (if I'm correct). It seems we're getting closer; does this give any hint on how to achieve this?
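One way to act on that hint, sketched under the assumption (consistent with the error above) that this Kedro version's `TemplatedConfigLoader` resolves `${...}` only from `globals_dict` and not from `runtime_params`: merge the runtime params into the globals before calling `super().__init__`. The helper `build_globals` and the catalog path are hypothetical, and `string.Template` stands in for Kedro's own placeholder formatting so the sketch runs without Kedro.

```python
import os
from string import Template
from typing import Mapping, Optional


def build_globals(runtime_params: Optional[Mapping[str, str]],
                  environ: Mapping[str, str]) -> dict:
    # Environment variables first, then runtime params (e.g. the
    # extra_params handed to KedroSession.create) so they take precedence.
    return {**environ, **(runtime_params or {})}


catalog_path = "data/${model_version}/data.txt"  # hypothetical catalog entry

# Without merging: model_version is not in the globals, so formatting fails,
# much like "no config value found" in the traceback above.
missing_key = None
try:
    Template(catalog_path).substitute(build_globals(None, {}))
except KeyError as exc:
    missing_key = exc.args[0]
    print(f"missing placeholder: {missing_key}")

# With merging: the value passed from Airflow is visible to the template.
runtime_params = {"model_version": "20230803-230000"}
resolved = Template(catalog_path).substitute(build_globals(runtime_params, os.environ))
print(resolved)
```

Inside the custom loader, that would mean passing `globals_dict=build_globals(runtime_params, os.environ)` in place of `globals_dict=self.params`.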
Try this
f
I did something similar with OmegaConf, using a custom resolver to refer to runtime params in the YAML.