# questions
New question: I would like to make a modular pipeline for a single list of parameter values. The items in the list are user defined, so the length is variable, and the list would be filled in via the `params.yaml` file. I see in the docs that I can invoke my modular pipeline with this example:
```python
alpha_pipeline = pipeline(
    pipe=template_pipeline,
    inputs={"input1", "input2"},
    parameters={"params:override_me": "params:alpha"},
    namespace="alpha",
)

beta_pipeline = pipeline(
    pipe=template_pipeline,
    inputs={"input1", "input2"},
    parameters={"params:override_me": "params:beta"},
    namespace="beta",
)
```
and so on.
Is there a way I could do this in a loop, where the list is read in from the `params.yaml` file? Something like the loop below:
```python
variable_parameter_list = read_in_config_yaml_somehow()

pipes = []
for value in variable_parameter_list:
    pipes.append(
        pipeline(
            pipe=template_pipeline,
            inputs={"input1", "input2"},
            # f-string, so each pipeline points at its own parameter key
            parameters={"params:override_me": f"params:{value}"},
            namespace=value,
        )
    )
```
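To make the remapping in that loop concrete, here is a minimal, self-contained sketch of just the parameter-mapping step. The list is inlined here; in a real Kedro project it would be loaded from `params.yaml` (e.g. via the project's config loader). The values `"alpha"`, `"beta"`, `"gamma"` and the name `variable_parameter_list` are stand-ins, not names from any real config:

```python
# Stand-in for the user-defined list that would come out of params.yaml.
variable_parameter_list = ["alpha", "beta", "gamma"]

# One parameter-override mapping and one namespace per list entry,
# mirroring the loop body above: "params:override_me" -> "params:<value>".
parameter_mappings = [
    {"params:override_me": f"params:{value}"} for value in variable_parameter_list
]
namespaces = list(variable_parameter_list)
```

Each mapping/namespace pair is what gets passed to one `pipeline(...)` call.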
So far this looks like exactly what I needed. Thank you!
This blogpost should be an autoresponder 😄
Coming back to this question: So far, the dynamic pipelines have worked really well and I have created N modular pipelines for the N user defined parameters. I would like to build a downstream pipeline that merges some of the outputs from the prior pipeline together. But I don't see an example where this is done in the modular pipelines docs or in the above article. Creating a list of model identifiers from the dynamic pipeline output is a simple example. Is there an elegant way to do this?
Alright, here is a working way to merge assets that I put together, which I think fits the Kedro project lifecycle. I'm still curious whether there are better approaches, though to me this feels like a satisfying way to handle the problem.
```python
from kedro.pipeline import node, Pipeline
from kedro.pipeline.modular_pipeline import pipeline

from ikea_pipeline import settings


def create_pipeline(**kwargs) -> Pipeline:
    # One namespace per (pipeline, variant) pair defined in settings.
    namespaces = [
        f"{namespace}.{variant}"
        for namespace, variants in settings.DYNAMIC_PIPELINES_MAPPING.items()
        for variant in variants
    ]
    # Namespaced dataset names produced by the upstream dynamic pipelines.
    deployments = [f"{i}.deployment_id" for i in namespaces]

    nodes = [
        node(
            name="merge_deployments",
            func=lambda *args: {"deployments": list(args)},
            inputs=deployments,
            outputs="deployment_ids",
        ),
    ]
    pipeline_inst = pipeline(nodes)

    # Wrap the merge node in its own namespace, keeping the upstream
    # deployment_id datasets as free inputs so they are not renamed.
    return pipeline(
        pipeline_inst,
        inputs=deployments,
        namespace="merge_dynamic_pipeline_reprex",
    )
```
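For context, here is a sketch of the kind of mapping `create_pipeline` assumes lives in `settings.py`, and the namespaces its comprehension would produce. The keys and variants below are made up for illustration, not taken from the original project:

```python
# Hypothetical stand-in for settings.DYNAMIC_PIPELINES_MAPPING; a real
# project would define its own pipeline names and variants.
DYNAMIC_PIPELINES_MAPPING = {
    "model_a": ["alpha", "beta"],
    "model_b": ["gamma"],
}

# Same comprehensions as in create_pipeline: one namespace per
# (pipeline, variant) pair, and one deployment_id dataset per namespace.
namespaces = [
    f"{namespace}.{variant}"
    for namespace, variants in DYNAMIC_PIPELINES_MAPPING.items()
    for variant in variants
]
deployments = [f"{i}.deployment_id" for i in namespaces]
```

Since dicts preserve insertion order in Python 3.7+, the resulting lists are deterministic, which keeps the merge node's input order stable across runs.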
I would do the same @Marshall Krassenstein 👍