# questions
b
Guys, what is the right way to create pipelines with dynamic inputs? I mean the following: say I have a pipeline that takes a dataset (defined in the data catalog) and some parameters. I would like to be able to switch the dataset from the configs somehow, without touching the definition of the pipeline itself. For example, if I have these datasets defined in the catalog
```yaml
"{namespace}.{layer}-{folder}#csv_all":
  type: "${globals:datasets.partitioned_dataset}"
  path: data/{layer}/{namespace}/{folder}
  dataset:
    type: "${globals:datasets.pandas_csv}"

"{namespace}.{layer}-{filename}#single_csv":
  type: "${globals:datasets.pandas_csv}"
  filepath: data/{layer}/{namespace}/{filename}.csv
```
And in pipeline definitions I can have either something like this
```python
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "05_model_input-folder_name#csv_all",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
```
Or something like this, depending on whether I want to make a test run on a fraction of the data or on the full dataset
```python
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "05_model_input-filename#single_csv",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
```
And I want to have a configuration in YAML where I can easily change which dataset is used in the pipeline. Ideally I would like to have a single config from which I can set all the parameters that are used in the pipeline, and end up with something like this as a result:
```python
pipeline(
    [
        node(
            func=do_stuff,
            inputs=[
                # other params
                "dataset",
            ],
            outputs="some_output",
        )
    ],
    namespace="some_namespace",
)
```
I see that when you create pipelines using the Kedro CLI, it creates a function with this signature: `def create_pipeline(**kwargs) -> Pipeline:`, so I assume there is a way to provide params and have something like this
```python
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=do_stuff,
                inputs=[
                    # other params
                    kwargs.get("dataset"),
                ],
                outputs="some_output",
            )
        ],
        namespace="some_namespace",
    )
```
But I am not sure how to do this the right way. I have several pipelines like this and want all of them to be dynamic in the same way. Should I change the default logic in `pipeline_registry.py` and pass those kwargs from there, or is there a simpler way to achieve something like this?
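To make the idea concrete, a rough sketch of what I mean by passing kwargs from the registry (the `data_processing` package and the hard-coded dataset name are just placeholders):
```python
# Hypothetical pipeline_registry.py sketch: pass the dataset name straight into
# create_pipeline(). The package path and dataset name are placeholders.
from kedro.pipeline import Pipeline

from my_project.pipelines import data_processing  # hypothetical package


def register_pipelines() -> dict[str, Pipeline]:
    dp = data_processing.create_pipeline(dataset="05_model_input-folder_name#csv_all")
    return {"data_processing": dp, "__default__": dp}
```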
c
Are configuration environments what you're looking for? In each environment you can use different datasets and parameters. The names of the datasets/parameters must match the `base` environment, but the contents/values can be different.
d
hi @Bibo Bobo, you can do it with hooks. For example, you can use the `after_catalog_created()` hook to add a new dataset to your data catalog based on your parameter choice: `catalog.add("dataset", catalog._get_dataset(dataset_choice))`
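For illustration, a minimal sketch of what such a hook class could look like, assuming the parameter is called `dataset_choice` and the class lives in the project's `hooks.py` (both names are placeholders):
```python
# Hypothetical hooks.py sketch based on the suggestion above; "dataset_choice"
# and the generic dataset name "dataset" are placeholders.
from kedro.framework.hooks import hook_impl


class DynamicDatasetHooks:
    @hook_impl
    def after_catalog_created(self, catalog):
        # Read which concrete dataset the nodes should use from the parameters.
        dataset_choice = catalog.load("params:dataset_choice")
        # Register that dataset under the generic name the pipeline refers to.
        catalog.add("dataset", catalog._get_dataset(dataset_choice))
```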
b
@Chris Schopp Thank you for the suggestion, but I don't think this is what I'm looking for. If I understand it right, this will result in even more configuration sources, while I'm trying to reduce the amount of configs; ideally I would like to have one main configuration for each pipeline where I can just define what params/datasets/other things I want for that pipeline.
@Dmitry Sorokin Thanks for the suggestion. Am I getting it right that you suggest having some placeholder dataset names (to use as inputs for the pipeline) and, when the hook is called, setting the value of those placeholders using existing datasets from the catalog? If that's the case, I think it looks better than what I ended up with for now. The only thing I am not sure about is where `dataset_choice` should come from.
For now I took `find_pipelines` as a reference and created a custom function that finds the `create_pipeline` functions in my project, loads the configuration as described here, then takes the pipeline params like this: `conf_loader["parameters"].get(pipeline_name, {}).get("pipeline_params", {})`, and then calls `create_pipeline` with those params. It seems to be doing what I wanted, but it still feels like a hack, plus I am not sure if I am supposed to use `OmegaConfigLoader` from `pipeline_registry.py`, since I found that it updates some inner state of the context (or the session, not sure which one of them).
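Roughly, what I described looks something like this sketch (the conf source, the package paths and the `pipeline_params` key are placeholders from my setup, not Kedro defaults):
```python
# Rough sketch of the custom registry described above; package paths, the conf
# source location and the "pipeline_params" key are placeholders.
from importlib import import_module

from kedro.config import OmegaConfigLoader
from kedro.pipeline import Pipeline

PIPELINE_PACKAGES = ["data_processing"]  # hypothetical pipeline packages


def register_pipelines() -> dict[str, Pipeline]:
    conf_loader = OmegaConfigLoader(
        conf_source="conf", base_env="base", default_run_env="local"
    )
    parameters = conf_loader["parameters"]

    pipelines = {}
    for name in PIPELINE_PACKAGES:
        module = import_module(f"my_project.pipelines.{name}")  # hypothetical path
        # Each create_pipeline() accepts kwargs, so pass it the params defined
        # under <pipeline_name>.pipeline_params in parameters.yml.
        pipeline_params = parameters.get(name, {}).get("pipeline_params", {})
        pipelines[name] = module.create_pipeline(**pipeline_params)

    pipelines["__default__"] = sum(pipelines.values())
    return pipelines
```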
👍 1
d
inside of `parameters.yml` just choose one of the 2 datasets:
```yaml
dataset_choice: "dataset1"
```
inside of `catalog.yml` you have 2 datasets:
```yaml
dataset1:
  type: pandas.CSVDataset
  filepath: data/01_raw/dataset1.csv

dataset2:
  type: pandas.CSVDataset
  filepath: data/01_raw/dataset2.csv
```
then inside of the hook you read the parameters directly from the file:
```python
import yaml

with open("conf/base/parameters.yml") as f:
    params = yaml.safe_load(f)
dataset_choice = params.get("dataset_choice")
```
or take them from the catalog:
```python
dataset_choice = catalog.load("params:dataset_choice")
```
and then update the catalog:
```python
catalog.add("dataset", catalog._get_dataset(dataset_choice))
```
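One more piece needed for this to work: the hook class has to be registered in the project's `settings.py`. A minimal sketch, assuming the hook from the earlier sketch is called `DynamicDatasetHooks` and lives in the project's `hooks.py` (both names are placeholders):
```python
# Hypothetical settings.py snippet; the module path and class name are placeholders.
from my_project.hooks import DynamicDatasetHooks

HOOKS = (DynamicDatasetHooks(),)
```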
👍 1
b
Got it, sounds promising and cleaner than my solution. I'll try it, thank you!
👍 1
d
thanks, happy that it was helpful, but I think your current solution is also good
👍 1