Marc Gris (08/24/2023, 10:20 AM):
Is there a way to access, at runtime, the parameters of the current run (e.g. those passed via kedro run --params, etc…)?
Somewhere in a GitHub issue I had found something that looked more or less like this:

    from kedro.framework.session import get_current_session

    current_session = get_current_session()
    current_context = current_session.load_context()
    current_params = current_context.catalog.load("parameters")

However, this is the old API and does not work anymore. 😕
How can we access the current session at runtime in pipeline.py?
For now I’m “condemned” either to hard-code things manually or to resort to this ugly and not really useful hack:

    from kedro.framework.session import KedroSession

    project_path = paths.PROJECT_ROOT_DIR.as_posix()
    static_session = KedroSession.create(project_path=project_path)
    ....

If someone has a solution, please let me know.
Many thanks
M
P.S: I remember a mention being made about using the after_context_created hook, but from the depth of my ignorance, I have no clue how this could be of help in what I want to achieve. 🙁

Lodewic van Twillert (08/24/2023, 10:47 AM):
1. Use an after_context_created + after_catalog_created hook
2. Replace the register_pipelines() function with a custom register_dynamic_pipelines(catalog: DataCatalog) function
   a. If you can pass the catalog to create pipelines, you can access datasets and parameters to dynamically build your pipeline!
3. Use create_pipeline(catalog: DataCatalog) functions to create your pipelines!
I’ll share my approach to create the pipelines. Of course you would need to extend this example if you also want to dynamically create DataCatalog entries - to save the outputs of your dynamic pipeline :) But I’m leaving that out here to save space. Here are my steps:

`ProjectHooks`, in hooks.py. Make sure to add it to your settings.py, on the commented line saying HOOKS = (ProjectHooks(),) (a full settings.py sketch follows the hook code below).
    from kedro.framework.context import KedroContext
    from kedro.framework.hooks import hook_impl
    from kedro.framework.project import pipelines
    from kedro.io import DataCatalog

    from yourpackage import pipeline_registry


    class ProjectHooks:
        @hook_impl
        def after_context_created(self, context: KedroContext) -> None:
            # Touch the catalog so it is created (and after_catalog_created
            # fires) before register_pipelines() is needed - see the
            # explanation at the end of this thread.
            context.catalog

        @hook_impl
        def after_catalog_created(self, catalog: DataCatalog, conf_catalog) -> None:
            """Hook to fill in and extend templated pipelines."""
            pipeline_registry.register_pipelines = pipeline_registry.register_dynamic_pipelines(catalog)
            pipelines.configure("yourpackage.pipeline_registry")
Notice that we overwrite the pipeline_registry.register_pipelines method with pipeline_registry.register_dynamic_pipelines() in the hook.
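For completeness, the settings.py registration might look like this (a minimal sketch; it assumes the hook class lives in yourpackage/hooks.py):

    from yourpackage.hooks import ProjectHooks

    HOOKS = (ProjectHooks(),)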
In your pipeline_registry.py, use the following:
    from typing import Callable

    from kedro.io import DataCatalog
    from kedro.pipeline import Pipeline


    def register_pipelines():
        """Placeholder that will be replaced by the callable returned by register_dynamic_pipelines(...), via a Hook."""
        raise NotImplementedError("""
            register_pipelines() is expected to be overwritten by ProjectHooks.
            Make sure the hook is found in hooks.py and enabled in settings.py
            """)


    def register_dynamic_pipelines(catalog: DataCatalog) -> Callable[[], dict[str, Pipeline]]:
        """Register the project's pipelines depending on the catalog.

        Create pipelines dynamically based on parameters and datasets defined in the catalog.
        The function must return a callable without any arguments that will replace the
        `register_pipelines()` method in this same module, using an `after_catalog_created` hook.

        Args:
            catalog: The DataCatalog loaded from the KedroContext.

        Returns:
            A callable that returns a mapping from pipeline names to ``Pipeline`` objects.
        """
        # Create pipelines with access to the catalog.
        my_pipeline = create_pipeline(catalog=catalog)

        def register_pipelines():
            """Register the project's pipelines.

            Returns:
                A mapping from pipeline names to ``Pipeline`` objects.
            """
            pipelines = {
                "__default__": my_pipeline,
            }
            return pipelines

        return register_pipelines
This uses a create_pipeline(catalog=catalog) method, for example something like this!
    from kedro.io import DataCatalog
    from kedro.pipeline import node, pipeline


    def create_pipeline(catalog: DataCatalog):
        parameters = catalog.datasets.parameters.load()
        models = parameters["models"]  # a dictionary with the keys being the model labels

        dynamic_nodes = []
        for model in models:
            # fit_model and score_model are your own fitting / scoring functions
            model_nodes = [
                node(
                    func=fit_model,
                    inputs=dict(
                        X="X_train",
                        y="y_train",
                        model_type=f"params:models.{model}.class",
                        fit_kwargs=f"params:models.{model}.fit_kwargs",
                    ),
                    outputs=f"{model}.fitted",
                ),
                node(
                    func=score_model,
                    inputs=dict(
                        X="X_test",
                        y="y_test",
                        model=f"{model}.fitted",
                    ),
                    outputs=f"{model}.score",
                ),
            ]
            dynamic_nodes.extend(model_nodes)

        output_pipeline = pipeline(dynamic_nodes)
        return output_pipeline
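Here fit_model and score_model are assumed to be ordinary node functions of your own; hypothetical stubs consistent with the inputs above might look like this (note that the example fit_kwargs - fit_intercept, criterion, quantile - are in fact constructor arguments, so they are applied at instantiation):

    import importlib


    def fit_model(X, y, model_type: str, fit_kwargs: dict):
        # model_type is a dotted path such as "sklearn.linear_model.LinearRegression".
        module_path, class_name = model_type.rsplit(".", 1)
        model_class = getattr(importlib.import_module(module_path), class_name)
        model = model_class(**fit_kwargs)  # e.g. LinearRegression(fit_intercept=True)
        return model.fit(X, y)  # sklearn estimators return self from fit()


    def score_model(X, y, model) -> float:
        # sklearn's default score, e.g. R^2 for regressors.
        return model.score(X, y)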
Marc Gris (08/24/2023, 12:05 PM)

Lodewic van Twillert (08/24/2023, 12:06 PM):
The models parameters referenced in my example should have been this:
    models:
      linear_regression:
        class: sklearn.linear_model.LinearRegression
        fit_kwargs:
          fit_intercept: true
      decision_tree:
        class: sklearn.tree.DecisionTreeRegressor
        fit_kwargs:
          criterion: squared_error
      quantile_regression:
        class: sklearn.linear_model.QuantileRegressor
        fit_kwargs:
          fit_intercept: true
          quantile: 0.5
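For reference, with this structure the dynamic nodes above resolve their inputs as follows (illustrative, derived from the snippets above):

    # e.g. for model == "decision_tree":
    #   model_type <- "params:models.decision_tree.class"
    #   fit_kwargs <- "params:models.decision_tree.fit_kwargs"
    #   outputs    -> "decision_tree.fitted" and "decision_tree.score"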
Nok Lam Chan (08/24/2023, 12:32 PM)

Lodewic van Twillert (08/24/2023, 12:35 PM)

Nok Lam Chan (08/24/2023, 12:35 PM)

Lodewic van Twillert (08/24/2023, 12:36 PM)

Nok Lam Chan (08/24/2023, 12:38 PM)

Marc Gris (08/24/2023, 1:07 PM):
from kedro.framework.session import get_current_session ?
Thx
M

Nok Lam Chan (08/24/2023, 1:40 PM):
Use after_context_created or after_catalog_created. What do you need to do? Do you just need params to create your pipeline?
Marc Gris (08/24/2023, 2:09 PM):

    def create_pipeline(**kwargs) -> Pipeline:
        all_models = ['model_a', 'model_b', 'model_c', ...]  # hard-coded
        all_pipelines = []
        for model in all_models:
            model_pipeline = pipeline([
                # MODEL INPUT PREP
                node(
                    func=functions.prepare_model_input,
                    inputs=["featurized_users",
                            "featurized_items",
                            "featurized_interactions",
                            f"params:models.{model}.name",
                            f"params:models.{model}.params.input_prep"],
                    outputs=f"{model}_input",
                    name=f"{model}_input_preparation_node",
                    tags=['model', 'input', model],
                ),
                # [ other nodes ... ]
            ])
            all_pipelines.append(model_pipeline)
        return sum(all_pipelines)
What would be an elegant / proper way to do this (avoiding the hardcoding, and ideally accessing "params:models" to iterate over)?
I do hear and understand Kedro’s stance for static / data-centric pipelines…
But one must reckon that if one has dozens of models, creating their pipelines manually would be both tedious and redundant / “boilerplaty”…
Thx 🙂
P.S: Maybe what was shared by Lodewic completely “fits my bill” and solves my problem. But I’m stuck at work on something and didn’t have time to dive into his suggestion.
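Combining Marc’s snippet with Lodewic’s hook approach above, the hard-coded list could be derived from the parameters instead; a minimal sketch (it assumes the create_pipeline(catalog) wiring from earlier, keeps Marc’s node as-is, and treats functions as his own module):

    from kedro.io import DataCatalog
    from kedro.pipeline import Pipeline, node, pipeline


    def create_pipeline(catalog: DataCatalog) -> Pipeline:
        # Iterate over the model labels defined under the "models" parameter
        # instead of hard-coding them.
        all_models = catalog.load("parameters")["models"]
        all_pipelines = []
        for model in all_models:
            all_pipelines.append(pipeline([
                node(
                    func=functions.prepare_model_input,
                    inputs=["featurized_users",
                            "featurized_items",
                            "featurized_interactions",
                            f"params:models.{model}.name",
                            f"params:models.{model}.params.input_prep"],
                    outputs=f"{model}_input",
                    name=f"{model}_input_preparation_node",
                    tags=["model", "input", model],
                ),
                # ... other nodes ...
            ]))
        # Explicit start value so sum() works regardless of __radd__ support.
        return sum(all_pipelines, pipeline([]))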
Nok Lam Chan (08/24/2023, 2:19 PM)

Lodewic van Twillert (08/24/2023, 2:21 PM)

Marc Gris (08/24/2023, 2:48 PM)

Nok Lam Chan (08/24/2023, 2:59 PM)

Lodewic van Twillert (08/24/2023, 3:08 PM)

Nok Lam Chan (08/24/2023, 3:11 PM)

Iñigo Hidalgo (08/25/2023, 8:58 AM):

    def after_context_created(self, context: KedroContext) -> None:
        context.catalog
Lodewic van Twillert (08/25/2023, 9:05 AM)

Iñigo Hidalgo (08/25/2023, 9:05 AM)

Lodewic van Twillert (08/25/2023, 9:18 AM):
Without that line, kedro run breaks - but kedro viz will be fine. Here is why:
When you call context.catalog, the after_catalog_created() hooks are triggered. Normally, the catalog seems to be created after register_pipelines() is required. So, when our register_pipelines() function has not been replaced yet, it will raise an error.
By calling context.catalog right after the context is created, we force after_catalog_created() to be triggered earlier than usual -> therefore overwriting the register_pipelines() function before it is required.
Here is the _get_catalog() method from kedro where you can see that the after_catalog_created() hook is triggered 👍
https://github.com/kedro-org/kedro/blob/main/kedro/framework/context/context.py#L255C5-L295C23
--- I think kedro viz also creates the catalog earlier than kedro run would, so for viz it is not required to trigger it manually.
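To make the ordering concrete, here is the effect of the hook, schematically (an illustration based on the explanation above, not Kedro’s actual internals):

    # Plain kedro run (schematic):
    #   1. after_context_created hooks fire
    #   2. register_pipelines() is resolved -> raises NotImplementedError
    #   3. catalog is created -> after_catalog_created hooks fire (too late)
    #
    # With ProjectHooks touching context.catalog:
    #   1. after_context_created fires and accesses context.catalog
    #   2. the catalog is created early -> after_catalog_created fires
    #   3. register_pipelines is replaced by register_dynamic_pipelines(catalog)
    #   4. register_pipelines() is resolved -> returns the dynamic pipelines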