# questions
e
Also, I was wondering if someone could explain modular pipelines? I think I need to create a couple, but I'm not sure I get it. Right now I have a pipeline where I repeat the same 3 steps over and over in nodes, because I have a number of different datasets I want to perform those steps on. GPT seems to think I can create a pipeline with parameterized node definitions and add entries in /base/parameters.yml that specify the datasets I want to pass to the pipeline, but I'm not sure if that's right. I also now want to create a keyword extraction pipeline that I can apply to all my datasets without having to duplicate all the nodes. Is there a working example of this I can look at? Any advice is greatly appreciated!
🤔 1
d
Have you looked at the Kedro docs for modular pipelines?
👍 1
There are examples and everything.
Kedro docs, by and large, are very good. If you run into issues following the docs, please share what you've done, and somebody can help take a look. Although what GPT says in this case is not necessarily wrong, I don't think it really makes sense for people to try and verify what it suggests, because we can't control its output.
👍 1
TL;DR: duplicate parts of your pipelines by reusing them under namespaces, rather than copy-pasting the nodes.
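Roughly, something like this (the pipeline and namespace names below are just placeholders for illustration, not from your project):

```python
from kedro.pipeline import node, pipeline

# A tiny made-up "preprocessing" pipeline, defined once
def drop_duplicates(df):
    return df.drop_duplicates()

preprocessing_pipeline = pipeline([
    node(func=drop_duplicates, inputs="input_table", outputs="clean_table"),
])

# The same definition reused under two namespaces instead of copy-pasting the node.
# Dataset names get prefixed automatically, e.g. "sales.input_table", "sales.clean_table".
sales_preprocessing = pipeline(pipe=preprocessing_pipeline, namespace="sales")
marketing_preprocessing = pipeline(pipe=preprocessing_pipeline, namespace="marketing")
```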
e
ok, I'll see what I can figure out. cheers,
@Lodewic van Twillert thank you, I read the documentation but I'm still unclear on what is going on - I didn't find the example helpful because it's just pieces of code without a fully coded example. What are free inputs/outputs? Do I create a separate pipeline for each dataset I want to pass through the modular pipeline? I have 7 datasets I want to run through a pipeline. Do the instance pipelines have nodes? I want to reuse the nodes in a "preprocessing" pipeline on multiple datasets, and I don't understand how to build the instance pipelines for the data I want to pass through it. Any further clarification you can provide would be helpful.
From this code, it seems like I'm supposed to define all the different instances in the same pipeline.py file? Is that correct? So if I have a preprocessing pipeline, I create that pipeline and then add an instance for each dataset below the definition of the preprocessing pipeline. And each of those instance pipelines doesn't have any nodes, they are just constructors. Is that correct? Lastly, I noticed that the outputs use curly braces instead of square brackets, e.g. `outputs={"grilled_veg": "lunch_food"}`, and a colon separates the datasets... what does this syntax mean?
```python
template_pipeline = pipeline(
    [
        node(
            func=node_func1,
            inputs=["input1", "input2", "params:override_me"],
            outputs="intermediary_output",
        ),
        node(
            func=node_func2,
            inputs="intermediary_output",
            outputs="output",  # <-- where does this go?
        ),
    ]
)

alpha_pipeline = pipeline(
    pipe=template_pipeline,
    inputs={"input1", "input2"},  # <-- there is no outputs= here
    parameters={"params:override_me": "params:alpha"},
    namespace="alpha",
)

beta_pipeline = pipeline(
    pipe=template_pipeline,
    inputs={"input1", "input2"},  # <-- there is no outputs= here
    parameters={"params:override_me": "params:beta"},
    namespace="beta",
)

final_pipeline = alpha_pipeline + beta_pipeline
```
l
Hi @Emilio Gagliardi, to be clear, I am not part of Kedro - I'm just very fond of namespaced pipelines, so I'm happy to help, but I don't control the docs :) Let me break down your questions.

First, you asked if you need to define all the different instances of your pipeline in the same pipeline.py:
• Not necessarily, but if you know exactly which 7 datasets you want to apply your pipeline to, then yes, let's not overcomplicate it and define your different instances in one `create_pipeline()` function in your pipeline.py file. If we can get that working for you, then we can think about how to structure your ideal code later.

Then you said: if you have a preprocessing pipeline, create that pipeline and then add an instance for each dataset below the definition of the preprocessing pipeline.
• Yes! Let's say we define your preprocessing pipeline and call it `base_pipeline`. Then we reuse that `base_pipeline` multiple times, each time with a new `namespace=`. Let's use a slightly simpler version of your example:
```python
base_pipeline = pipeline([
    node(func=node_func1, inputs=["input1", "input2", "params:my_param"], outputs="intermediary_output"),
    node(func=node_func2, inputs="intermediary_output", outputs="output"),
])
```
Then, like you said, we create new instances of this `base_pipeline` by using namespaces. And this is where it gets tricky, but once it clicks you can do so many tricks! You construct new instances, but they still have the same nodes as your `base_pipeline` - I'm not sure "constructor" is the right word. Let's go through an example: (1) use namespaces, then (2) see what each pipeline looks like written out separately without namespaces.
```python
# 1. Create a namespaced version of base_pipeline
pipeline_dataset1 = pipeline(pipe=base_pipeline, namespace="dataset1")
pipeline_dataset2 = pipeline(pipe=base_pipeline, namespace="dataset2")

# 2. These are exactly the same pipelines, written out without namespaces
pipeline_dataset1_without_namespace = pipeline([
    node(func=node_func1, inputs=["dataset1.input1", "dataset1.input2", "params:dataset1.my_param"], outputs="dataset1.intermediary_output"),
    node(func=node_func2, inputs="dataset1.intermediary_output", outputs="dataset1.output"),
])

pipeline_dataset2_without_namespace = pipeline([
    node(func=node_func1, inputs=["dataset2.input1", "dataset2.input2", "params:dataset2.my_param"], outputs="dataset2.intermediary_output"),
    node(func=node_func2, inputs="dataset2.intermediary_output", outputs="dataset2.output"),
])
```
It's important to note that ALL inputs and outputs now have a `dataset1.` (or `dataset2.`) prefix - this affects both parameters and dataset names. Also note how much less duplication you have when using namespaces 🙂 In most cases, though, you also want to re-use some datasets. For instance, `input1` should be the same dataset for both pipeline 1 and pipeline 2; we do not want `dataset1.input1` and `dataset2.input1`, because they should be the same. THAT is when you say:
```python
pipeline_dataset1 = pipeline(pipe=base_pipeline, namespace="dataset1", inputs="input1")

# Which effectively results in...
pipeline_dataset1_without_namespace = pipeline([
    node(func=node_func1, inputs=["input1", "dataset1.input2", "params:dataset1.my_param"], outputs="dataset1.intermediary_output"),  # notice how input1 no longer has a prefix
    node(func=node_func2, inputs="dataset1.intermediary_output", outputs="dataset1.output"),
])
```
This is what the documentation examples show in more detail: you do not always want to 'override' ALL the datasets and parameters with these namespace prefixes, and to handle that you use `inputs=` and `outputs=`.

Lastly, this syntax, `outputs={"grilled_veg": "lunch_food"}`, is a dictionary that says `outputs={"{original_dataset_name}": "{new_dataset_name}"}` - for the outputs of your nodes, it means the `grilled_veg` output dataset is renamed to `lunch_food`. Using the previous examples: if you do not do anything, a namespaced pipeline will by default behave as if `outputs={"{original_dataset_name}": "{namespace}.{original_dataset_name}"}` for all datasets - e.g. `outputs={"output": "dataset1.output"}`.
❤️ 1
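For example, if you wanted `dataset1`'s final output to land in a catalog entry without the namespace prefix, a sketch could look like this (reusing the `base_pipeline` from above; `report_dataset1` is a made-up name, just for illustration):

```python
from kedro.pipeline import pipeline

# Same base_pipeline as above; only the final output is renamed.
# "report_dataset1" is a made-up catalog entry name, for illustration only.
pipeline_dataset1 = pipeline(
    pipe=base_pipeline,
    namespace="dataset1",
    outputs={"output": "report_dataset1"},  # the default would have been "dataset1.output"
)
```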
For your 7 datasets: are they all listed in the `catalog.yml`? And do they have names there? How do you want to save the outputs of these pipelines - do you have 7 separate outputs? I will assume that you have 7 inputs and 7 outputs in your `catalog.yml` for this next example, since you asked for a more complete example.

First, your data catalog. In the catalog I will create dataset names with a prefix, just like `dataset1.` in the previous message. These prefixes will match your namespaces later on! You don't have to do this, but it saves some work if you do it right away in the catalog.
```yaml
# catalog.yml

## inputs
dataset1.input:
  type: pandas.CSVDataSet
  filepath: data/01_raw/dataset_1.csv

dataset2.input:
  type: pandas.CSVDataSet
  filepath: data/01_raw/dataset_2.csv

dataset3.input:
  type: pandas.CSVDataSet
  filepath: data/01_raw/dataset_3.csv

# ... repeat for all 7

## outputs
dataset1.output:
  type: pandas.CSVDataSet
  filepath: data/02_primary/output_1.csv

dataset2.output:
  type: pandas.CSVDataSet
  filepath: data/02_primary/output_2.csv

dataset3.output:
  type: pandas.CSVDataSet
  filepath: data/02_primary/output_3.csv
```
Then, in your `pipeline.py`:
```python
from kedro.pipeline import node, pipeline


def remove_missing_values(df):
    """Drop missing values."""
    return df.dropna()


def describe_dataset(df):
    """Describe the dataset."""
    return df.describe()


def create_pipeline():
    base_pipeline = pipeline([
        node(func=remove_missing_values, inputs="input", outputs="intermediary_output"),
        node(func=describe_dataset, inputs="intermediary_output", outputs="output"),
    ])

    # Create a namespaced copy of base_pipeline for each dataset,
    # using the prefixes from your catalog.yml
    namespaces = ["dataset1", "dataset2", "dataset3", "dataset4", "dataset5", "dataset6", "dataset7"]
    pipeline_list = []
    for namespace in namespaces:
        namespaced_pipeline = pipeline(
            pipe=base_pipeline,
            namespace=namespace,
        )
        pipeline_list.append(namespaced_pipeline)

    output_pipeline = sum(pipeline_list)
    return output_pipeline
```
In this example, we do not need to rename any `inputs=`, `outputs=` or even `parameters=`, because each pipeline is entirely separate. If you run `kedro viz` on this, you can:
1. Inspect the graph to confirm that `dataset2.output` (and the other outputs) are indeed matched to the entries in our catalog 🙂
2. Unfold each namespaced pipeline to see that our 2 processing steps are in there, for each dataset.
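And if your base pipeline did take a parameter (like `params:my_param` earlier), the same prefixing applies to parameters. A sketch of what the matching `conf/base/parameters.yml` could look like, with nested keys per namespace (the parameter name and values here are made up for illustration):

```yaml
# conf/base/parameters.yml - sketch only; "my_param" and its values are made up
dataset1:
  my_param: 0.5

dataset2:
  my_param: 0.7

# ... one block per namespace; the namespaced nodes then receive
# "params:dataset1.my_param", "params:dataset2.my_param", and so on.
```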
e
@Lodewic van Twillert thank you so much, I did not understand half of that until you laid it out!! I'm going to go try to get your example working. Thanks again!
l
Awesome, let me know how it goes - happy to help 👍
d
Thanks @Lodewic van Twillert once again!
> what are free inputs/outputs?
@Emilio Gagliardi re this: the note in the latest docs has a definition of free inputs/outputs: https://docs.kedro.org/en/stable/nodes_and_pipelines/modular_pipelines.html#using-a-modular-pipeline-multiple-times I remember adding this a month ago, because it was missing until another user pointed it out; just make sure you're not on an old version of the docs, just in case!
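If it helps to see it in code: a pipeline's free inputs are the datasets it needs but that none of its nodes produce, and its free outputs are the datasets it produces that none of its nodes consume. A small sketch, reusing the `base_pipeline` from earlier in the thread:

```python
# Free inputs: consumed by the pipeline but not produced by any of its nodes.
# Free outputs: produced by the pipeline but not consumed by any of its nodes.
print(base_pipeline.inputs())   # roughly {"input1", "input2", "params:my_param"}
print(base_pipeline.outputs())  # roughly {"output"}; "intermediary_output" is internal
```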