# questions
m
Hi everyone, I’m coming back with a question that I think has been “floating” around here a few times. Sorry for that. DISCLAIMER ⚠️ : This is about facilitating dynamic pipelines (bad 😈 … I know). When creating a dynamic pipeline it is useful, even “vital”, to access the currently running context / session (e.g. to do things like accessing the parameters after they’ve been overridden via
kedro run --params
etc.). Somewhere in a GitHub issue I had found something that looked more or less like this:
Copy code
from kedro.framework.session import get_current_session
current_session = get_current_session()
current_context = current_session.load_context()
current_params = current_context.catalog.load("parameters")
However, this is the old API and does not work anymore. 😕 How can we access the current session at runtime in
pipeline.py
? For now I’m “condemned” to either hard-code things manually or to use this ugly and not really useful hack:
Copy code
from kedro.framework.session import KedroSession

# `paths` is a project-specific helper module holding the project root
project_path = paths.PROJECT_ROOT_DIR.as_posix()
static_session = KedroSession.create(project_path=project_path)
....
If someone has a solution, please let me know. Many thanks, M. P.S: I remember a mention being made about using the
after_context_created
hook, but from the depth of my ignorance, I have no clue how this could be of help in what I want to achieve. 🙁
l
Hello again! I too sometimes want dynamic pipelines, though I do try to avoid them as much as I can in the final product - they are very nice for experimenting if you have a lot of parameter options to test. I will share my Hooks to solve this! Wonder what people think.. 1. Create an
after_context_created
after_catalog_created
hook 2. Replace the
register_pipelines()
function with a custom
register_dynamic_pipelines(catalog: DataCatalog)
function a. If you can pass the
catalog
to create pipelines, you can access datasets and parameters to dynamically build your pipeline! 3. Use
create_pipeline(catalog: DataCatalog)
functions to create your pipelines! I'll share my approach to create the pipelines. Of course you would need to extend this example if you also want to dynamically create DataCatalog entries - to save the outputs of your dynamic pipeline :) But I'm leaving that out here to save space. Here are my steps.. `ProjectHooks`: in
hooks.py
Make sure to add it to your
settings.py
, on the commented line saying
HOOKS=(ProjectHooks(),)
Copy code
from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.framework.project import pipelines
from kedro.io import DataCatalog

from yourpackage import pipeline_registry

class ProjectHooks:
    @hook_impl
    def after_context_created(self, context: KedroContext) -> None:
        # Merely accessing the catalog forces it to be created now, which
        # triggers after_catalog_created() below before register_pipelines()
        # is needed (see the discussion further down in this thread).
        context.catalog

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, conf_catalog) -> None:
        """Hook to fill in and extend templated pipelines."""
        pipeline_registry.register_pipelines = pipeline_registry.register_dynamic_pipelines(catalog)
        pipelines.configure("yourpackage.pipeline_registry")
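For reference, the matching settings.py entry could look something like this (assuming your package is named yourpackage, as in the snippet above):
Copy code
# settings.py
from yourpackage.hooks import ProjectHooks

HOOKS = (ProjectHooks(),)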
Notice that we overwrite the
pipeline_registry.register_pipelines
method with
pipeline_registry.register_dynamic_pipelines()
in the hook. In your
pipeline_registry.py
use the following:
Copy code
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline


def register_pipelines():
    """Method that will be assigned to the callable returned by register_dynamic_pipelines(...), by a Hook."""

    raise NotImplementedError("""
    register_pipelines() is expected to be overwritten by ProjectHooks.
    Make sure the hook is defined in hooks.py and enabled in settings.py
    """)


def register_dynamic_pipelines(catalog: DataCatalog) -> dict[str, Pipeline]:
    """Register the project's pipelines depending on the catalog.

    Create pipelines dynamically based on parameters and datasets defined in the catalog.
    The function must return a callable without any arguments that will replace the `register_pipelines()` method
    in this same module, using an `after_catalog_created_hook`.

    Args:
        catalog: The DataCatalog loaded from the KedroContext.

    Returns:
        A callable that returns a mapping from pipeline names to ``Pipeline`` objects.
    """
    # create pipelines with access to catalog
    my_pipeline = create_pipeline(catalog=catalog)

    def register_pipelines():
        """Register the project's pipelines.

        Returns:
            A mapping from pipeline names to ``Pipeline`` objects.
        """
        pipelines = {
            "__default__": my_pipeline
        }

        return pipelines

    return register_pipelines
This uses a
create_pipeline(catalog=catalog)
method, for example something like this!
Copy code
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node, pipeline

from yourpackage.nodes import fit_model, score_model  # your own node functions


def create_pipeline(catalog: DataCatalog) -> Pipeline:
    parameters = catalog.load("parameters")
    models = parameters["models"]  # a dictionary with the keys being the model labels

    dynamic_nodes = []
    for model in models:  # iterate over the model labels
        model_nodes = [
            node(
                func=fit_model,
                inputs=dict(
                    X="X_train",
                    y="y_train",
                    model_type=f"params:models.{model}.class",
                    fit_kwargs=f"params:models.{model}.fit_kwargs",
                ),
                outputs=f"{model}.fitted",
            ),
            node(
                func=score_model,
                inputs=dict(
                    X="X_test",
                    y="y_test",
                    model=f"{model}.fitted",
                ),
                outputs=f"{model}.score",
            ),
        ]
        dynamic_nodes.extend(model_nodes)

    output_pipeline = pipeline(dynamic_nodes)
    return output_pipeline
🥳 1
Hope you don't consider this spam because it's quite verbose... But by using namespaced pipelines and dataset factories, you could make this a lot cleaner. Instead of copy-pasting more code, maybe it's handier to attach the .py files 🤔 --- Same Hook as above. But instead of creating pipelines by changing the parameter and dataset names directly, we create `namespace`d pipelines, along with dataset factories: https://docs.kedro.org/en/stable/data/data_catalog.html#example-3-generalise-datasets-using-namespaces-into-one-dataset-factory - have a look at the catalog.yml. This way, not only is the pipeline dynamic but also your catalog!
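To sketch the dataset-factory side (illustrative only, not copied from the linked docs; assumes a recent kedro-datasets where the pickle dataset is pickle.PickleDataset): one templated catalog entry can catch every namespaced output, so each dynamically created {model}.fitted dataset resolves to its own file:
Copy code
"{model}.fitted":
  type: pickle.PickleDataset
  filepath: data/06_models/{model}_fitted.pkl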
m
@Lodewic van Twillert Spam ? Are you kidding ? 😉 Thank you so much for all this. I’ll explore / play around and let you know how it goes. Thank you so much for sharing 🙏🏼 ☀️ 🙂 Regards M.
l
Haha ok great, then enjoy! Hope to hear your thoughts
👍🏼 1
🙂 1
For completeness, I made some quick typos in the
catalog.yml
, should have been this:
Copy code
models:
  linear_regression:
    class: sklearn.linear_model.LinearRegression
    fit_kwargs:
      fit_intercept: true
  decision_tree:
    class: sklearn.tree.DecisionTreeRegressor
    fit_kwargs:
      criterion: squared_error
  quantile_regression:
    class: sklearn.linear_model.QuantileRegressor
    fit_kwargs:
      fit_intercept: true
      quantile: 0.5
n
Thanks a lot @Lodewic van Twillert, very creative solution. We have an open issue to track dynamic pipelines. While we may not have an immediate solution, it is still beneficial to have a workaround.
I’ll cross post your solution there. https://github.com/kedro-org/kedro/issues/2627
And this is a gem, not spam!
l
Cool thanks, I didn't realize there was already a Dynamic Pipeline issue - I'll keep an eye on that too 💪
👍🏼 1
n
The solution is quite comprehensive, it would be great if you could share a demo repository, but this is awesome work 🙂
l
Yeah sure, happy to! I can make a working version with the iris starter and link a repository
❤️ 1
n
I’ve linked your response on the issue now. Let me know when you have a working repository and I can update it accordingly
m
Hi @Nok Lam Chan Good to see you here 🙂 I’m really appreciative of & grateful for what @Lodewic van Twillert has shared. But just to try to “really understand”, leaving aside the “general question” of dynamic pipelines, do you confirm that there is no simple & straightforward way to achieve what the old API allowed with
from kedro.framework.session import get_current_session
? Thx M
n
@Marc Gris The direct replacement for it is a hook, either
after_context_created
or
after_catalog_created
. What do you need to do? Do you just need
params
to create your pipeline?
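A minimal sketch of that idea (illustrative only - the ParamsHook name and the module-level holder are assumptions, not an official kedro pattern): a hook can stash the resolved parameters, including --params overrides, somewhere your pipeline code can read them:
Copy code
from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl

# module-level holder, filled in by the hook at runtime
RUNTIME_PARAMS: dict = {}


class ParamsHook:
    @hook_impl
    def after_context_created(self, context: KedroContext) -> None:
        # context.params includes overrides passed via `kedro run --params`
        RUNTIME_PARAMS.update(context.params)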
m
@Nok Lam Chan here’s an example of the dirty dynamic things that I try to do:
Copy code
from kedro.pipeline import Pipeline, node, pipeline

from my_project import functions  # project-specific nodes module


def create_pipeline(**kwargs) -> Pipeline:

    all_models = ["model_a", "model_b", "model_c"]  # ... hardcoded 😕
    all_pipelines = []

    for model in all_models:

        model_pipeline = pipeline([

            # MODEL INPUT PREP
            node(
                func=functions.prepare_model_input,
                inputs=["featurized_users",
                        "featurized_items",
                        "featurized_interactions",
                        f"params:models.{model}.name",
                        f"params:models.{model}.params.input_prep"
                        ],
                outputs=f"{model}_input",
                name=f"{model}_input_preparation_node",
                tags=["model", "input", model]),

            # [ other nodes ... ]
        ])

        all_pipelines.append(model_pipeline)

    return sum(all_pipelines)
What would be an elegant / proper way to do this (avoiding the hardcoding, and ideally accessing
"params:models"
to iterate over). I do hear and understand Kedro’s stance for static / data-centric pipelines… But one must reckon that if one has dozens of models, creating their pipelines manually would be both tedious and redundant / “boilerplate-y”… Thx 🙂 P.S: Maybe what was shared by Lodewic completely “fits my bill” and solves my problem. But I’m stuck at work on something and haven’t had time to dive into his suggestion.
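For illustration, a minimal sketch of how the two approaches could combine, assuming Lodewic's hook setup above: the hardcoded list is replaced by reading params:models from the catalog (`functions` is again the project's own nodes module, assumed in scope):
Copy code
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(catalog: DataCatalog) -> Pipeline:
    # model labels now come from parameters instead of a hardcoded list
    all_models = catalog.load("parameters")["models"]

    all_pipelines = []
    for model in all_models:
        all_pipelines.append(pipeline([
            node(
                func=functions.prepare_model_input,
                inputs=[
                    "featurized_users",
                    "featurized_items",
                    "featurized_interactions",
                    f"params:models.{model}.name",
                    f"params:models.{model}.params.input_prep",
                ],
                outputs=f"{model}_input",
                name=f"{model}_input_preparation_node",
                tags=["model", "input", model],
            ),
            # ... other nodes ...
        ]))

    return sum(all_pipelines)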
n
dynamic pipeline is not so dirty IMO, I have a lot of empathy on that. It should be avoided when it’s not necessary, and there are cases where it is necessary 🙂
☺️ 1
I am trying to work out an example
🙏🏼 1
l
If you are interested in trying my approach, or just testing it. I have finished an example in this repository: https://github.com/Lodewic/kedro-dynamic-pipeline-hook-example
❤️ 1
🎉 1
m
Wow… Awesome… You’re blazingly fast !!! Will dive into it, with GREAT interest, ASAP 🙂 Thx again
n
that’s an awesome README !
💪 1
l
Thank you! It has the Kedro badge I saw being mentioned haha
K 1
1000 1
n
Ya that is neat 😄
i
@Lodewic van Twillert this is amazing. We're in the process of migrating away from kedro 0.17.1 where we make use of the old session API Marc was trying to use, and this repo shows such a clever use of hooks to replace the use of the session. Thanks for sharing!!
What is the reason for this hook?
Copy code
def after_context_created(self, context: KedroContext) -> None:
    context.catalog
it doesn't save it as a class attribute or anything, does "calling" the catalog force it to instantiate before it normally would or smth?
l
Hi, glad you like it, hope it helps you! That hook, I don't remember exactly when we started to include it; we had an issue to which that was a solution, but maybe it was unrelated to the dynamic pipelines.. Might have been something to do with the lazy-loading in the catalog, as you also suspected. Could try commenting it out and see if it all works the same, am also curious tbh 🤔
i
will give it a try once I get one of our projects set up and I'll update you 🙂
💪 1
l
@Iñigo Hidalgo If you remove that hook, then
kedro run
breaks - but
kedro viz
will be fine. Here is why: when you call
context.catalog
, then the
after_catalog_created()
hooks are triggered. Normally, the catalog seems to be created after
register_pipelines()
is required. So, when our
register_pipelines()
function has not been replaced yet, it will raise an error. So by calling
context.catalog
after the context is created, we force the
after_catalog_created()
to be triggered earlier than usual -> therefore overwriting the
register_pipelines()
function before it is required. Here is the
_get_catalog()
method from kedro where you can see that the
after_catalog_created()
hook is triggered 👍 https://github.com/kedro-org/kedro/blob/main/kedro/framework/context/context.py#L255C5-L295C23 --- I think
kedro viz
also creates the catalog earlier than
kedro run
would, so for viz this early trigger is not required.
In short, you were right the first time.
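A toy illustration of that mechanism (not kedro's actual code, just the pattern): reading a property can have side effects, which is why merely accessing context.catalog fires the hook earlier than usual:
Copy code
# Toy stand-in for KedroContext (see the linked source for the real thing)
class ToyContext:
    def __init__(self):
        self._catalog = None

    @property
    def catalog(self):
        if self._catalog is None:
            self._catalog = {"datasets": {}}  # stand-in for building a DataCatalog
            self._after_catalog_created(self._catalog)  # the hook fires here
        return self._catalog

    def _after_catalog_created(self, catalog):
        print("after_catalog_created fired - register_pipelines() can be swapped now")


ToyContext().catalog  # merely reading the property triggers the hook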