#questions

Dawid Bugajny

05/08/2023, 10:34 AM
Hello! I would like to ask if there is any way to create multiple pipelines in a single directory. I want to have 2 pipelines with the same nodes, but with different outputs. If I create a function that is not called "create_pipeline", but for example "create_pipeline_2" in the same directory, it will not be found by the find_pipelines() function (kedro.framework.project). I have seen modular pipelines, but I would still have to create a new directory for pipelines (correct me if I am wrong)

Juan Luis

05/08/2023, 10:45 AM
hi @Dawid Bugajny! the default logic in `pipeline_registry.py` uses `kedro.framework.project.find_pipelines`, which has some hardcoded logic for how pipelines are supposed to be found: https://github.com/kedro-org/kedro/blob/653145771fa440a46b36d2c96c57a5997613da5e/kedro/framework/project/__init__.py#L356-L369

In the absence of a better idea, my initial suggestion would be to implement your own pipeline discovery logic that follows your convention, and modify `pipeline_registry.py` accordingly

Dawid Bugajny

05/08/2023, 10:49 AM
I thought about this and wondered if this would be the correct way, thank you

marrrcin

05/08/2023, 11:43 AM
If you have as few as 2-3 pipelines, I think implementing the whole discovery logic is a bit of overkill here; just create instances of the pipelines directly in `register_pipelines` šŸ¤”

Jordan

05/09/2023, 1:41 PM
@Dawid Bugajny It's not the prettiest looking code, but here's what I did with a recent project:
from kedro.pipeline import Pipeline, node, pipeline


def create_analysis_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=calculate_raps,
                inputs="image",
                outputs=["wave_vectors", "fourier_amplitudes"],
                name="calculate_raps",
            ),
            node(
                func=lambda x: x,
                inputs="fourier_amplitudes",
                outputs="output_amplitudes",
                name="output_amplitudes",
            ),
            node(
                func=calculate_total_power,
                inputs="fourier_amplitudes",
                outputs="total_power",
                name="calculate_total_power",
            ),
            node(
                func=fourier_transform_image,
                inputs="image",
                outputs="image_ft",
                name="fourier_transform_image",
            ),
            node(
                func=plot_analysis,
                inputs=["image", "image_ft", "wave_vectors", "fourier_amplitudes"],
                outputs="analysis_figure",
                name="plot_analysis",
            ),
        ]
    )


def create_pipeline(**kwargs) -> Pipeline:
    labels = ["original", "reconstructed"]
    analysis_pipelines = [
        pipeline(
            pipe=create_analysis_pipeline(),
            inputs={"image": f"{label}_image"},
            outputs={
                "total_power": f"{label}_total_power",
                "analysis_figure": f"{label}_analysis_figure",
                "output_amplitudes": f"{label}_fourier_amplitudes",
            },
            namespace=f"{label}_analysis",
        )
        for label in labels
    ]
    return sum(analysis_pipelines)
Here we have multiple instances of the same pipeline being created from one `pipeline.py` file, although in this case I have different inputs as well as outputs. I got the idea from @datajoely's modular spaceflights example here: https://github.com/datajoely/modular-spaceflights/blob/main/src/modular_spaceflights/pipelines/modelling/pipeline.py