Hi all, I am a new to kedro and i love it ! So i ...
# questions
c
Hi all, I am a new to kedro and i love it ! So i have started to create my first pipeline unit test, but i'm a little bit stuck with a simple error 😄 kedro, version 0.19.11, running
pytest
:
/test_pipeline.py::test_pipeline_execution - NameError: name 'MemoryDataSet' is not defined
If any body could help me please... Bellow the test i try to add :
Copy code
from <http://kedro.io|kedro.io> import MemoryDataset
from <http://kedro.io|kedro.io> import KedroDataCatalog


def test_pipeline_execution():
    catalog = KedroDataCatalog({
        "raw_repos": MemoryDataSet(),
        "transformed_repos": MemoryDataSet(),
    })

    # define stub function instead of mock
    def extract_repositories():
        return [{"repo": "test-repo"}]

    def transform_repositories(raw_repos):
        return [{"repo": raw_repos[0]["repo"], "processed": True}]

    def load_repositories(transformed_repos):
        assert transformed_repos == [{"repo": "test-repo", "processed": True}]

    pipeline = Pipeline([
        node(extract_repositories, inputs=None, outputs="raw_repos"),
        node(transform_repositories, inputs="raw_repos", outputs="transformed_repos"),
        node(load_repositories, inputs="transformed_repos", outputs=None),
    ])

    runner = SequentialRunner()
    runner.run(pipeline, catalog)
Thanks in advanced guys.
1
h
Someone will reply to you shortly. In the meantime, this might help:
i
Its a typo Capitalization should change from
MemoryDataSet
to
MemoryDataset
where the
set
is lower case
thankyou 1
😄 1
👍 1
👍🏼 1