Zemeio (07/12/2023, 9:35 AM)

Juan Luis (07/12/2023, 1:03 PM)

Marc Gris (07/19/2023, 8:00 AM)
I need to access the catalog in pipeline.py to create multiple pipelines dynamically, based on values specified in the parameter files. So far, I had "hacked my way around" by adding this in pipeline.py:
```python
from kedro.framework.session import KedroSession

# `paths` is a project-specific helper module
project_path = paths.PROJECT_ROOT_DIR.as_posix()
session = KedroSession.create(project_path=project_path)
context = session.load_context()
catalog = context.catalog
```
It worked just fine.

However, for better "customer segregation" I have recently separated the conf for our different customers in this way:

conf/customer_{x,y,z}/{base,local}

Thanks to (or because of) this, --conf-source is now a mandatory argument to kedro run (which is what we wanted). However, the "hack" above (in pipeline.py) does not work anymore…

So, instead of re-instantiating a session / context / catalog, is there a way to access the "actual runtime" session / context / catalog?

Thx in advance,
M
In our case, this is very convenient / re-assuring since…

Juan Luis (07/19/2023, 8:09 AM)
> However the "hack" above (in pipeline.py) does not work anymore…

Could you elaborate? Is it something that stopped working because of a change we made in Kedro recently?
Marc Gris (07/19/2023, 8:14 AM)
If conf_source is left unspecified in KedroSession.create, it raises an error, which is the desired "customer conf isolation". I also get:

ImportError: cannot import name 'get_current_session' from 'kedro.framework.session'
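Side note for anyone hitting the same ImportError: get_current_session was removed from Kedro in 0.18, which is why the import fails. The workaround the thread moves toward is capturing the runtime context from the after_context_created hook. Below is a minimal, framework-free sketch of that capture pattern; the ContextHolder class and the simulated hook call are illustrative assumptions, not Kedro API:

```python
# Sketch of the "capture via hook" pattern (assumption: this mimics how
# an after_context_created hook could stash the runtime context; it is
# NOT real Kedro API, just the shape of the idea).

class ContextHolder:
    """Module-level holder that pipeline-building code can read later."""
    context = None


class ProjectHooks:
    # In a real Kedro project this method would be decorated with
    # @hook_impl and receive the KedroContext created at startup.
    def after_context_created(self, context):
        ContextHolder.context = context


# Simulate the framework firing the hook at session startup:
ProjectHooks().after_context_created({"catalog": "runtime catalog"})

# Later, e.g. in pipeline-building code:
catalog = ContextHolder.context["catalog"]
print(catalog)  # → runtime catalog
```

The key point is that the hook is invoked by the framework with the real runtime context, so nothing needs to be re-instantiated by hand.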
Juan Luis (07/19/2023, 9:02 AM)

Nok Lam Chan (07/19/2023, 9:30 AM)

Zemeio (07/19/2023, 9:30 AM)

Nok Lam Chan (07/19/2023, 9:30 AM)

Zemeio (07/19/2023, 9:33 AM)

Marc Gris
(07/19/2023, 9:59 AM)
Thanks for the after_context_created hook 👍🏼
(Glad to hear that my flood of questions is not annoying 😉)

Nok Lam Chan (07/19/2023, 12:03 PM)

Marc Gris (07/19/2023, 12:41 PM)
I use "params:schema.tables" to dynamically generate the different nodes for all those tables. Will try to do so with the hook you suggested.

Zemeio
(07/20/2023, 12:59 AM)
I do this in a before_pipeline_run hook.

Nok Lam Chan
(07/26/2023, 10:16 AM)
Regarding before_pipeline_run: could you share how you make the change?

```python
runner = runner or SequentialRunner()
hook_manager.hook.before_pipeline_run(  # pylint: disable=no-member
    run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
)
try:
    run_result = runner.run(
        filtered_pipeline, catalog, hook_manager, session_id
    )
```

This is a snippet from kedro/framework/session/session.py; as long as your hook mutates the pipeline object, it should work.

Zemeio
07/27/2023, 1:13 AM@hook_impl
def before_pipeline_run(self, pipeline: Pipeline):
multi_nodes = []
for node in pipeline.nodes:
if [i for i in node.inputs if "{sln}" in i] or [i for i in node.outputs if "{sln}" in i]:
multi_nodes.append(node)
for node in multi_nodes:
for sln in self.SLNS:
n = Node(
node.func,
[i.format(sln=sln) for i in node.inputs],
[i.format(sln=sln) for i in node.outputs],
name=node.name + f"_{sln}",
tags=node.tags,
namespace=node.namespace
)
pipeline.nodes.append(n)
pipeline.nodes.remove(node)
print(pipeline)
This is my before_pipeline run. I add nodes based on how many slns I need to run for (I changed a few of the names here), but the pipeline ends up being the same. Probably a copy is being created and passed to the hooks?
Should I, instead, put this functionality in the session and use my own managed session?
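A hedged aside on the question above: as far as I can tell, Kedro's Pipeline.nodes is a property that builds and returns a new list on each access, so appending to or removing from it only mutates a throwaway copy and never changes the pipeline itself. The MiniPipeline stand-in below demonstrates the trap and sketches the usual workaround (expanding templated names up front and building a new pipeline, rather than mutating one inside a hook); the class and node names are illustrative, not Kedro code:

```python
# Stand-in for kedro.pipeline.Pipeline: `nodes` returns a fresh copy,
# which is (roughly) why mutating it inside a hook has no effect.
class MiniPipeline:
    def __init__(self, nodes):
        self._nodes = list(nodes)

    @property
    def nodes(self):
        return list(self._nodes)  # new list on every access


pipe = MiniPipeline(["load_{sln}", "report"])
pipe.nodes.append("extra")                     # mutates a throwaway copy
assert pipe.nodes == ["load_{sln}", "report"]  # pipeline unchanged


# Workaround sketch: expand templated names before construction and
# build a *new* pipeline, instead of mutating an existing one.
def expand(nodes, slns):
    out = []
    for name in nodes:
        if "{sln}" in name:
            out.extend(name.format(sln=s) for s in slns)
        else:
            out.append(name)
    return out


expanded = MiniPipeline(expand(["load_{sln}", "report"], ["x", "y"]))
print(expanded.nodes)  # → ['load_x', 'load_y', 'report']
```

In a real project the expansion would happen in create_pipeline (or wherever the pipeline is registered), where the list of SLNs can be read from parameters.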