# questions
Hello, I would like to use something like this dry runner https://kedro.readthedocs.io/en/stable/nodes_and_pipelines/run_a_pipeline.html#custom-runners but in a slightly different context: I would also like to check whether the data exists.
• I am using multiple kedro environments https://kedro.readthedocs.io/en/stable/kedro_project_setup/configuration.html for different model experiments.
• When I do a partial run (to save time over a full pipeline run), I often discover that some data does not exist in my environment, but it takes a while to discover this, as the code needs to run first.
• "Checking whether the data exists" is then a bit complex:
  ◦ Either check whether it is an intermediate output of the provided pipeline,
  ◦ Or check whether it can be read from the catalog using the relevant method of the abstract dataset class.
Is this something that someone has already built, and is it a common use case?
Hi @Julian Waton, this is a good question and I think it could be a more useful example to include in our docs compared to the current DryRunner.
I think that to achieve this, you need to check two things:
• It's a valid DAG (kedro already does this by default; if there is a missing input, it throws an error already).
• So the additional thing you need to do is just check that the pipeline's inputs can be loaded.
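That second check can be sketched without kedro at all: given the pipeline's free input names and a way to ask whether each one's data exists, collect the missing ones. The helper name `find_missing_inputs` and the dict-based stand-in catalog below are illustrative, not part of the kedro API.

```python
# Illustrative sketch (not kedro API): before running, check that each free
# pipeline input "exists". A plain dict stands in for the catalog here,
# mapping dataset name -> whether its underlying data exists.

def find_missing_inputs(input_names, exists_by_name):
    """Return the input dataset names whose backing data does not exist."""
    return [name for name in input_names if not exists_by_name.get(name, False)]

exists_by_name = {"raw_sales": True, "raw_customers": False}
missing = find_missing_inputs(["raw_sales", "raw_customers"], exists_by_name)
print(missing)  # -> ['raw_customers']
```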
Exactly. Are the pipeline's inputs already understood by the pipeline class without having to explicitly loop through the nodes?
Aha, just seen that it's a straightforward check of an attribute of the pipeline class 🙂 https://github.com/kedro-org/kedro/blob/main/kedro/pipeline/pipeline.py
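For reference, the set logic behind `Pipeline.inputs()` — the free inputs are the datasets some node consumes but no node produces — can be sketched in plain Python; the `(inputs, outputs)` tuples below are a stand-in for kedro nodes:

```python
# Plain-Python sketch of the set logic behind Pipeline.inputs():
# a pipeline's free inputs are consumed by some node but produced by none.
nodes = [
    (["A"], ["B"]),       # node1: A -> B
    (["B", "C"], ["D"]),  # node2: B, C -> D
]

all_inputs = {name for ins, _ in nodes for name in ins}
all_outputs = {name for _, outs in nodes for name in outs}
free_inputs = all_inputs - all_outputs

print(sorted(free_inputs))  # -> ['A', 'C']; "B" is an intermediate output
```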
You can look at the source there; it would be a useful reference, I think.
I built something pretty rough that appears to work!
from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner


class DryRunner(AbstractRunner):
    """``DryRunner`` is an ``AbstractRunner`` implementation. It can be used to list
    which nodes would be run without actually executing anything, and to check that
    the pipeline's inputs exist.
    """

    def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
        """Factory method for creating the default data set for the runner.

        Args:
            ds_name: Name of the missing data set.

        Returns:
            An instance of an implementation of AbstractDataSet to be used
            for all unregistered data sets.
        """
        return MemoryDataSet()

    def _run(
        self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
    ) -> None:
        """The method implementing dry pipeline running.
        Example logs output using this implementation:

            kedro.runner.dry_runner - INFO - Actual run would execute 3 nodes:
            node3: identity([A]) -> [B]
            node2: identity([C]) -> [D]
            node1: identity([D]) -> [E]

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            session_id: The id of the session.
        """
        nodes = pipeline.nodes
        self._logger.info(
            "Actual run would execute %d nodes:\n%s",
            len(nodes),
            pipeline.describe(),
        )
        self._logger.info("Checking inputs...")
        input_names = pipeline.inputs()
        missing_inputs = [
            input_name
            for input_name in input_names
            if not catalog._get_dataset(input_name)._exists()
        ]
        if missing_inputs:
            raise KeyError(f"Datasets {missing_inputs} not found.")
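If the class lives in your project's source tree, it can be handed to kedro's CLI via the `--runner` flag, combined with `--env` to pick the experiment environment. The module path below is hypothetical; adjust it to wherever the runner is actually defined.

```shell
# Hypothetical module path; point --runner at your own DryRunner location.
kedro run --runner=my_project.runners.DryRunner --env=my_experiment
```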
An appropriate error can be raised with the expected file paths too.
Awesome - is there a particular reason you need to use the protected method instead of the public one?
I didn't know that existed!
In that case I would use the public one if possible
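For context on why the public method is preferable: in kedro, `AbstractDataSet.exists()` is a public wrapper that delegates to the protected `_exists()` and normalises errors. The stand-in class below mimics that pattern; it is not the real kedro class.

```python
# Minimal stand-in mimicking kedro's public-wrapper pattern (not kedro code):
# callers depend on the stable public exists(), while subclasses override the
# protected _exists() implementation detail.

class FakeDataSet:
    def exists(self) -> bool:
        """Public API: delegate to _exists() and normalise any error."""
        try:
            return self._exists()
        except Exception as exc:
            raise RuntimeError(f"Failed during exists check: {exc}") from exc

    def _exists(self) -> bool:
        # A real subclass would check a file path, database table, etc.
        return True

print(FakeDataSet().exists())  # -> True
```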