Athul R T (06/04/2024, 7:43 AM):
Pipeline 1 (preprocessing)

from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline([
        node(
            func=scale_columns,
            inputs=["dataset", "params:preprocess_options"],
            outputs=["dataset_scaled", "minmaxscaler", "labelencoder"],
            name="scale_columns_node",
        ),
    ])
    dp_pipe_one = pipeline(
        pipe=pipeline_instance,
        inputs="dataset",
        # outputs="dataset_scaled_one",
        namespace="one_process_pipeline",
    )
    dp_pipe_two = pipeline(
        pipe=pipeline_instance,
        inputs="dataset",
        # outputs="dataset_scaled_two",
        namespace="two_process_pipeline",
    )
    return dp_pipe_one + dp_pipe_two
Pipeline 2 (model training)
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    pipeline_instance = pipeline([
        node(
            func=split_data,
            inputs=["dataset_scaled", "params:split_options"],
            outputs=["X_train", "X_test", "y_train", "y_test"],
            name="split_data_node",
        ),
        node(
            func=train_model,
            inputs=["X_train", "y_train", "params:model_options"],
            outputs="classifier",
            name="train_model_node",
        ),
        node(
            func=test_model,
            inputs=["classifier", "X_test", "y_test"],
            outputs=None,
            name="test_model_node",
        ),
    ])
    ds_pipeline_one = pipeline(
        pipe=pipeline_instance,
        inputs="dataset_scaled",
        namespace="one_model_pipeline",
    )
    ds_pipeline_two = pipeline(
        pipe=pipeline_instance,
        inputs="dataset_scaled",
        namespace="two_model_pipeline",
    )
    return ds_pipeline_one + ds_pipeline_two
When I run Kedro, the following error is raised (for pipeline 2):
ValueError: Pipeline input(s) {'dataset_scaled'} not found in the DataCatalog
I have checked the catalog, and dataset_scaled is indeed not found there. The catalog lists:
[
'dataset',
'parameters',
'params:two_model_pipeline',
'params:two_model_pipeline.split_options',
'params:two_model_pipeline.split_options.test_size',
'params:two_model_pipeline.split_options.random_state',
'params:two_model_pipeline.split_options.x_params',
'params:two_model_pipeline.split_options.y_param',
'params:two_model_pipeline.model_options',
'params:two_model_pipeline.model_options.n_estimators',
'params:two_model_pipeline.model_options.random_state',
'params:one_model_pipeline',
'params:one_model_pipeline.split_options',
'params:one_model_pipeline.split_options.test_size',
'params:one_model_pipeline.split_options.random_state',
'params:one_model_pipeline.split_options.x_params',
'params:one_model_pipeline.split_options.y_param',
'params:one_model_pipeline.model_options',
'params:one_model_pipeline.model_options.n_estimators',
'params:one_model_pipeline.model_options.random_state',
'params:two_process_pipeline',
'params:two_process_pipeline.preprocess_options',
'params:two_process_pipeline.preprocess_options.x_features',
'params:two_process_pipeline.preprocess_options.y_features',
'params:one_process_pipeline',
'params:one_process_pipeline.preprocess_options',
'params:one_process_pipeline.preprocess_options.x_features',
'params:one_process_pipeline.preprocess_options.y_features',
]
Here is the log for pipeline 1 (preprocessing):
INFO Loading data from dataset (CSVDataset)... data_catalog.py:483
INFO Loading data from params:one_process_pipeline.preprocess_options (MemoryDataset)... data_catalog.py:483
INFO Running node: scale_columns_node: scale_columns([dataset;params:one_process_pipeline.preprocess_options]) -> [one_process_pipeline.dataset_scaled;one_process_pipeline.minmaxscaler;one_process_pipeline.labelencoder] node.py:361
INFO Saving data to one_process_pipeline.dataset_scaled (CSVDataset)... data_catalog.py:525
INFO Saving data to one_process_pipeline.minmaxscaler (MlflowPickleDataset)... data_catalog.py:525
INFO Saving data to one_process_pipeline.labelencoder (MlflowPickleDataset)... data_catalog.py:525
INFO Completed 1 out of 2 tasks sequential_runner.py:90
INFO Loading data from dataset (CSVDataset)... data_catalog.py:483
INFO Loading data from params:two_process_pipeline.preprocess_options (MemoryDataset)... data_catalog.py:483
INFO Running node: scale_columns_node: scale_columns([dataset;params:two_process_pipeline.preprocess_options]) -> [two_process_pipeline.dataset_scaled;two_process_pipeline.minmaxscaler;two_process_pipeline.labelencoder] node.py:361
INFO Saving data to two_process_pipeline.dataset_scaled (CSVDataset)... data_catalog.py:525
INFO Saving data to two_process_pipeline.minmaxscaler (MlflowPickleDataset)... data_catalog.py:525
INFO Saving data to two_process_pipeline.labelencoder (MlflowPickleDataset)... data_catalog.py:525
INFO Completed 2 out of 2 tasks sequential_runner.py:90
INFO Pipeline execution completed successfully.
How can I solve this issue?

Artur Dobrogowski (06/04/2024, 9:17 AM):
Your preprocessing pipeline does not produce dataset_scaled but rather two_process_pipeline.dataset_scaled (or the one_process_pipeline equivalent), because the namespace prefixes every dataset name. You can fix it by putting the training pipeline in the same namespace, or by providing where the input can be found in an input mapping:
ds_pipeline_one = pipeline(
    pipe=pipeline_instance,
    inputs="dataset_scaled",  # <- here
    namespace="one_model_pipeline",
)
Athul R T (06/04/2024, 9:20 AM):
kedro.pipeline.modular_pipeline.ModularPipelineError: Failed to map datasets and/or parameters onto the nodes provided: one_process_pipeline.dataset_scaled
Athul R T (06/04/2024, 9:21 AM):
ds_pipeline_one = pipeline(
    pipe=pipeline_instance,
    inputs="one_process_pipeline.dataset_scaled",
    namespace="one_model_pipeline",
)
ds_pipeline_two = pipeline(
    pipe=pipeline_instance,
    inputs="two_process_pipeline.dataset_scaled",
    namespace="two_model_pipeline",
)
Artur Dobrogowski (06/04/2024, 9:21 AM):
Try passing the input as a mapping instead, e.g. {"dataset_scaled": "two_process_pipeline.dataset_scaled"}, but that might not be the error.
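Putting the two suggestions together, a corrected training create_pipeline would look roughly like the sketch below; it assumes the node functions (split_data, train_model, test_model) from the snippet above and the namespaced dataset names shown in the preprocessing log.

from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    # Same three training nodes as in the original snippet.
    pipeline_instance = pipeline([
        node(split_data, ["dataset_scaled", "params:split_options"],
             ["X_train", "X_test", "y_train", "y_test"], name="split_data_node"),
        node(train_model, ["X_train", "y_train", "params:model_options"],
             "classifier", name="train_model_node"),
        node(test_model, ["classifier", "X_test", "y_test"], None,
             name="test_model_node"),
    ])
    ds_pipeline_one = pipeline(
        pipe=pipeline_instance,
        # Dict form: {name inside the pipe: dataset to read from the catalog}.
        inputs={"dataset_scaled": "one_process_pipeline.dataset_scaled"},
        namespace="one_model_pipeline",
    )
    ds_pipeline_two = pipeline(
        pipe=pipeline_instance,
        inputs={"dataset_scaled": "two_process_pipeline.dataset_scaled"},
        namespace="two_model_pipeline",
    )
    return ds_pipeline_one + ds_pipeline_two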