# questions
a
I was using modular pipelines to generate multiple models. Pipeline 1 (data processing):
```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import scale_columns  # project-specific node function


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline([
        node(
            func=scale_columns,
            inputs=["dataset", "params:preprocess_options"],
            outputs=["dataset_scaled", "minmaxscaler", "labelencoder"],
            name="scale_columns_node",
        ),
    ])

    dp_pipe_one = pipeline(
        pipe=pipeline_instance,
        inputs="dataset",
        # outputs="dataset_scaled_one",
        namespace="one_process_pipeline",
    )

    dp_pipe_two = pipeline(
        pipe=pipeline_instance,
        inputs="dataset",
        # outputs="dataset_scaled_two",
        namespace="two_process_pipeline",
    )

    return dp_pipe_one + dp_pipe_two
```
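For context, scale_columns itself isn't shown in the thread. A minimal sketch of what it might look like, given its declared inputs and outputs and the x_features/y_features parameter names that appear in the catalog listing further down (the column handling is an assumption, not the author's actual code):

```python
import pandas as pd
from sklearn.preprocessing import LabelEncoder, MinMaxScaler


def scale_columns(dataset: pd.DataFrame, options: dict):
    # Hypothetical implementation: min-max scale the numeric feature columns
    # and label-encode the target column named in preprocess_options.
    scaled = dataset.copy()
    scaler = MinMaxScaler()
    encoder = LabelEncoder()
    scaled[options["x_features"]] = scaler.fit_transform(scaled[options["x_features"]])
    target = options["y_features"]
    target = target[0] if isinstance(target, list) else target  # tolerate a one-item list
    scaled[target] = encoder.fit_transform(scaled[target])
    return scaled, scaler, encoder
```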
Pipeline 2 (model training):
```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import split_data, test_model, train_model  # project-specific node functions


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline([
        node(
            func=split_data,
            inputs=["dataset_scaled", "params:split_options"],
            outputs=["X_train", "X_test", "y_train", "y_test"],
            name="split_data_node",
        ),
        node(
            func=train_model,
            inputs=["X_train", "y_train", "params:model_options"],
            outputs="classifier",
            name="train_model_node",
        ),
        node(
            func=test_model,
            inputs=["classifier", "X_test", "y_test"],
            outputs=None,
            name="test_model_node",
        ),
    ])

    ds_pipeline_one = pipeline(
        pipe=pipeline_instance,
        inputs="dataset_scaled",
        namespace="one_model_pipeline",
    )

    ds_pipeline_two = pipeline(
        pipe=pipeline_instance,
        inputs="dataset_scaled",
        namespace="two_model_pipeline",
    )

    return ds_pipeline_one + ds_pipeline_two
```
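Likewise, the three node functions aren't shown in the thread. A minimal sketch consistent with the split_options/model_options parameter names in the catalog listing below, assuming a scikit-learn random forest (everything here is illustrative):

```python
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


def split_data(dataset: pd.DataFrame, options: dict):
    # x_params / y_param name the feature and target columns
    X = dataset[options["x_params"]]
    y = dataset[options["y_param"]]
    # returns X_train, X_test, y_train, y_test, matching the node's outputs
    return train_test_split(
        X, y, test_size=options["test_size"], random_state=options["random_state"]
    )


def train_model(X_train, y_train, options: dict):
    model = RandomForestClassifier(
        n_estimators=options["n_estimators"], random_state=options["random_state"]
    )
    model.fit(X_train, y_train)
    return model


def test_model(model, X_test, y_test):
    # the node has no outputs, so just report the score
    print(f"Accuracy: {accuracy_score(y_test, model.predict(X_test)):.3f}")
```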
When I run Kedro, the following error happens (for pipeline 2):
ValueError: Pipeline input(s) {'dataset_scaled'} not found in the DataCatalog
I have checked the catalog, and indeed it is not found there:
```python
[
    'dataset',
    'parameters',
    'params:two_model_pipeline',
    'params:two_model_pipeline.split_options',
    'params:two_model_pipeline.split_options.test_size',
    'params:two_model_pipeline.split_options.random_state',
    'params:two_model_pipeline.split_options.x_params',
    'params:two_model_pipeline.split_options.y_param',
    'params:two_model_pipeline.model_options',
    'params:two_model_pipeline.model_options.n_estimators',
    'params:two_model_pipeline.model_options.random_state',
    'params:one_model_pipeline',
    'params:one_model_pipeline.split_options',
    'params:one_model_pipeline.split_options.test_size',
    'params:one_model_pipeline.split_options.random_state',
    'params:one_model_pipeline.split_options.x_params',
    'params:one_model_pipeline.split_options.y_param',
    'params:one_model_pipeline.model_options',
    'params:one_model_pipeline.model_options.n_estimators',
    'params:one_model_pipeline.model_options.random_state',
    'params:two_process_pipeline',
    'params:two_process_pipeline.preprocess_options',
    'params:two_process_pipeline.preprocess_options.x_features',
    'params:two_process_pipeline.preprocess_options.y_features',
    'params:one_process_pipeline',
    'params:one_process_pipeline.preprocess_options',
    'params:one_process_pipeline.preprocess_options.x_features',
    'params:one_process_pipeline.preprocess_options.y_features',
]
```
Here is the log for pipeline 1 (preprocessing):
```
INFO  Loading data from dataset (CSVDataset)...
INFO  Loading data from params:one_process_pipeline.preprocess_options (MemoryDataset)...
INFO  Running node: scale_columns_node: scale_columns([dataset;params:one_process_pipeline.preprocess_options]) -> [one_process_pipeline.dataset_scaled;one_process_pipeline.minmaxscaler;one_process_pipeline.labelencoder]
INFO  Saving data to one_process_pipeline.dataset_scaled (CSVDataset)...
INFO  Saving data to one_process_pipeline.minmaxscaler (MlflowPickleDataset)...
INFO  Saving data to one_process_pipeline.labelencoder (MlflowPickleDataset)...
INFO  Completed 1 out of 2 tasks
INFO  Loading data from dataset (CSVDataset)...
INFO  Loading data from params:two_process_pipeline.preprocess_options (MemoryDataset)...
INFO  Running node: scale_columns_node: scale_columns([dataset;params:two_process_pipeline.preprocess_options]) -> [two_process_pipeline.dataset_scaled;two_process_pipeline.minmaxscaler;two_process_pipeline.labelencoder]
INFO  Saving data to two_process_pipeline.dataset_scaled (CSVDataset)...
INFO  Saving data to two_process_pipeline.minmaxscaler (MlflowPickleDataset)...
INFO  Saving data to two_process_pipeline.labelencoder (MlflowPickleDataset)...
INFO  Completed 2 out of 2 tasks
INFO  Pipeline execution completed successfully.
```
How can I solve this issue?
a
Your input can't be found because it lives inside the namespace of the pipeline that produces it: it's not dataset_scaled but rather two_process_pipeline.dataset_scaled (or the one_process_pipeline equivalent). You can fix it either by putting the training pipeline in the same namespace, or by declaring in the input mapping where the dataset can be found:
```python
ds_pipeline_one = pipeline(
    pipe=pipeline_instance,
    inputs="dataset_scaled",  # <- here
    namespace="one_model_pipeline",
)
```
Also, you won't find it in the catalog: it's a temporary memory dataset unless it's declared explicitly in the catalog, or unless you inspect the catalog during pipeline execution, e.g. with a hook.
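A minimal sketch of such a hook, assuming the standard Kedro hook mechanism (the class name is arbitrary; it would be registered via HOOKS = (CatalogInspectionHooks(),) in the project's settings.py):

```python
from kedro.framework.hooks import hook_impl


class CatalogInspectionHooks:
    """Hypothetical hook that lists catalog entries while the pipeline runs."""

    @hook_impl
    def before_node_run(self, node, catalog):
        # At this point the catalog also holds the free in-memory outputs of
        # upstream nodes, e.g. one_process_pipeline.dataset_scaled.
        print(f"Datasets visible before {node.name}: {catalog.list()}")
```

Running kedro run would then print the available dataset names before each node executes.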
a
This causes the following error:
kedro.pipeline.modular_pipeline.ModularPipelineError: Failed to map datasets and/or parameters onto the nodes provided: one_process_pipeline.dataset_scaled
a
How do you do it?
a
This is the pipeline:

```python
ds_pipeline_one = pipeline(
    pipe=pipeline_instance,
    inputs="one_process_pipeline.dataset_scaled",
    namespace="one_model_pipeline",
)

ds_pipeline_two = pipeline(
    pipe=pipeline_instance,
    inputs="two_process_pipeline.dataset_scaled",
    namespace="two_model_pipeline",
)
```
a
It's best to provide a dictionary that maps the inner dataset name to the namespaced one:
{"dataset_scaled": "two_process_pipeline.dataset_scaled"}
A plain string is only valid when the inner pipeline already uses exactly that name, which is why the string form above failed. But that might not be the error.
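Putting it together, a sketch of the working training-pipeline wrappers with the dictionary mapping (names taken from the snippets above):

```python
ds_pipeline_one = pipeline(
    pipe=pipeline_instance,
    # map the inner name to the dataset produced by the preprocessing namespace
    inputs={"dataset_scaled": "one_process_pipeline.dataset_scaled"},
    namespace="one_model_pipeline",
)

ds_pipeline_two = pipeline(
    pipe=pipeline_instance,
    inputs={"dataset_scaled": "two_process_pipeline.dataset_scaled"},
    namespace="two_model_pipeline",
)
```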
a
Thank you, it worked! The pipeline ran successfully.
d
Thanks for helping out @Artur Dobrogowski