Bibo Bobo
04/11/2025, 11:43 AM
"{namespace}.{layer}-{folder}#csv_all":
  type: "${globals:datasets.partitioned_dataset}"
  path: data/{layer}/{namespace}/{folder}
  dataset:
    type: "${globals:datasets.pandas_csv}"

"{namespace}.{layer}-{filename}#single_csv":
  type: "${globals:datasets.pandas_csv}"
  filepath: data/{layer}/{namespace}/{filename}.csv
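The ${globals:...} references presumably resolve against a globals.yml holding dataset type aliases; a minimal sketch of what such a file might contain (the exact class paths are an assumption and may differ by kedro-datasets version):

# conf/base/globals.yml (assumed layout)
datasets:
  partitioned_dataset: partitions.PartitionedDataset
  pandas_csv: pandas.CSVDataset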
And in pipeline definitions I can have either something like this:
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "05_model_input-folder_name#csv_all",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
Or something like this, depending on whether I want to do a test run on a fraction of the data or on the full dataset:
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "05_model_input-filename#single_csv",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
And I want to have a YAML configuration where I can easily change the type of dataset that is used in the pipeline.
Ideally I would like to have a single config from which I can set all the parameters used in the pipeline, and end up with something like this:
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "dataset",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
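One way that single config could look in parameters.yml, mirroring the per-pipeline pipeline_params structure that comes up later in the thread; the concrete values are placeholders:

# parameters.yml (hypothetical layout)
some_namespace:
  pipeline_params:
    # switch between the single test CSV and the full partitioned folder
    dataset: "05_model_input-filename#single_csv"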
I see that when you create pipelines using the Kedro CLI, it generates a function with the signature def create_pipeline(**kwargs) -> Pipeline:, so I assume there is a way to provide params and have something like this:
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=do_stuff,
                inputs=[
                    # other params
                    kwargs.get("dataset"),
                ],
                outputs="some_output",
            )
        ],
        namespace="some_namespace",
    )
But I am not sure how to do this the right way. I have several pipelines like this and want all of them to be dynamic in this way. Should I change the default logic in pipeline_registry.py and pass those kwargs from there, or is there a simpler way to achieve this?
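One possible shape for such a registry, sketched under a few assumptions: the package is called my_project, each pipeline module exposes create_pipeline(**kwargs), and per-pipeline params sit under a pipeline_params key (as comes up later in the thread). This is not the stock Kedro template, just one way the kwargs could be wired up:

# pipeline_registry.py (sketch): load per-pipeline params with OmegaConfigLoader
# and pass them to each create_pipeline via **kwargs.
from kedro.config import OmegaConfigLoader
from kedro.pipeline import Pipeline

from my_project.pipelines import model_input  # hypothetical pipeline package


def register_pipelines() -> dict[str, Pipeline]:
    # standalone loader pointed at the project's conf/ directory
    conf_loader = OmegaConfigLoader(
        conf_source="conf", base_env="base", default_run_env="local"
    )
    params = conf_loader["parameters"]

    pipelines: dict[str, Pipeline] = {}
    for name, module in {"model_input": model_input}.items():
        pipeline_params = params.get(name, {}).get("pipeline_params", {})
        pipelines[name] = module.create_pipeline(**pipeline_params)

    pipelines["__default__"] = sum(pipelines.values(), Pipeline([]))
    return pipelines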
Chris Schopp
04/11/2025, 2:29 PM
You could use a separate configuration environment: it has the same structure as the base environment, but the contents/values can be different.
Dmitry Sorokin
04/11/2025, 2:54 PM
You could use the after_catalog_created() hook to add a new dataset to your data catalog based on your parameter choice:

catalog.add("dataset", catalog._get_dataset(dataset_choice))
Bibo Bobo
04/11/2025, 4:02 PM
Where should dataset_choice come from?
Bibo Bobo
04/11/2025, 4:09 PM
For now I used find_pipelines as a reference and created a custom function that finds the create_pipeline functions in my project, loads the configuration as described here, and then takes the pipeline params like this:

conf_loader["parameters"].get(pipeline_name, {}).get("pipeline_params", {})

It then calls create_pipeline with those params. It seems to do what I wanted, but it still feels like a hack, plus I am not sure if I am supposed to use OmegaConfigLoader from pipeline_registry.py, since I found that it updates some inner state of the context (or session, not sure which of them).
Dmitry Sorokin
04/11/2025, 4:18 PM
In parameters.yml you just choose one of the 2 datasets:

dataset_choice: "dataset1"

Inside catalog.yml you have 2 datasets:

dataset1:
  type: pandas.CSVDataset
  filepath: data/01_raw/dataset1.csv
dataset2:
  type: pandas.CSVDataset
  filepath: data/01_raw/dataset2.csv

Then inside the hook you read the parameters, either directly from the file:

with open("conf/base/parameters.yml") as f:
    params = yaml.safe_load(f)
dataset_choice = params.get("dataset_choice")

or take them from the catalog:

dataset_choice = catalog.load("params:dataset_choice")

and then update the catalog:

catalog.add("dataset", catalog._get_dataset(dataset_choice))
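Pulling those pieces together, a sketch of what the hook could look like in the project's hooks.py; DynamicDatasetHooks is a placeholder name:

# hooks.py (sketch): alias whichever catalog entry parameters.yml points at
# under the fixed name "dataset" that the nodes use as an input.
from kedro.framework.hooks import hook_impl


class DynamicDatasetHooks:
    @hook_impl
    def after_catalog_created(self, catalog):
        # dataset_choice is "dataset1" or "dataset2", as set in parameters.yml
        dataset_choice = catalog.load("params:dataset_choice")
        catalog.add("dataset", catalog._get_dataset(dataset_choice))

It would then be registered in settings.py via HOOKS = (DynamicDatasetHooks(),).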