Emilio Gagliardi
08/26/2023, 10:51 PM

Deepyaman Datta
08/27/2023, 3:32 AM

Lodewic van Twillert
08/27/2023, 9:17 PM

Emilio Gagliardi
08/28/2023, 6:11 PM
`outputs={"grilled_veg": "lunch_food"}` and a colon separates the datasets... what does this syntax mean?
template_pipeline = pipeline(
    [
        node(
            func=node_func1,
            inputs=["input1", "input2", "params:override_me"],
            outputs="intermediary_output",
        ),
        node(
            func=node_func2,
            inputs="intermediary_output",
            outputs="output",  # <-- where does this go?
        ),
    ]
)
alpha_pipeline = pipeline(
    pipe=template_pipeline,
    inputs={"input1", "input2"},  # <-- there is no outputs here
    parameters={"params:override_me": "params:alpha"},
    namespace="alpha",
)
beta_pipeline = pipeline(
    pipe=template_pipeline,
    inputs={"input1", "input2"},  # <-- there is no outputs here
    parameters={"params:override_me": "params:beta"},
    namespace="beta",
)
final_pipeline = alpha_pipeline + beta_pipeline
Lodewic van Twillert
08/29/2023, 6:26 PM
Let's first get a working `create_pipeline()` function in your pipeline.py file.
If we can get that working for you, then we can think about how to structure your ideal code later.
Then you said, if you have a preprocessing pipeline, create that pipeline and then add an instance for each dataset below the definition of the preprocessing pipeline.
• Yes! Let's say we define your pre-processing pipeline and call it `base_pipeline`. Then, we re-use that `base_pipeline` multiple times, each time with a new `namespace=`.
Let's use a slightly simpler version of your example:
base_pipeline = pipeline([
    node(func=node_func1, inputs=["input1", "input2", "params:my_param"], outputs="intermediary_output"),
    node(func=node_func2, inputs="intermediary_output", outputs="output"),
])
Then, like you said, we create new instances of this `base_pipeline` by using namespaces. And this is where it gets tricky, but once it clicks you can do so many tricks! You construct new instances, but they still have the same nodes as your `base_pipeline` (not sure if "constructor" is the right word). Let's go through an example that compares the two approaches:
1. use namespaces
2. create each pipeline separately without namespaces
# create a namespaced version of base_pipeline
pipeline_dataset1 = pipeline(pipe=base_pipeline, namespace="dataset1")
pipeline_dataset2 = pipeline(pipe=base_pipeline, namespace="dataset2")

# these are the exact same pipelines, written out without namespaces
pipeline_dataset1_without_namespace = pipeline([
    node(func=node_func1, inputs=["dataset1.input1", "dataset1.input2", "params:dataset1.my_param"], outputs="dataset1.intermediary_output"),
    node(func=node_func2, inputs="dataset1.intermediary_output", outputs="dataset1.output"),
])
pipeline_dataset2_without_namespace = pipeline([
    node(func=node_func1, inputs=["dataset2.input1", "dataset2.input2", "params:dataset2.my_param"], outputs="dataset2.intermediary_output"),
    node(func=node_func2, inputs="dataset2.intermediary_output", outputs="dataset2.output"),
])
It's important to note that ALL inputs and outputs now have a `dataset1.` (or `dataset2.`) prefix - this affects both parameters and dataset names. Also note how much less duplication you will have when using namespaces 🙂
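Since the parameters get prefixed too, `params:my_param` becomes `params:dataset1.my_param`, so your parameters file needs matching nested keys. A minimal sketch of conf/base/parameters.yml, with made-up values:

# parameters.yml
dataset1:
  my_param: 0.5
dataset2:
  my_param: 0.5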
In most cases though, you also want to re-use some datasets. For instance, `input1` should be the same dataset for both pipeline 1 and 2; we do not want to use `dataset1.input1` and `dataset2.input1`, because they should be the same.
THAT is when you say:

pipeline_dataset1 = pipeline(pipe=base_pipeline, namespace="dataset1", inputs="input1")

# Which effectively results in...
pipeline_dataset1_without_namespace = pipeline([
    node(func=node_func1, inputs=["input1", "dataset1.input2", "params:dataset1.my_param"], outputs="dataset1.intermediary_output"),  # notice how input1 does not have a prefix anymore
    node(func=node_func2, inputs="dataset1.intermediary_output", outputs="dataset1.output"),
])
This is shown in more detail in the documentation examples: you usually do not want to 'override' ALL the datasets and parameters with these namespace prefixes. To control that, you use `inputs=` and `outputs=`.
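For example, a small sketch combining both on the `base_pipeline` above (the `dataset1_summary` catalog name is made up for illustration):

pipeline_dataset1 = pipeline(
    pipe=base_pipeline,
    namespace="dataset1",
    inputs={"input1"},  # keep "input1" un-prefixed, so both instances can share it
    outputs={"output": "dataset1_summary"},  # map "output" to a custom catalog entry instead of "dataset1.output"
)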
---
Lastly, this syntax `outputs={"grilled_veg": "lunch_food"}` is a dictionary that says `outputs={"{original_dataset_name}": "{new_dataset_name}"}` - for the outputs in your nodes, it says that the `grilled_veg` output dataset is now renamed to the `lunch_food` output dataset.
Using the previous examples: if you do not do anything, a namespaced pipeline will by default behave as if `outputs={"{original_dataset_name}": "{namespace}.{original_dataset_name}"}` for all datasets - e.g. `outputs={"input1": "dataset1.input1"}`.
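For completeness, here is a runnable sketch of that rename; the `grill` function and dataset names just mirror the docs example your question quotes:

from kedro.pipeline import node, pipeline


def grill(raw_veg):
    # stand-in processing step
    return raw_veg


cook_pipeline = pipeline([
    node(func=grill, inputs="raw_veg", outputs="grilled_veg"),
])

# rename the "grilled_veg" output dataset to "lunch_food"
lunch_prep_pipeline = pipeline(pipe=cook_pipeline, outputs={"grilled_veg": "lunch_food"})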
Do your datasets already exist in your catalog.yml? And do they have names there?
How do you want to save the outputs of these pipelines - do you have 7 separate outputs?
I will make the assumption that you have 7 inputs and 7 outputs in your catalog.yml for this next example - because you asked for a more complete example.
First, your data catalog.
In the catalog I will create dataset names with a prefix, just like `dataset1.` in the previous message. These prefixes will match your namespaces later on! You don't need this, but it will save you some work if you do it right away in the catalog.
#catalog.yml
## inputs
dataset1.input:
  type: pandas.CSVDataSet
  filepath: data/01_raw/dataset_1.csv
dataset2.input:
  type: pandas.CSVDataSet
  filepath: data/01_raw/dataset_2.csv
dataset3.input:
  type: pandas.CSVDataSet
  filepath: data/01_raw/dataset_3.csv
# ... repeat for all 7

## outputs
dataset1.output:
  type: pandas.CSVDataSet
  filepath: data/02_primary/output_1.csv
dataset2.output:
  type: pandas.CSVDataSet
  filepath: data/02_primary/output_2.csv
dataset3.output:
  type: pandas.CSVDataSet
  filepath: data/02_primary/output_3.csv
Then in your pipeline.py:

from kedro.pipeline import node, pipeline


def remove_missing_values(df):
    """Drop missing values."""
    return df.dropna()


def describe_dataset(df):
    """Describe dataset."""
    return df.describe()


def create_pipeline():
    base_pipeline = pipeline([
        node(func=remove_missing_values, inputs="input", outputs="intermediary_output"),
        node(func=describe_dataset, inputs="intermediary_output", outputs="output"),
    ])
    # create a namespace for each dataset, using the prefixes used in your catalog.yml
    namespaces = ["dataset1", "dataset2", "dataset3", "dataset4", "dataset5", "dataset6", "dataset7"]
    pipeline_list = []
    for namespace in namespaces:
        namespaced_pipeline = pipeline(
            pipe=base_pipeline,
            namespace=namespace,
        )
        pipeline_list.append(namespaced_pipeline)
    output_pipeline = sum(pipeline_list)
    return output_pipeline
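To run this, you would register the returned pipeline as usual. A minimal sketch, assuming the standard project template's pipeline_registry.py and a hypothetical package path (my_project.pipelines.preprocessing):

# pipeline_registry.py
from typing import Dict

from kedro.pipeline import Pipeline

from my_project.pipelines.preprocessing.pipeline import create_pipeline


def register_pipelines() -> Dict[str, Pipeline]:
    preprocessing = create_pipeline()
    return {
        "preprocessing": preprocessing,
        "__default__": preprocessing,
    }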
In this example, we do not need to rename any `inputs=`, `outputs=` or even `parameters=`, because each pipeline is entirely separate. If you use kedro viz on this, you will see the following (screenshot omitted):
1. `dataset2.output` is now matched to the entry in our catalog 🙂
2. You can unfold the namespaced pipeline to see that indeed our 2 processing steps are in there, for each dataset

Emilio Gagliardi
08/29/2023, 6:59 PM

Lodewic van Twillert
08/29/2023, 7:00 PM

Deepyaman Datta
08/30/2023, 4:20 AM
Re "what are free inputs/outputs?" - @Emilio Gagliardi, the note has a definition of free inputs/outputs in the latest docs: https://docs.kedro.org/en/stable/nodes_and_pipelines/modular_pipelines.html#using-a-modular-pipeline-multiple-times I remember adding this a month ago, because it was missing until another user pointed it out; just make sure you're not on an old version of the docs, just in case!
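If it helps to see the definition in code: a pipeline's free inputs/outputs are the datasets not produced or consumed by any other node within that same pipeline, and you can inspect them with `Pipeline.inputs()` and `Pipeline.outputs()`. A tiny sketch with made-up names:

from kedro.pipeline import node, pipeline


def identity(x):
    return x


p = pipeline([
    node(func=identity, inputs="a", outputs="b", name="first"),
    node(func=identity, inputs="b", outputs="c", name="second"),
])

print(p.inputs())   # {'a'} - "b" is produced by the "first" node, so it is not free
print(p.outputs())  # {'c'} - "b" is consumed by the "second" node, so it is not free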