Hugo Evers
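05/31/2023, 10:11 AM
```python
from typing import Any, Dict

from kedro.framework.hooks.manager import _create_hook_manager
from kedro.pipeline.node import Node
from kedro.runner import run_node
from prefect import Task  # Prefect 1.x task API

# HOOKS is the hooks tuple from the project's settings.py, e.g.
# from <your_package>.settings import HOOKS  (package name is a placeholder)


class KedroTask(Task):
    """Kedro node as a Prefect task."""

    def __init__(self, node: Node, hooks: list = HOOKS):
        self._node = node
        self.manager = _create_hook_manager()
        self._hooks = hooks or []
        super().__init__(name=node.name, tags=node.tags)

    def run(self, task_dict: Dict[str, Any]):
        catalog = task_dict["catalog"]

        # Call before_node_run on every hook, loading the node inputs only once
        if self._hooks:
            inputs = {name: catalog.load(name) for name in self._node.inputs}
            for hook in self._hooks:
                hook.before_node_run(
                    self._node, catalog=catalog, inputs=inputs, is_async=False
                )

        # Pass session_id by keyword: run_node's 4th positional parameter is is_async
        run_node(
            self._node,
            catalog,
            self.manager,
            session_id=task_dict["sess_id"],
        )

        # If this is the last node of the pipeline, trigger after_pipeline_run
        pipeline = task_dict["pipeline"]
        if self._node.name == pipeline.nodes[-1].name:
            for hook in self._hooks:
                hook.after_pipeline_run(
                    run_params=task_dict["run_params"],
                    pipeline=pipeline,
                    catalog=catalog,
                )
```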
KedroTask wraps a Kedro node as a Prefect task; it subclasses Prefect's Task class.
Its run method now calls “before_node_run” on every hook before running the node.
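Calling hook methods directly, as KedroTask.run does, only works if each hook implementation accepts every keyword argument it is given. Kedro's hook manager is built on pluggy, which passes each implementation only the arguments it declares, so a Kedro hook may define just `before_node_run(self, node, inputs)`. A minimal pure-Python sketch of that argument selection, assuming hooks are plain objects; `call_hook`, `DemoHook` and `OtherHook` are hypothetical names:

```python
import inspect
from typing import Any, List


def call_hook(hooks: List[Any], hook_name: str, **kwargs: Any) -> List[Any]:
    """Call hook_name on each hook that implements it, passing only the
    keyword arguments its signature declares (hypothetical helper)."""
    results = []
    for hook in hooks:
        method = getattr(hook, hook_name, None)
        if method is None:
            continue  # this hook does not implement the hook point
        declared = inspect.signature(method).parameters
        results.append(method(**{k: v for k, v in kwargs.items() if k in declared}))
    return results


class DemoHook:
    """Hypothetical hook that only needs the node and its inputs."""

    def before_node_run(self, node, inputs):
        return f"{node}:{','.join(sorted(inputs))}"


class OtherHook:
    """Hypothetical hook that implements a different hook point only."""

    def after_pipeline_run(self, run_params):
        return run_params["pipeline_name"]


hooks = [DemoHook(), OtherHook()]
print(call_hook(hooks, "before_node_run", node="split", catalog=None,
                inputs={"b": 2, "a": 1}, is_async=False))  # ['split:a,b']
```

Calling `DemoHook().before_node_run("split", catalog=None, inputs={}, is_async=False)` directly would raise a TypeError. Kedro's run_node also dispatches before_node_run and after_node_run through the hook manager it receives, so registering the hooks on that manager may be an alternative to calling them by hand; that is untested in this setup.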
If the node is the last node of the pipeline, it also triggers “after_pipeline_run” on every listed hook.
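The last-node check relies on the order of the pipeline's node list. Kedro's Pipeline.nodes returns nodes in topological order, so its last entry is always a node that nothing else depends on. A pipeline can have several such end nodes, though, and if independent branches run concurrently (for example under a parallel executor), the last node in that order is not guaranteed to finish last. A minimal pure-Python sketch, assuming dependencies are given as a dict from node name to the set of upstream node names (a hypothetical representation, not Kedro's API); `sink_nodes` and `pipeline_finished` are hypothetical helpers:

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set


def sink_nodes(deps: Dict[str, Set[str]]) -> Set[str]:
    """Nodes that no other node lists as upstream (hypothetical helper)."""
    return set(deps) - set().union(*deps.values())


def pipeline_finished(done: Set[str], deps: Dict[str, Set[str]]) -> bool:
    """True once every sink has completed; since nodes only run after their
    upstream nodes, every node has then completed."""
    return sink_nodes(deps) <= done


# Hypothetical pipeline: "split" feeds two independent branches
deps = {
    "split": set(),
    "train": {"split"},
    "report": {"split"},
}

order: List[str] = list(TopologicalSorter(deps).static_order())
print(order[-1] in sink_nodes(deps))  # True: the last node is always a sink
print(sorted(sink_nodes(deps)))       # ['report', 'train']: there may be several
```

Triggering after_pipeline_run only when `pipeline_finished` holds avoids depending on which sink happens to come last in the list.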
```python
from pathlib import Path
from typing import Any, Dict, Union

from kedro import __version__ as kedro_version
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import MemoryDataSet
from prefect import Task  # Prefect 1.x task API

# HOOKS is the same hooks tuple from settings.py that KedroTask uses, e.g.
# from <your_package>.settings import HOOKS  (package name is a placeholder)


class KedroInitTask(Task):
    """Task to initialize KedroSession"""

    def __init__(
        self,
        pipeline_name: str,
        package_name: str,
        project_path: Union[Path, str] = None,
        env: str = None,
        extra_params: Dict[str, Any] = None,
        *args,
        **kwargs,
    ):
        self.project_path = Path(project_path or Path.cwd()).resolve()
        self.extra_params = extra_params
        self.pipeline_name = pipeline_name
        self.env = env
        super().__init__(name=f"{package_name}_init", *args, **kwargs)

    def run(self) -> Dict[str, Any]:
        """
        Initializes a Kedro session and returns the DataCatalog, session ID,
        pipeline and run_params
        """
        # bootstrap project within task / flow scope
        bootstrap_project(self.project_path)

        session = KedroSession.create(
            project_path=self.project_path,
            env=self.env,
            extra_params=self.extra_params,  # noqa: E501
        )
        # Note that for logging inside a Prefect task self.logger is used.
        self.logger.info("Session created with ID %s", session.session_id)
        pipeline = pipelines.get(self.pipeline_name)
        context = session.load_context()
        catalog = context.catalog

        # Register in-memory datasets for pipeline datasets missing from the catalog
        unregistered_ds = pipeline.data_sets() - set(catalog.list())  # NOQA
        for ds_name in unregistered_ds:
            catalog.add(ds_name, MemoryDataSet())

        # TODO: get run params from the session or context
        run_params = {
            "session_id": session.session_id,
            "project_path": context.project_path,
            "env": self.env,
            "kedro_version": kedro_version,
            "tags": (),
            "from_nodes": [],
            "to_nodes": [],
            "node_names": (),
            "from_inputs": [],
            "to_outputs": [],
            "load_versions": {},
            "extra_params": self.extra_params or {},
            "pipeline_name": self.pipeline_name,
            "namespace": None,
            # static placeholder string, not a live runner object
            "runner": "<kedro.runner.sequential_runner.SequentialRunner object at 0x10232c160>",
        }

        for hook in HOOKS:
            hook.before_pipeline_run(
                run_params=run_params, pipeline=pipeline, catalog=catalog
            )

        # pipeline and run_params are returned for the hooks in the node tasks
        return {
            "catalog": catalog,
            "sess_id": session.session_id,
            "pipeline": pipeline,
            "run_params": run_params,
        }
```
In the KedroInitTask, I created the run_params with some default values, filled in dynamically from the context or session where possible.
After the catalog and session are created, we run “before_pipeline_run” for every hook, given these run_params, the pipeline and the catalog. We also return the pipeline and run_params, because the hooks in the remaining nodes need them.
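The run_params construction can be factored into a small helper so the values known at init time and the fixed defaults sit in one place. A minimal sketch, assuming the key set used in the KedroInitTask code; `build_run_params` is a hypothetical name, and `runner` is set to None here on the assumption that no Kedro runner object exists when Prefect drives run_node directly:

```python
from pathlib import Path
from typing import Any, Dict, Optional


def build_run_params(
    session_id: str,
    project_path: Path,
    pipeline_name: str,
    kedro_version: str,
    env: Optional[str] = None,
    extra_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: known values first, defaults for everything else."""
    return {
        # Values available from the session, context or task arguments
        "session_id": session_id,
        "project_path": project_path,
        "env": env,
        "kedro_version": kedro_version,
        "pipeline_name": pipeline_name,
        "extra_params": dict(extra_params or {}),  # copy, so hooks can't mutate the input
        # Filters that would normally come from the CLI; empty defaults here
        "tags": (),
        "from_nodes": [],
        "to_nodes": [],
        "node_names": (),
        "from_inputs": [],
        "to_outputs": [],
        "load_versions": {},
        "namespace": None,
        "runner": None,  # assumption: no Kedro runner object in this setup
    }
```

The resulting dict can then be passed to each hook's before_pipeline_run and returned alongside the catalog, as the KedroInitTask code does.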
Some issues arise, though. For example, options that can normally be supplied through the CLI, such as filtering by tags or nodes, or overriding params, are not picked up (they are not usable in the Prefect deployment anyway).
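Both tasks read the same module-level HOOKS object, and Python caches imported modules in sys.modules, so within one process every import of HOOKS from settings.py yields the same hook instances. That sharing is per process. A minimal sketch of the effect, assuming hooks are plain Python objects; `TrackingHook` and `load_hooks` are hypothetical stand-ins for a stateful hook (such as an MLflow hook) and for importing HOOKS, and `cache_clear()` only simulates starting a fresh process:

```python
from functools import lru_cache
from typing import Optional, Tuple


class TrackingHook:
    """Hypothetical stateful hook, standing in for e.g. an MLflow hook."""

    def __init__(self) -> None:
        self.run_id: Optional[str] = None

    def before_pipeline_run(self, run_params: dict) -> None:
        # State set once at pipeline start and needed by later node hooks
        self.run_id = run_params["session_id"]

    def before_node_run(self, node: str) -> str:
        if self.run_id is None:
            raise RuntimeError(f"{node}: before_pipeline_run never ran in this process")
        return self.run_id


@lru_cache(maxsize=None)
def load_hooks() -> Tuple[TrackingHook, ...]:
    """Hypothetical stand-in for importing HOOKS: built once per process."""
    return (TrackingHook(),)


hooks = load_hooks()
hooks[0].before_pipeline_run({"session_id": "abc"})
print(load_hooks() is hooks)                     # True: same instances everywhere
print(load_hooks()[0].before_node_run("split"))  # abc

load_hooks.cache_clear()  # simulate a fresh process: new, uninitialized hooks
try:
    load_hooks()[0].before_node_run("split")
except RuntimeError as err:
    print(err)  # split: before_pipeline_run never ran in this process
```

Whether a task re-run from the Prefect UI executes in a fresh process depends on the Prefect setup; if it does, state set in before_pipeline_run would have to be persisted or rebuilt for that task.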
We load the hooks from the settings.py file, where we normally add hooks to a Kedro project. We can only load the hooks once, because we need the same initialized hook instance (e.g. MLFlow_HOOK) for multiple reasons. How would this work if we run a task in Prefect independently of the other tasks? Say that in the Prefect GUI you select a task and choose to re-run it: is that task execution aware of the initialization of the pipeline it is part of?
Juan Luis
05/31/2023, 10:23 AM
Hugo Evers
05/31/2023, 10:23 AM
Juan Luis
05/31/2023, 10:25 AM
> because the new prefect is not open source
oh, that's something that I didn't have on my radar. will leave a comment in https://github.com/kedro-org/kedro/issues/2431
Hugo Evers
05/31/2023, 10:26 AM
Juan Luis
05/31/2023, 10:39 AM
Hugo Evers
05/31/2023, 10:40 AM
Juan Luis
05/31/2023, 10:44 AM