# questions
z
Hey guys. How do I access the context/catalog during a node execution?
j
hi @Zemeio, I'm wondering what are you trying to achieve?
the generic solution for this is to use hooks: https://docs.kedro.org/en/stable/hooks/introduction.html
m
@Juan Luis Allow me to “jump in” / piggyback on this question: In our case, we needed access to the catalog in `pipeline.py` to create multiple pipelines dynamically, based on values specified in the parameter files. So far, I had “hacked my way around” by adding this in `pipeline.py`:

```python
from kedro.framework.session import KedroSession

# `paths` is a project-local module
project_path = paths.PROJECT_ROOT_DIR.as_posix()
session = KedroSession.create(project_path=project_path)
context = session.load_context()
catalog = context.catalog
```

It worked just fine. However, for better “customer segregation” I have recently separated the conf for our different customers in this way: `conf/customer_{x,y,z}/{base,local}`. Thanks to (or because of) this, `--conf-source` is now a mandatory argument to `kedro run` (which is what we wanted). However, the “hack” above (in `pipeline.py`) does not work anymore… So, instead of re-instantiating a session / context / catalog, is there a way to access the “actual runtime” session / context / catalog? Thx in advance, M
j
> However the “hack” above (in `pipeline.py`) does not work anymore…

could you elaborate? is it something that stopped working because of a change we did in Kedro recently?
m
Since `conf_source` is left unspecified in `KedroSession.create`, it raises an error (i.e. the desired “customer conf isolation”). I’ve seen this (cf. screenshot) in the GitHub thread about dynamic pipelines, but I get:

```
ImportError: cannot import name 'get_current_session' from 'kedro.framework.session'
```
j
looping in my colleague @Nok Lam Chan
👍🏼 1
n
https://docs.kedro.org/en/0.18.3/faq/architecture_overview.html#kedro-architecture-overview We had lots of discussion about this, but fundamentally we expect pipelines to be static. When you start reading config to create pipelines, these are second-order pipelines; you may even be using a pipeline to create more complicated pipelines.
👍🏼 1
z
My use case is similar to @Marc Gris’s, and I agree that it deviates a bit from the Kedro way by making nodes a bit more dynamic.
👍🏼 1
n
(I’ll get back to this thread later, in a meeting now)
z
I have lots of "small variations" that can happen according to parameters, and it feels like the best way to accomplish this is by having the parameters be accessed in the node execution instead of creating hundreds of different static pipelines
👍🏼 1
I need to save the results to different entries in the catalog according to these parameters.
👍🏼 1
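The approach described here, fanning one node template out into several concrete nodes that each write to their own catalog entry, can be sketched in plain Python (the spec shape and the `variant` placeholder below are illustrative assumptions, not Kedro API):

```python
# Stdlib-only sketch: expand one templated node spec into several concrete
# ones, each reading from / writing to its own catalog entry. The dict shape
# and the "variant" placeholder are made up for illustration.

def expand_spec(spec: dict, variants: list) -> list:
    """Return one concrete spec per variant, with placeholders filled in."""
    return [
        {
            "name": f"{spec['name']}_{v}",
            "inputs": [i.format(variant=v) for i in spec["inputs"]],
            "outputs": [o.format(variant=v) for o in spec["outputs"]],
        }
        for v in variants
    ]

template = {
    "name": "train_model",
    "inputs": ["features_{variant}"],
    "outputs": ["model_{variant}"],  # a distinct catalog entry per variant
}

expanded = expand_spec(template, ["x", "y"])
# expanded[0]["name"] == "train_model_x", expanded[1]["outputs"] == ["model_y"]
```

The open question in the thread is only *where* this expansion can run so that it sees the runtime parameters.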
Please also refer to Marc's "jump-in" question, as that would solve it for me as well, but it seems he went further.
👍🏼 1
m
Thx @Nok Lam Chan, will look into the `after_context_created` hook 👍🏼 (glad to hear that my flood of questions is not annoying 😉)
n
What do you need to access? Maybe it helps if you can give me a code snippet, even if such a thing doesn’t exist yet. Dynamic pipelines are something I would like to improve; I don’t know if Kedro will ever support fully dynamic execution, but it will help us design a better solution. https://github.com/kedro-org/kedro/issues/2627
m
In my “current” case, I need to access the “current” `"params:schema.tables"` to dynamically generate the different nodes for all those tables. Will try to do so with the hook you suggested.
z
Preliminary results here: I added the context to my hooks class in the `after_context_created` hook, and then I can change the value of the node slightly depending on context values. I have to find possible side effects now, since this is not expected by the framework. One side effect I can think of is that the order of running the nodes is defined before this change, so that is one thing to watch out for.
👍🏼 1
It seems like the pipeline is unchangeable, as are the nodes: I change them, but the changes don’t seem to take effect. I tried modifying the nodes in the `before_pipeline_run` hook.
Any updates on this topic? I still haven't found a way to work this out.
n
`before_pipeline_run` - could you share how you made the change?

```python
runner = runner or SequentialRunner()
hook_manager.hook.before_pipeline_run(  # pylint: disable=no-member
    run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
)

try:
    run_result = runner.run(
        filtered_pipeline, catalog, hook_manager, session_id
    )
```

A snippet of code from `kedro/framework/session/session.py`; as long as your hook mutates the pipeline object, it should work.
z
@Nok Lam Chan
```python
@hook_impl
def before_pipeline_run(self, pipeline: Pipeline):
    multi_nodes = []
    for node in pipeline.nodes:
        if [i for i in node.inputs if "{sln}" in i] or [i for i in node.outputs if "{sln}" in i]:
            multi_nodes.append(node)
    for node in multi_nodes:
        for sln in self.SLNS:
            n = Node(
                node.func,
                [i.format(sln=sln) for i in node.inputs],
                [i.format(sln=sln) for i in node.outputs],
                name=node.name + f"_{sln}",
                tags=node.tags,
                namespace=node.namespace,
            )
            pipeline.nodes.append(n)
        pipeline.nodes.remove(node)
    print(pipeline)
```

This is my `before_pipeline_run`. I add nodes based on how many slns I need to run for (I changed a few of the names here), but the pipeline ends up being the same. Probably a copy is being created and passed to the hooks? Should I instead put this functionality in the session and use my own managed session?
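A likely explanation for the pipeline "ending up the same": in Kedro, `Pipeline.nodes` is a property that returns a fresh list, so `append` and `remove` act on a throwaway copy rather than on the pipeline's internal state. A stdlib-only sketch of that Python mechanic (the `Pipe` class below is a stand-in, not Kedro code):

```python
class Pipe:
    """Stand-in for an object whose .nodes property returns a copy."""

    def __init__(self, nodes):
        self._nodes = list(nodes)

    @property
    def nodes(self):
        # Returns a NEW list on every access, so callers cannot mutate
        # the internal state through it.
        return list(self._nodes)


p = Pipe(["a", "b"])
p.nodes.append("c")   # mutates a throwaway copy, not the Pipe
unchanged = p.nodes   # still ["a", "b"]
```

If that is what is happening here, mutating `pipeline.nodes` inside the hook can never take effect; the expansion would have to produce a new `Pipeline` object before it reaches the runner instead.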