#questions

Fazil B. Topal

07/27/2023, 8:50 AM
hey all, is there some way I can see a high-level overview of how Kedro functions? I find hooks nice, but without the high-level order of execution I'm not sure I can do what I want. Context: I am trying to play around with the data versioning to change it a bit, since ideally I would run each node in a different k8s pod. That means the dataset versioning should match across pods. From what I gather the Session class has this info, but I'm trying to find a proper way to make sure the same code version + some envs end up using the same data versioning. Any help is appreciated 🙂
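The high-level order of execution being asked about can be sketched as a list of Kedro's hook spec names (the names are real hook specs; the exact interleaving of the dataset hooks with the node hooks is an approximation, not official documentation):

```python
# Hedged sketch: approximate order in which Kedro invokes hook specs
# during a `kedro run`. Node- and dataset-level hooks fire once per
# node / per load-save, not once per run.
RUN_LIFECYCLE = [
    "after_context_created",
    "after_catalog_created",
    "before_pipeline_run",
    # per node:
    "before_dataset_loaded",
    "after_dataset_loaded",
    "before_node_run",
    "after_node_run",
    "before_dataset_saved",
    "after_dataset_saved",
    # once at the end:
    "after_pipeline_run",
]

# Node-level hooks run strictly inside the pipeline-level bracket:
assert RUN_LIFECYCLE.index("before_node_run") > RUN_LIFECYCLE.index("before_pipeline_run")
assert RUN_LIFECYCLE.index("after_node_run") < RUN_LIFECYCLE.index("after_pipeline_run")
```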

datajoely

07/27/2023, 9:06 AM
This plug-in should do most of this for you if you’re okay introducing Airflow https://github.com/getindata/kedro-airflow-k8s

Fazil B. Topal

07/27/2023, 9:16 AM
We prefer Argo Workflows for this; if there is useful code in there, I'd take a look. Thanks!

datajoely

07/27/2023, 9:42 AM
The mechanism should be very transferable. There is also an unmaintained plugin in this space which should help: https://github.com/nraw/kedro-argo

Fazil B. Topal

07/27/2023, 10:27 AM
Yes, I saw that one, but it doesn't provide anything for data versioning. It assumes the path to load a file is static, which in my case it is not. If I use Kedro's versioning, which uses timestamps by default, I need to make sure each node can load the same version. That is what I am trying to fix.

Nok Lam Chan

07/27/2023, 4:29 PM
This might be relevant. If you run nodes separately, you will get a different timestamp from each session; currently there isn't an easy way to override it. https://github.com/kedro-org/kedro/issues/1731
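One way to sidestep the per-session timestamp is to derive the version deterministically from an id the orchestrator already shares with every pod (a sketch, not Kedro API; run_id here is an assumed orchestrator-provided value such as an Argo workflow name or an Airflow dag_run id):

```python
import hashlib

def shared_version(run_id: str) -> str:
    # Deterministic: every node/pod handed the same run_id computes
    # the same version string, unlike datetime.now(), which differs
    # per process and per start time.
    return hashlib.sha1(run_id.encode()).hexdigest()[:12]

# All pods in the run "argo-wf-abc123" agree on one version string:
version = shared_version("argo-wf-abc123")
```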

Fazil B. Topal

07/27/2023, 7:12 PM
Yes, that looks about right. We were thinking either of not using the default versioning and creating something of our own, or of storing session files in remote storage and loading them in the next run to override the existing session in Kedro. Maybe extend the current Session class to our own needs and invoke the runner ourselves. From what I gather, there is no easy way at the moment.

Nok Lam Chan

07/27/2023, 8:33 PM
Please do upvote the issue and leave a comment about the problem so we can have a discussion among the team and prioritise these issues accordingly.
I think for now you may have to create this session timestamp as an additional node and pass it to all the nodes (via templating maybe) to coordinate the version number.
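The templating idea above behaves essentially like string substitution: if every node's session receives the same globals, every node resolves the same concrete path. A stdlib sketch of that resolution (the catalog path below is hypothetical):

```python
from string import Template

# A versioned catalog entry as it might appear in catalog.yml:
catalog_entry = "data/06_models/${model_version}/model.pkl"

# TemplatedConfigLoader-style substitution: a shared globals dict
# means every node computes the same concrete filepath.
globals_dict = {"model_version": "20230728-120000"}
resolved = Template(catalog_entry).substitute(globals_dict)
print(resolved)  # data/06_models/20230728-120000/model.pkl
```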

meharji arumilli

07/28/2023, 9:35 AM
@Fazil B. Topal did you manage to address this issue? I'm facing the same one: each node in Airflow creates a new version, and I need it to be fixed for the whole project.

Fazil B. Topal

07/28/2023, 9:40 AM
I am thinking about ways to do it; meanwhile I saw this PR: https://github.com/kedro-org/kedro/pull/1871/files Looks like the configurable version would come in handy here. I haven't finalized the approach, but it looks like I'm going to have to do my own versioning and disable Kedro's default timestamp versioning.

meharji arumilli

07/28/2023, 9:44 AM
Okay. I don't need a custom format for versioning, only that the Kedro default timestamp version needs to be the same across all nodes.

Nok Lam Chan

07/28/2023, 12:50 PM
@meharji arumilli This is not possible because they are not running in the same process/machine, due to the nature of distributed computing. What we can potentially do, though, is allow customisation of the session_id; then you can pass your desired id to make sure it is consistent across nodes.
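Until such customisation exists, one coordination scheme is to have the orchestrator export a single id for the whole run and have each task fall back to a fresh timestamp only when run standalone (a sketch; KEDRO_SESSION_ID is a made-up variable name that nothing in Kedro itself reads):

```python
import os
from datetime import datetime

def resolve_run_version() -> str:
    # KEDRO_SESSION_ID is hypothetical: the orchestrator (Airflow,
    # Argo) would export it once per run so that all tasks agree.
    return os.environ.get(
        "KEDRO_SESSION_ID",
        datetime.now().strftime("%Y%m%d-%H%M%S"),  # standalone fallback
    )

# Simulate the orchestrator exporting one id for the whole run:
os.environ["KEDRO_SESSION_ID"] = "20230728-120000"
print(resolve_run_version())  # 20230728-120000
```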

meharji arumilli

07/28/2023, 12:57 PM
from collections import defaultdict

from pathlib import Path

from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
from datetime import datetime, timedelta

from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project
from kedro.framework.hooks import _create_timestamp_version

fixed_version = datetime.now().strftime('%Y%m%d-%H%M%S')


class KedroOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            package_name: str,
            pipeline_name: str,
            node_name: str,
            project_path: str,
            env: str,
            run_id: str,
            *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env
        self.run_id = run_id

    def execute(self, context):
        configure_project(self.package_name)
        with KedroSession.create(self.package_name,
                                 self.project_path,
                                 env=self.env) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])


# Kedro settings required to run your pipeline
env = "local"
pipeline_name = "__default__"
project_path = Path.cwd()
package_name = "test_fi"

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
        "test-fi",
        start_date=datetime(2019, 1, 1),
        max_active_runs=3,
        schedule_interval=timedelta(minutes=30),  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
        default_args=default_args,
        catchup=False  # enable if you don't want historical dag runs to run
) as dag:
    tasks = {}

    tasks["define-project-parameters"] = KedroOperator(
        task_id="define-project-parameters",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="define_project_parameters",
        project_path=project_path,
        env=env,
        run_id=_create_timestamp_version(fixed_version)
    )

    tasks["preprocess-epc"] = KedroOperator(
        task_id="preprocess-test",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="preprocess_test",
        project_path=project_path,
        env=env,
        run_id=_create_timestamp_version(fixed_version)
    )
Does something like this, with
run_id=_create_timestamp_version(fixed_version)
the same across all tasks, work? Haven't tested yet.

Nok Lam Chan

07/28/2023, 1:06 PM
How did this run_id get passed to Kedro?
I don't think it would work now, because session_id is internal to KedroSession.

datajoely

07/28/2023, 1:28 PM
* unless you overrode that

meharji arumilli

07/28/2023, 1:30 PM
It didn't work. Any hint on how to set session_id as a constant?

Fazil B. Topal

07/28/2023, 2:01 PM
The Session class accepts session_id as a param; wouldn't that work if I create my own Session and invoke run via that one?

Nok Lam Chan

07/28/2023, 2:04 PM
I am not sure; you are supposed to use the factory method KedroSession.create rather than constructing KedroSession() directly.
It may work, but I am pretty sure you need a few more hacks.
Or maybe this is enough (I haven't tested):
session = KedroSession.create()
session.session_id = <your_session_id>

meharji arumilli

07/28/2023, 2:18 PM
or can we utilize the concept of task groups? that should run under the same context or session, ensuring consistency across tasks.
tasks = {}

tasks["task1"] = KedroOperator(
    task_group="your_task_group",  # Specify the task group for this task
    task_id="task1",
    package_name=package_name,
    pipeline_name=pipeline_name,
    node_name="task1_node",
    project_path=project_path,
    env=env,
)

tasks["task2"] = KedroOperator(
    task_group="your_task_group",  # Specify the same task group for this task
    task_id="task2",
    package_name=package_name,
    pipeline_name=pipeline_name,
    node_name="task2_node",
    project_path=project_path,
    env=env,
)
import os
from datetime import datetime

from kedro.config import TemplatedConfigLoader


class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        self.params = os.environ
        self._update_model_version_to_parameters()
        super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)

    def _update_model_version_to_parameters(self):
        if 'model_version' not in self.params.keys():
            version = datetime.now().strftime('%Y%m%d-%H%M%S')
            self.params['model_version'] = version
            print(f'Model version {version}')
This is my settings.py file in Kedro, where it assigns the timestamp as the model version; it is used as the $model_version variable in the catalog to save the outputs under the $model_version path.
Now for the dag/airflow version we are supposed to use
session = KedroSession.create()
session.session_id = <your_session_id>
My issue is with
self.params['model_version'] = version
in the MyTemplatedConfigLoader class above. When it is run as a Kedro project in an IDE or as a Docker image, all the nodes have one version, whereas it creates a new version for every node when it is run in Airflow. Is there a simple hack I might be overlooking?
from collections import defaultdict
import os
from pathlib import Path
from datetime import datetime
from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
from datetime import datetime, timedelta
from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project
import logging
from datetime import datetime
from kedro.config import TemplatedConfigLoader


class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        self.params = os.environ
        self._update_model_version_to_parameters()
        super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)

    def _update_model_version_to_parameters(self):
        if 'model_version' not in self.params.keys():
            version = datetime.now().strftime('%Y%m%d-%H%M%S')
            self.params['model_version'] = version
            print(f'Model version {version}')


# Create a single instance of MyTemplatedConfigLoader outside the task functions
config_loader = MyTemplatedConfigLoader(conf_source=None, env=None, runtime_params=None)



class KedroOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            package_name: str,
            pipeline_name: str,
            node_name: str,
            project_path: str,
            env: str,
            *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env

    def execute(self, context):
        configure_project(self.package_name)
        model_version = config_loader.params['model_version']
        logging.info(f"Executing task {self.task_id}, using model version: {model_version}")
        with KedroSession.create(
                self.package_name, self.project_path, env=self.env, extra_params={'model_version': model_version}
        ) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])
I played around like this, yet no luck; each task still gets a new timestamp as model_version.
Any help here will be highly appreciated!!
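As Nok pointed out, each Airflow task runs in its own process, so module-level code such as datetime.now() in the DAG file is re-evaluated every time a task process re-imports it. A minimal stdlib sketch of the failure mode and of the fix (in practice the shared value would come from the scheduler, e.g. an env var or a templated field, not from this script):

```python
from datetime import datetime
import time

def version_per_process() -> str:
    # What the DAG above effectively does: each Airflow task process
    # re-imports the DAG file, so this is evaluated once *per task*.
    return datetime.now().strftime("%Y%m%d-%H%M%S.%f")

def version_shared(shared: str) -> str:
    # The fix: compute the version once per run and hand the same
    # value to every task (env var, XCom, or a templated field).
    return shared

# Simulate two tasks started at different times within one run:
run_version = datetime.now().strftime("%Y%m%d-%H%M%S")
a = version_shared(run_version)
time.sleep(0.01)
b = version_shared(run_version)
assert a == b  # the shared value agrees across "tasks"
```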