Fazil B. Topal
07/27/2023, 8:50 AM
`hooks` to be nice but without a high-level order of execution, not sure if I can do what I want.
Context: I am trying to play around with the data versioning to change it a bit, since ideally I would run each node in a different k8s pod. That means the dataset versioning should match. From what I gather, the Session class has this info, but I'm trying to find a proper way to make sure the same code version + some envs end up using the same data versioning etc.
Any help is appreciated 🙂

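(For context, a minimal sketch of what "matching dataset versions" means here, assuming a Kedro 0.18-era install where `CSVDataSet` lives under `kedro.extras.datasets`; the file path and version string below are made up:)

```python
from kedro.io import Version
from kedro.extras.datasets.pandas import CSVDataSet

# If every pod is handed the same version string, they all read and write
# the same versioned directory instead of minting their own timestamps.
shared = "2023-07-27T08.50.00.000Z"  # hypothetical, chosen once by the orchestrator
ds = CSVDataSet(
    filepath="data/02_intermediate/example.csv",  # made-up path
    version=Version(load=shared, save=shared),
)
```
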
datajoely
07/27/2023, 9:06 AM

Fazil B. Topal
07/27/2023, 9:16 AM

datajoely
07/27/2023, 9:42 AM

Fazil B. Topal
07/27/2023, 10:27 AM

Nok Lam Chan
07/27/2023, 4:29 PM

Fazil B. Topal
07/27/2023, 7:12 PM

Nok Lam Chan
07/27/2023, 8:33 PM

meharji arumilli
07/28/2023, 9:35 AM

Fazil B. Topal
07/28/2023, 9:40 AM

meharji arumilli
07/28/2023, 9:44 AM

Nok Lam Chan
07/28/2023, 12:50 PM

meharji arumilli
07/28/2023, 12:57 PM
```python
from collections import defaultdict
from pathlib import Path

from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
from datetime import datetime, timedelta

from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project
from kedro.framework.hooks import _create_timestamp_version

fixed_version = datetime.now().strftime('%Y%m%d-%H%M%S')


class KedroOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        package_name: str,
        pipeline_name: str,
        node_name: str,
        project_path: str,
        env: str,
        run_id: str,
        *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env
        self.run_id = run_id

    def execute(self, context):
        configure_project(self.package_name)
        with KedroSession.create(self.package_name,
                                 self.project_path,
                                 env=self.env) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])


# Kedro settings required to run your pipeline
env = "local"
pipeline_name = "__default__"
project_path = Path.cwd()
package_name = "test_fi"

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
    "test-fi",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30),  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    default_args=default_args,
    catchup=False  # enable if you don't want historical dag runs to run
) as dag:
    tasks = {}

    tasks["define-project-parameters"] = KedroOperator(
        task_id="define-project-parameters",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="define_project_parameters",
        project_path=project_path,
        env=env,
        run_id=_create_timestamp_version(fixed_version)
    )

    tasks["preprocess-epc"] = KedroOperator(
        task_id="preprocess-test",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="preprocess_test",
        project_path=project_path,
        env=env,
        run_id=_create_timestamp_version(fixed_version)
    )
```
Would setting `run_id=_create_timestamp_version(fixed_version)` to be the same across all tasks work? Haven't tested yet.

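One caveat worth noting: `fixed_version` is computed whenever a worker parses the DAG file, and each Airflow worker (or pod) parses it independently, so the tasks are not guaranteed to share one value. A minimal sketch of getting a per-run constant instead, assuming the operator declares `run_id` as a templated field; `{{ ts_nodash }}` is a built-in Airflow macro that is identical for every task in one DAG run:

```python
from airflow.models import BaseOperator


class KedroOperator(BaseOperator):
    # Airflow renders templated fields per task instance at execution time,
    # so "{{ ts_nodash }}" is resolved from the DAG run, unlike a
    # module-level datetime.now() that each worker re-evaluates on parse.
    template_fields = ("run_id",)

    # __init__ and execute stay as in the snippet above.


# In the DAG body, pass the macro instead of the parse-time timestamp:
# KedroOperator(..., run_id="{{ ts_nodash }}")  # renders to e.g. '20230728T125700'
```
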
Nok Lam Chan
07/28/2023, 1:06 PM
`KedroSession`

datajoely
07/28/2023, 1:28 PM

meharji arumilli
07/28/2023, 1:30 PM

Fazil B. Topal
07/28/2023, 2:01 PM

Nok Lam Chan
07/28/2023, 2:04 PM
Use `KedroSession.create` rather than constructing `KedroSession()` directly:
```python
session = KedroSession.create()
session.session_id = <your_session_id>
```

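A minimal sketch of how that suggestion could slot into the earlier operator, assuming the templated `run_id` from above is an acceptable session id (untested; Airflow run ids may need sanitising before use in file paths):

```python
from airflow.models import BaseOperator
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession


class KedroOperator(BaseOperator):
    # ... __init__ as in the earlier snippet, storing self.run_id ...

    def execute(self, context):
        configure_project(self.package_name)
        with KedroSession.create(self.package_name,
                                 self.project_path,
                                 env=self.env) as session:
            # Replace the autogenerated timestamp with the shared run id so
            # every node of this DAG run uses one dataset save version.
            session.session_id = self.run_id
            session.run(self.pipeline_name, node_names=[self.node_name])
```
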
meharji arumilli
07/28/2023, 2:18 PM
```python
class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        self.params = os.environ
        self._update_model_version_to_parameters()
        super().__init__(conf_source=conf_source, env=env,
                         runtime_params=runtime_params, globals_dict=self.params)

    def _update_model_version_to_parameters(self):
        if 'model_version' not in self.params.keys():
            version = datetime.now().strftime('%Y%m%d-%H%M%S')
            self.params['model_version'] = version
            print(f'Model version {version}')
```
This is my settings.py file in kedro, where it assigns the timestamp as the model version; it is used as the `$model_version` variable in the catalog to save the outputs in the `$model_version` path. Where would

```python
session = KedroSession.create()
session.session_id = <your_session_id>
```

fit in place of `self.params['model_version'] = version` in the above class MyTemplatedConfigLoader? When it is run as a kedro project in the IDE or as a docker image, all the nodes have one version, whereas it creates a new version for every node when it is run in airflow. Is there a simple hack I might be overseeing!!

```python
from collections import defaultdict
import os
import logging
from pathlib import Path
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version

from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project
from kedro.config import TemplatedConfigLoader


class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        self.params = os.environ
        self._update_model_version_to_parameters()
        super().__init__(conf_source=conf_source, env=env,
                         runtime_params=runtime_params, globals_dict=self.params)

    def _update_model_version_to_parameters(self):
        if 'model_version' not in self.params.keys():
            version = datetime.now().strftime('%Y%m%d-%H%M%S')
            self.params['model_version'] = version
            print(f'Model version {version}')


# Create a single instance of MyTemplatedConfigLoader outside the task functions
config_loader = MyTemplatedConfigLoader(conf_source=None, env=None, runtime_params=None)


class KedroOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        package_name: str,
        pipeline_name: str,
        node_name: str,
        project_path: str,
        env: str,
        *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env

    def execute(self, context):
        configure_project(self.package_name)
        model_version = config_loader.params['model_version']
        logging.info(f"Executing task {self.task_id}, using model version: {model_version}")
        with KedroSession.create(
            self.package_name, self.project_path, env=self.env,
            extra_params={'model_version': model_version}
        ) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])
```
I played around like this, yet no luck; each task still gets a new timestamp as model_version.

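That behaviour is expected with this setup: every Airflow task runs in its own process (or pod), so the module-level `config_loader` is re-instantiated per task, `os.environ` starts without `model_version`, and the loader mints a fresh timestamp each time. A minimal sketch of one workaround, leaving the loader unchanged and seeding the env var from the DAG run's timestamp (`ts_nodash` is a built-in Airflow context key, identical for all tasks in one run):

```python
import os

from airflow.models import BaseOperator
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession


class KedroOperator(BaseOperator):
    # ... __init__ as in the snippet above ...

    def execute(self, context):
        # Seed model_version before KedroSession instantiates the config
        # loader from settings.py: the "'model_version' not in self.params"
        # branch then never fires, so every task in this DAG run reuses the
        # same ts_nodash value instead of calling datetime.now() itself.
        os.environ.setdefault('model_version', context['ts_nodash'])
        configure_project(self.package_name)
        with KedroSession.create(self.package_name,
                                 self.project_path,
                                 env=self.env) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])
```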