# questions
j
Hello, I would like to use something like this dry runner https://kedro.readthedocs.io/en/stable/nodes_and_pipelines/run_a_pipeline.html#custom-runners but in a slightly different context - I would like to also check whether the data exists.
• I am using multiple kedro environments https://kedro.readthedocs.io/en/stable/kedro_project_setup/configuration.html for different model experiments
• When I do a partial run with `--from-nodes` and `--to-nodes` (to save time over a full pipeline run), I often discover that some data does not exist in my environment - but it takes a while to discover this, as the code needs to run first
• Then "checking whether the data exists" is a bit complex:
  ◦ Either check whether it is an intermediate output of the provided pipeline
  ◦ Or check whether it can be read from the catalog using the `_exists` method of the abstract dataset class
Is this something that someone has already built, and is it a common use case?
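The two-part check described in the question can be sketched without Kedro at all. This is only an illustration: nodes are modelled as plain `(inputs, outputs)` pairs, and `free_inputs`, `missing_inputs` and the `exists` callable are hypothetical names, not part of Kedro. A dataset produced by some node in the partial pipeline is an intermediate output and needs no check; everything else must already exist.

```python
from typing import Callable, Iterable, List, Set, Tuple

# Hypothetical stand-in for a node: (input dataset names, output dataset names).
NodeIO = Tuple[Set[str], Set[str]]


def free_inputs(nodes: Iterable[NodeIO]) -> Set[str]:
    """Return names consumed by the nodes but produced by none of them."""
    nodes = list(nodes)
    consumed = set().union(*(ins for ins, _ in nodes))
    produced = set().union(*(outs for _, outs in nodes))
    # Intermediate outputs are produced during the run, so they drop out here.
    return consumed - produced


def missing_inputs(nodes: Iterable[NodeIO], exists: Callable[[str], bool]) -> List[str]:
    """Return the free inputs for which the `exists` check fails, sorted by name."""
    return sorted(name for name in free_inputs(nodes) if not exists(name))


if __name__ == "__main__":
    partial_run = [({"A"}, {"B"}), ({"B", "C"}, {"D"})]
    stored = {"A"}  # pretend only A is present in this environment
    print(missing_inputs(partial_run, lambda name: name in stored))
```

Here `B` is never checked because the first node produces it, while `C` is reported because nothing in the partial run creates it.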
n
Hi @Julian Waton, this is a good question, and I think it could be a more useful example to include in our docs compared to the current DryRunner.
To achieve that, I think you need to check two things:
• It's a valid DAG (Kedro already does this by default; if an input is missing, it already throws an error)
• So the only additional thing you need to do is check that the pipeline's inputs can be loaded.
j
Exactly. Are the pipeline's inputs already understood by the pipeline class without having to explicitly loop through the nodes?
Aha, just seen that it's a straightforward check of the `inputs` attribute of the pipeline class 🙂 https://github.com/kedro-org/kedro/blob/main/kedro/pipeline/pipeline.py
Exactly
You can look at `SequentialRunner`; it would be a useful reference, I think.
j
I built something pretty rough that appears to work!
from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner


class DryRunner(AbstractRunner):
    """``DryRunner`` is an ``AbstractRunner`` implementation. It can be used to list which
    nodes would be run without actually executing anything.
    """

    def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
        """Factory method for creating the default data set for the runner.

        Args:
            ds_name: Name of the missing data set
        Returns:
            An instance of an implementation of AbstractDataSet to be used
            for all unregistered data sets.

        """
        return MemoryDataSet()

    def _run(
        self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
    ) -> None:
        """The method implementing dry pipeline running.
        Example logs output using this implementation:

            kedro.runner.dry_runner - INFO - Actual run would execute 3 nodes:
            node3: identity([A]) -> [B]
            node2: identity([C]) -> [D]
            node1: identity([D]) -> [E]

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            session_id: The id of the session.

        """
        nodes = pipeline.nodes
        # Two placeholders in the format string, so exactly two arguments follow.
        self._logger.info(
            "Actual run would execute %d nodes:\n%s",
            len(nodes),
            pipeline.describe(),
        )
        self._logger.info("Checking inputs...")
        input_names = pipeline.inputs()
        missing_inputs = [
            input_name
            for input_name in input_names
            if not catalog._get_dataset(input_name)._exists()
        ]
        if missing_inputs:
            raise KeyError(f"Datasets {missing_inputs} not found.")
An appropriate error could also be raised that includes the expected file paths.
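One way the error message could carry the expected paths is sketched below. It assumes the caller can supply a mapping from dataset name to path; how that mapping is obtained from the catalog is left open, and `format_missing` and `expected_paths` are hypothetical names introduced only for this illustration.

```python
from typing import Mapping, Sequence


def format_missing(missing: Sequence[str], expected_paths: Mapping[str, str]) -> str:
    """Build a multi-line message listing each missing dataset and its expected path."""
    lines = [f"{len(missing)} dataset(s) not found:"]
    for name in missing:
        # Fall back to a placeholder when no path is known (e.g. non-file datasets).
        path = expected_paths.get(name, "<no path known>")
        lines.append(f"  {name}: {path}")
    return "\n".join(lines)


if __name__ == "__main__":
    # The path below is made up for the demonstration.
    print(format_missing(["C"], {"C": "data/01_raw/c.csv"}))
```

The resulting string could be passed to the `KeyError` raised at the end of `_run`.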
n
Awesome - is there a particular reason you need to use the protected method `_exists` instead of the public one `datasets.exists()`?
j
I didn't know that existed!
n
In that case I would use the public one if possible
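Following that suggestion, the input check could be written against public methods only. The sketch below is duck-typed: it relies on `pipeline.inputs()` as used in the snippet above and on a public `catalog.exists(name)` method, which Kedro's `DataCatalog` provides as far as I know. The helper name `find_missing_inputs` and the two stub classes are hypothetical and exist only so the example runs without Kedro installed.

```python
from typing import Any, List, Set


def find_missing_inputs(pipeline: Any, catalog: Any) -> List[str]:
    """Return the pipeline's free inputs that the catalog reports as absent.

    Uses only `pipeline.inputs()` and the public `catalog.exists(name)`.
    """
    return sorted(name for name in pipeline.inputs() if not catalog.exists(name))


class StubPipeline:
    """Stand-in for a pipeline object; not Kedro's class."""

    def __init__(self, inputs: Set[str]) -> None:
        self._inputs = inputs

    def inputs(self) -> Set[str]:
        return set(self._inputs)


class StubCatalog:
    """Stand-in for a catalog object; not Kedro's class."""

    def __init__(self, present: Set[str]) -> None:
        self._present = present

    def exists(self, name: str) -> bool:
        return name in self._present


if __name__ == "__main__":
    missing = find_missing_inputs(StubPipeline({"A", "C"}), StubCatalog({"A"}))
    if missing:
        print(f"Datasets {missing} not found.")
```

Inside the runner, the list comprehension in `_run` could be swapped for a call to this helper, keeping the same `KeyError` when the result is non-empty.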