Julian Waton
11/01/2022, 2:52 PM
When I run a subset of a pipeline with --from-nodes and --to-nodes (to save time over a full pipeline run), I often discover that some data does not exist in my environment - but it takes a while to find this out, as the code needs to run first.
• Then "checking whether the data exists" is a bit complex:
◦ Either check whether it is an intermediate output of the provided pipeline
◦ Or check whether it can be read from the catalog using the _exists method of the abstract dataset class
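(A minimal, framework-free sketch of the check described in the bullets above - the helper name and signature are hypothetical, not Kedro API: an input is only "missing" if it is neither produced by an upstream node nor readable according to the catalog.)

```python
def find_missing_inputs(required_inputs, intermediate_outputs, dataset_exists):
    """Return required inputs that are neither produced by the pipeline
    nor already present according to the catalog.

    required_inputs: names the pruned pipeline needs
    intermediate_outputs: names produced by nodes of the full pipeline
    dataset_exists: callable standing in for a per-dataset exists() check
    """
    return sorted(
        name
        for name in required_inputs
        if name not in intermediate_outputs and not dataset_exists(name)
    )

# Pretend only dataset "A" exists in this environment,
# and "B" is created by an upstream node.
on_disk = {"A"}
missing = find_missing_inputs(
    required_inputs={"A", "B", "C"},
    intermediate_outputs={"B"},
    dataset_exists=lambda name: name in on_disk,
)
print(missing)  # prints ['C']
```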
Is this something that someone has already built, and is it a common use case?
Nok Lam Chan
11/01/2022, 3:28 PM
Julian Waton
11/01/2022, 3:37 PM
The inputs attribute of the pipeline class 🙂 https://github.com/kedro-org/kedro/blob/main/kedro/pipeline/pipeline.py
Nok Lam Chan
11/01/2022, 3:41 PM
SequentialRunner would be a useful reference, I think.
Julian Waton
11/01/2022, 4:22 PM

from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner


class DryRunner(AbstractRunner):
    """``DryRunner`` is an ``AbstractRunner`` implementation. It can be used
    to list which nodes would be run without actually executing anything.
    """

    def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
        """Factory method for creating the default data set for the runner.

        Args:
            ds_name: Name of the missing data set.

        Returns:
            An instance of an implementation of AbstractDataSet to be used
            for all unregistered data sets.
        """
        return MemoryDataSet()

    def _run(
        self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
    ) -> None:
        """The method implementing dry pipeline running.

        Example logs output using this implementation:

            kedro.runner.dry_runner - INFO - Actual run would execute 3 nodes:
            node3: identity([A]) -> [B]
            node2: identity([C]) -> [D]
            node1: identity([D]) -> [E]

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            session_id: The id of the session.
        """
        nodes = pipeline.nodes
        self._logger.info(
            "Actual run would execute %d nodes:\n%s",
            len(nodes),
            pipeline.describe(),
        )
        self._logger.info("Checking inputs...")
        input_names = pipeline.inputs()
        missing_inputs = [
            input_name
            for input_name in input_names
            if not catalog._get_dataset(input_name)._exists()
        ]
        if missing_inputs:
            raise KeyError(f"Datasets {missing_inputs} not found.")
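(For completeness: a custom runner like this can be selected at run time with Kedro's --runner flag. The dotted module path below is an assumption - adjust it to wherever DryRunner lives in your project.)

```shell
# Select the custom runner by its dotted class path
# (my_project.runners is a hypothetical package path).
kedro run --runner=my_project.runners.DryRunner

# Combine with node filtering, as in the original question
kedro run --runner=my_project.runners.DryRunner --from-nodes=node2 --to-nodes=node3
```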
Nok Lam Chan
11/01/2022, 5:25 PM
Why use the private method _exists instead of the public one, datasets.exists()?
Julian Waton
11/01/2022, 5:39 PM
Nok Lam Chan
11/01/2022, 5:58 PM
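(On the _exists vs exists() point above: the public method is generally the safer entry point, because in a template-method design the public exists() delegates to the private _exists() hook and normalises any errors it raises. The sketch below illustrates that pattern only - class and exception names mirror Kedro's but this is not the real implementation.)

```python
class DataSetError(Exception):
    """Error type the public API raises on a failed existence check."""


class AbstractDataSetSketch:
    """Template-method pattern: exists() wraps the _exists() hook."""

    def exists(self) -> bool:
        try:
            return self._exists()
        except Exception as exc:
            # Normalise arbitrary backend failures into one error type.
            raise DataSetError(f"Failed during exists check: {exc}") from exc

    def _exists(self) -> bool:
        # Subclasses override this hook with the actual check.
        raise NotImplementedError


class PresentDataSet(AbstractDataSetSketch):
    def _exists(self) -> bool:
        return True


class FlakyDataSet(AbstractDataSetSketch):
    def _exists(self) -> bool:
        raise OSError("connection refused")
```

Calling the private hook directly, as the DryRunner snippet does, skips that error handling, so a flaky backend surfaces as a raw OSError instead of a DataSetError.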