# plugins-integrations
h
There is a bug in the Kedro Prefect deployment when using kedro-mlflow: not all hooks are run, for example before_pipeline_run, after_pipeline_run and before_node_run. Therefore, when using kedro-mlflow in a Kedro run on Prefect, the KedroPipelineModel is not logged to MLflow (with the sklearn flavor, however, the model does get logged). This happens because Kedro is not running the entire pipeline: each Prefect task runs a single node. We have fixed this in a hacky way by making a new KedroTask class that adds the pipeline hooks.
# Imports assumed for Kedro 0.18.x and Prefect 1.x
from typing import Any, Dict, Iterable, Optional

from kedro.framework.hooks.manager import _create_hook_manager
from kedro.pipeline.node import Node
from kedro.runner import run_node
from prefect import Task

# HOOKS: the hook instances listed in the project's settings.py
# (the import path depends on the project package)


class KedroTask(Task):
    """Kedro node as a Prefect task."""

    def __init__(self, node: Node, hooks: Optional[Iterable[Any]] = HOOKS):
        self._node = node
        self.manager = _create_hook_manager()
        self._hooks = list(hooks or ())
        super().__init__(name=node.name, tags=node.tags)

    def run(self, task_dict: Dict[str, Any]):
        catalog = task_dict["catalog"]

        # Load the node inputs once, then call before_node_run on every hook
        inputs = {name: catalog.load(name) for name in self._node.inputs}
        for hook in self._hooks:
            hook.before_node_run(
                self._node, catalog=catalog, inputs=inputs, is_async=False
            )

        run_node(
            self._node,
            catalog,
            self.manager,
            task_dict["sess_id"],
        )

        # If this is the last node of the pipeline (in topological order),
        # trigger after_pipeline_run on every hook
        if self._node.name == task_dict["pipeline"].nodes[-1].name:
            for hook in self._hooks:
                hook.after_pipeline_run(
                    run_params=task_dict["run_params"],
                    pipeline=task_dict["pipeline"],
                    catalog=catalog,
                )
KedroTask wraps a Kedro node as a Prefect task by subclassing Prefect's Task class. Its run method now calls “before_node_run” on every hook before running the node. If the node's name matches the name of the last node in the pipeline, “after_pipeline_run” is also triggered on every listed hook.
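To make the call order concrete, here is a framework-free sketch of the hook logic in KedroTask.run. FakeNode, RecordingHook and run_task are hypothetical stand-ins (no Kedro or Prefect imports), the pipeline is modeled as a topologically sorted list of nodes, and node execution itself is omitted.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeNode:
    """Hypothetical stand-in for a Kedro node: a name and its input dataset names."""

    name: str
    inputs: List[str]


@dataclass
class RecordingHook:
    """Hypothetical hook that records the order in which it is called."""

    calls: List[str] = field(default_factory=list)

    def before_node_run(self, node: FakeNode, catalog: Dict[str, Any],
                        inputs: Dict[str, Any], is_async: bool) -> None:
        self.calls.append(f"before_node_run:{node.name}")

    def after_pipeline_run(self, run_params: Dict[str, Any],
                           pipeline: List[FakeNode], catalog: Dict[str, Any]) -> None:
        self.calls.append("after_pipeline_run")


def run_task(node: FakeNode, task_dict: Dict[str, Any], hooks: List[Any]) -> None:
    """Mimic the hook call order of KedroTask.run; the node itself is not executed."""
    catalog = task_dict["catalog"]
    # Load the node inputs once and hand the same dict to every hook.
    inputs = {name: catalog[name] for name in node.inputs}
    for hook in hooks:
        hook.before_node_run(node, catalog=catalog, inputs=inputs, is_async=False)

    # run_node(...) would execute the node at this point.

    # The pipeline is a topologically sorted list, so [-1] is the "last" node.
    if node.name == task_dict["pipeline"][-1].name:
        for hook in hooks:
            hook.after_pipeline_run(
                run_params=task_dict["run_params"],
                pipeline=task_dict["pipeline"],
                catalog=catalog,
            )


nodes = [FakeNode("preprocess", ["raw"]), FakeNode("train", ["clean"])]
task_dict = {"catalog": {"raw": 1, "clean": 2}, "pipeline": nodes, "run_params": {}}
hook = RecordingHook()
for n in nodes:
    run_task(n, task_dict, [hook])
print(hook.calls)
# ['before_node_run:preprocess', 'before_node_run:train', 'after_pipeline_run']
```

One caveat the sketch makes visible: the check compares against the last node in Kedro's topological order, which is not necessarily the last task to finish if Prefect schedules independent branches in a different order or in parallel, so after_pipeline_run could fire before another branch has completed.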
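# Additional imports assumed for this task (Kedro 0.18.x, Prefect 1.x)
from pathlib import Path
from typing import Any, Dict, Union

from kedro import __version__ as kedro_version
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataSet
from prefect import Task


class KedroInitTask(Task):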
    """Task to initialize KedroSession"""

    def __init__(
        self,
        pipeline_name: str,
        package_name: str,
        project_path: Union[Path, str] = None,
        env: str = None,
        extra_params: Dict[str, Any] = None,
        *args,
        **kwargs,
    ):
        self.project_path = Path(project_path or Path.cwd()).resolve()
        self.extra_params = extra_params
        self.pipeline_name = pipeline_name
        self.env = env
        super().__init__(*args, name=f"{package_name}_init", **kwargs)

    def run(self) -> Dict[str, Any]:
        """
        Initializes a Kedro session and returns the DataCatalog, the session ID,
        the pipeline and the run_params needed by the hooks in later tasks
        """
        # bootstrap project within task / flow scope
        bootstrap_project(self.project_path)

        session = KedroSession.create(
            project_path=self.project_path,
            env=self.env,
            extra_params=self.extra_params,  # noqa: E501
        )
        # Note that for logging inside a Prefect task self.logger is used.
        self.logger.info("Session created with ID %s", session.session_id)
        pipeline = pipelines.get(self.pipeline_name)
        context = session.load_context()
        catalog = context.catalog
        unregistered_ds = pipeline.data_sets() - set(catalog.list())  # NOQA

        for ds_name in unregistered_ds:
            catalog.add(ds_name, MemoryDataSet())

        # TODO: get run params from the session or context
        run_params = {
            "session_id": session.session_id,
            "project_path": context.project_path,
            "env": self.env,
            "kedro_version": kedro_version,
            "tags": (),
            "from_nodes": [],
            "to_nodes": [],
            "node_names": (),
            "from_inputs": [],
            "to_outputs": [],
            "load_versions": {},
            "extra_params": self.extra_params or {},
            "pipeline_name": self.pipeline_name,
            "namespace": None,
            # Placeholder: Prefect, not a Kedro runner, executes the nodes
            "runner": "SequentialRunner",
        }
        for hook in HOOKS:

            hook.before_pipeline_run(
                run_params=run_params, pipeline=pipeline, catalog=catalog
            )
        return {
            "catalog": catalog,
            "sess_id": session.session_id,
            "pipeline": pipeline,
            "run_params": run_params,
        }
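The run_params dict in KedroInitTask is partly hardcoded. A minimal sketch of building it from arguments instead, so that values such as tags or extra_params can be passed in, is below; build_run_params is a hypothetical helper, the keys simply mirror the dict above, and the argument values in the usage lines are illustrative only.

```python
from typing import Any, Dict, Iterable, Optional


def build_run_params(
    session_id: str,
    project_path: str,
    pipeline_name: str,
    kedro_version: str,
    runner: str,
    env: Optional[str] = None,
    extra_params: Optional[Dict[str, Any]] = None,
    tags: Iterable[str] = (),
    node_names: Iterable[str] = (),
) -> Dict[str, Any]:
    """Hypothetical helper: build run_params from arguments instead of literals.

    The keys mirror the dict in KedroInitTask; options that are not threaded
    through yet keep the same empty defaults as before.
    """
    return {
        "session_id": session_id,
        "project_path": project_path,
        "env": env,
        "kedro_version": kedro_version,
        "tags": tuple(tags),
        "from_nodes": [],
        "to_nodes": [],
        "node_names": tuple(node_names),
        "from_inputs": [],
        "to_outputs": [],
        "load_versions": {},
        "extra_params": dict(extra_params or {}),
        "pipeline_name": pipeline_name,
        "namespace": None,
        "runner": runner,
    }


params = build_run_params(
    session_id="example-session",  # illustrative values only
    project_path="/path/to/project",
    pipeline_name="__default__",
    kedro_version="0.18.x",
    runner="SequentialRunner",
    extra_params={"learning_rate": 0.1},
    tags=["training"],
)
print(params["tags"], params["extra_params"])
```

Recording tags in run_params only tells the hooks what was requested; for a tag filter to actually take effect, the pipeline handed to the tasks would also need to be filtered, for example with Kedro's Pipeline.only_nodes_with_tags.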
In the KedroInitTask, I create the run_params, filled dynamically from the context or session where possible and with default values otherwise. After the catalog and session are created, we run “before_pipeline_run” on every hook, given these run_params, the pipeline and the catalog. We return the pipeline and run_params because the hooks in the remaining nodes need them.
Some issues remain, though. For example, options that can be supplied through the CLI, such as filtering by tags and nodes or overwriting params, are not picked up (they are not usable in the Prefect deployment anyway). We load the hooks from the settings.py file, where hooks are normally added to a Kedro project. We can only load the hooks once, because we need the same initialized hook (e.g. MLFlow_HOOK) for multiple reasons.
How would this work out if we run a task in Prefect independently of the other tasks? Say that in the Prefect GUI you select a task and choose to re-run it: is the task execution then aware of the initialization of the pipeline it is part of?
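Whether a re-run task sees the initialization depends on where the hook state lives. The framework-free sketch below uses a hypothetical StatefulHook as a stand-in for something like MLFlow_HOOK: state set in before_pipeline_run lives on the hook instance, so a task that works with a freshly constructed hook (which is what would happen if a single task were re-run in a new process) cannot see it unless that state is passed along explicitly, here through the dict returned by the init task. That last option assumes the upstream result is still available to the re-run task, which depends on how Prefect is configured to persist results.

```python
from typing import Any, Dict, Optional


class StatefulHook:
    """Hypothetical hook whose node-level calls need state from before_pipeline_run."""

    def __init__(self) -> None:
        self.run_id: Optional[str] = None

    def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        # A tracking hook would open a run here and keep a handle to it.
        self.run_id = f"run-for-{run_params['session_id']}"

    def resume_from(self, task_dict: Dict[str, Any]) -> None:
        # Rehydrate from state that the init task passed downstream explicitly.
        self.run_id = task_dict["run_id"]

    def before_node_run(self, node_name: str) -> str:
        if self.run_id is None:
            raise RuntimeError("before_pipeline_run never ran on this hook instance")
        return f"{node_name} logged to {self.run_id}"


# 1) Whole flow in one process: the same instance sees the init.
shared = StatefulHook()
shared.before_pipeline_run({"session_id": "abc"})
print(shared.before_node_run("train"))  # train logged to run-for-abc

# 2) A task re-run with a freshly constructed hook: the init is invisible.
fresh = StatefulHook()
try:
    fresh.before_node_run("train")
except RuntimeError as err:
    print("re-run failed:", err)

# 3) Same fresh hook, but the init state travels in task_dict.
task_dict = {"run_id": shared.run_id}
fresh.resume_from(task_dict)
print(fresh.before_node_run("train"))  # train logged to run-for-abc
```

For MLflow specifically, mlflow.start_run(run_id=...) can resume an existing run, so passing the run ID downstream is one plausible route; whether kedro-mlflow's hook can be rehydrated this way would need checking against its source.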
j
hi @Hugo Evers, thanks a lot for the detailed writeup. what version of Prefect are you using?
h
below 2
so 1.something
the latest one
because the new prefect is not open source
we wanted to migrate to version 2, and contribute a v2 deployment
but chose not to, since this service handles very sensitive data
j
> because the new prefect is not open source
oh, that's something that I didn't have on my radar. will leave a comment in https://github.com/kedro-org/kedro/issues/2431
in any case, let's keep this thread about Prefect 1.x
h
anyway, we would like to contribute a fix for this prefect deployment
so my question would be, how would we go about this?
we would like to make separate plugins for the customisations we made
and have them work together, such that our custom kedro project looks more like a regular kedro project with some plugins and some configuration specifications
j
from your initial message I'm wondering if there's anything that can be done on the Kedro or kedro-mlflow side to mitigate these issues. I'm deferring to folks that are more informed than myself to give a more qualified answer cc @Deepyaman Datta @Yolan Honoré-Rougé
h
nothing for kedro-mlflow, but in Kedro yes, and specifically the kedro-prefect deployment from the docs
My suggestion would be to create a Kedro-Prefect plugin (like the kedro-sagemaker plugin), and update that plugin to make sure the hooks are run and the CLI works the same
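One direction such a plugin could take, sketched under assumptions about Kedro 0.18.x internals: run_node fires node-level hooks (before_node_run, after_node_run) through the hook manager it receives, and the KedroTask passes a fresh _create_hook_manager(), which carries Kedro's hook specs but none of the project's hooks; KedroSession registers settings.HOOKS on its own manager via the private _register_hooks helper. If that holds, registering the project hooks on the manager given to run_node would let Kedro fire node hooks itself instead of calling them by hand. The framework-free model below only illustrates that difference; MiniHookManager, make_manager, run_node_sketch and PrintHook are hypothetical and merely mimic pluggy-style dispatch.

```python
from typing import Any, Iterable, List


class MiniHookManager:
    """Hypothetical stand-in for the pluggy manager that run_node receives."""

    def __init__(self) -> None:
        self._hooks: List[Any] = []

    def register(self, hook: Any) -> None:
        self._hooks.append(hook)

    def call(self, name: str, **kwargs: Any) -> List[Any]:
        # Dispatch to every registered hook that implements `name`.
        return [getattr(h, name)(**kwargs) for h in self._hooks if hasattr(h, name)]


def make_manager(project_hooks: Iterable[Any]) -> MiniHookManager:
    """Analogue of creating a manager and then registering settings.HOOKS on it."""
    manager = MiniHookManager()
    for hook in project_hooks:
        manager.register(hook)
    return manager


def run_node_sketch(node_name: str, manager: MiniHookManager) -> None:
    """Stand-in for run_node: fires only the hooks registered on `manager`."""
    manager.call("before_node_run", node_name=node_name)
    # The node function would run here.
    manager.call("after_node_run", node_name=node_name)


class PrintHook:
    def before_node_run(self, node_name: str) -> None:
        print("before_node_run:", node_name)


run_node_sketch("train", MiniHookManager())            # empty manager: prints nothing
run_node_sketch("train", make_manager([PrintHook()]))  # prints "before_node_run: train"
```

Pipeline-level hooks (before_pipeline_run, after_pipeline_run) are fired by KedroSession.run rather than by run_node, so a plugin would still need explicit calls for those, as KedroInitTask and KedroTask do above. If node hooks were registered on the manager, the manual before_node_run loop in KedroTask would also have to go, or those hooks would fire twice.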
j
cc @marrrcin (he's on PTO) GetInData | Part of Xebia has experience with deploying Kedro in different places, could be a good fit https://github.com/getindata