Johannes
09/05/2023, 8:51 AM
def DATA_CATALOG(basic_data):
    conf_loader = ConfigLoader(conf_source= Path("/workspace/pipeline/src/tests/conf/"))
    return DataCatalog(
        data_sets={"basic_data": MemoryDataset(basic_data)},
        feed_dict=???
    )

def test_example_pipeline(DATA_CATALOG):
    # Initialize Kedro context and runner
    runner = SequentialRunner()
    #pipe = create_pipeline(inputs = "basic_data", outputs = "outputs")
    pipe = pipeline(
        [
            node(func=increment_random_seed, 
                 inputs={"random_seed": "params:random_seed.option1"},
                 outputs="output_inrs", 
                 name="split_data"),
            node(func=multiply_arr,
                 inputs={"arr": "basic_data",
                         "random_seed": "output_inrs"},
                 outputs="outputs") 
        ],
        tags="DL_Pipeline"
Juan Luis
09/05/2023, 9:30 AM
Johannes
09/05/2023, 10:19 AM
@pytest.fixture()
def DATA_CATALOG(basic_data):
    conf_loader = ConfigLoader(conf_source= Path("/workspace/pipeline/src/tests/conf/"))
    return DataCatalog(
        data_sets={"basic_data": MemoryDataset(basic_data)},
        feed_dict=conf_loader
    )

            node(func=increment_random_seed, 
                 inputs={"random_seed": "params:random_seed.option1"},
                 outputs="output_inrs", 
                 name="split_data"),

./pipeline/src/tests/pipelines/DeepLearningSolutions/test_pipeline.py::test_example_pipeline Failed: [undefined]ValueError: Pipeline input(s) {'params:random_seed.option1'} not found in the DataCatalog
DATA_CATALOG = <kedro.io.data_catalog.DataCatalog object at 0x7fc5a54e2c70>
    def test_example_pipeline(DATA_CATALOG):
        # Initialize Kedro context and runner
        runner = SequentialRunner()
        #pipe = create_pipeline(inputs = "basic_data", outputs = "outputs")
        pipe = pipeline(
            [
                node(func=increment_random_seed,
                     inputs={"random_seed": "params:random_seed.option1"},
                     outputs="output_inrs",
                     name="split_data"),
                node(func=multiply_arr,
                     inputs={"arr": "basic_data",
                             "random_seed": "output_inrs"},
                     outputs="outputs")
            ],
            tags="DL_Pipeline"
        )
    
        # Run the entire pipeline
>       pipeline_result = runner.run(pipeline=pipe, catalog=DATA_CATALOG)
pipeline/src/tests/pipelines/DeepLearningSolutions/test_pipeline.py:72: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = <kedro.runner.sequential_runner.SequentialRunner object at 0x7fc5a54e2d90>
pipeline = Pipeline([
Node(increment_random_seed, {'random_seed': 'params:random_seed.option1'}, 'output_inrs', 'split_data'),
Node(multiply_arr, {'arr': 'basic_data', 'random_seed': 'output_inrs'}, 'outputs', None)
])
catalog = <kedro.io.data_catalog.DataCatalog object at 0x7fc5a54e2e20>
hook_manager = <kedro.framework.hooks.manager._NullPluginManager object at 0x7fc5a54e2df0>
session_id = None
    def run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: PluginManager = None,
        session_id: str = None,
    ) -> dict[str, Any]:
        """Run the ``Pipeline`` using the datasets provided by ``catalog``
        and save results back to the same objects.
    
        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.
    
        Raises:
            ValueError: Raised when ``Pipeline`` inputs cannot be satisfied.
    
        Returns:
            Any node outputs that cannot be processed by the ``DataCatalog``.
            These are returned in a dictionary, where the keys are defined
            by the node outputs.
    
        """
    
        hook_manager = hook_manager or _NullPluginManager()
        catalog = catalog.shallow_copy()
    
        # Check which datasets used in the pipeline are in the catalog or match
        # a pattern in the catalog
        registered_ds = [ds for ds in pipeline.data_sets() if ds in catalog]
    
        # Check if there are any input datasets that aren't in the catalog and
        # don't match a pattern in the catalog.
        unsatisfied = pipeline.inputs() - set(registered_ds)
    
        if unsatisfied:
>           raise ValueError(
                f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
            )
E           ValueError: Pipeline input(s) {'params:random_seed.option1'} not found in the DataCatalog
/usr/local/lib/python3.8/dist-packages/kedro/runner/runner.py:86: ValueError
Total number of tests expected to run: 1
Total number of tests run: 1
Total number of tests passed: 0
Total number of tests failed: 1
Total number of tests failed with errors: 0
Total number of tests skipped: 0
Total number of tests with no result data: 0