Gabriel Aguiar
04/02/2025, 3:51 PM
My pipelines are organized under src/peloptmize/pipelines/, where the folder name corresponds to the desired namespace.
• Example:
    ◦ src/peloptmize/pipelines/data_processing/pipeline.py -> namespace: data_processing
    ◦ src/peloptmize/pipelines/data_science/pipeline.py -> namespace: data_science
• *Goal:* I want Kedro to dynamically infer the namespace of each pipeline based on the project's folder structure, without explicitly defining namespaces on nodes or pipelines.
• Also, I want to measure the execution time of each pipeline.
Hook Code:
To measure execution time and infer namespaces, I've implemented the following hook:
from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.framework.project import pipelines
from kedro.io import DataCatalog
import os
import time
import pandas as pd
from collections import defaultdict
from kedro.pipeline import Pipeline
from pathlib import Path

from peloptmize import pipeline_registry  # used in after_catalog_created below
class ProjectHooks:
    def __init__(self):
        self._pipeline_times = defaultdict(float)
        self._start_node_time = {}
        self._node_to_pipeline = {}
        self._printed = False

    @hook_impl
    def after_context_created(self, context: KedroContext) -> None:
        # ... (your databricks code) ...
        context.catalog  # accessing the property forces catalog creation

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, conf_catalog) -> None:
        pipeline_registry.register_pipelines = pipeline_registry.register_dynamic_pipelines(catalog)
        pipelines.configure("peloptmize.pipeline_registry")

    @hook_impl
    def before_pipeline_run(self, pipeline: Pipeline, run_params, catalog):
        filepath = pipeline.filepath
        path = Path(filepath)
        parts = path.parts
        if "pipelines" in parts:
            namespace_index = parts.index("pipelines") + 1
            if namespace_index < len(parts) - 1:
                namespace = parts[namespace_index]
            else:
                namespace = "default"
        else:
            namespace = "default"

        for node in pipeline.nodes:
            node_name = node.name
            self._node_to_pipeline[node_name] = namespace
            print(f"Node: {node_name}, Namespace: {namespace}")  # added logs

    @hook_impl
    def before_node_run(self, node, catalog, inputs):
        self._start_node_time[node.name] = time.time()

    @hook_impl
    def after_node_run(self, node, catalog, inputs, outputs):
        start_time = self._start_node_time.get(node.name)
        if start_time:
            duration = time.time() - start_time
            subpipeline_name = self._node_to_pipeline.get(node.name, "unknown")
            self._pipeline_times[subpipeline_name] += duration

    @hook_impl
    def after_pipeline_run(self, pipeline, run_params, catalog):
        if not self._printed:
            self._printed = True
            df = pd.DataFrame.from_dict(
                self._pipeline_times, orient="index", columns=["execution_time_seconds"]
            ).reset_index(names="subpipeline")
            df = df.sort_values("execution_time_seconds", ascending=False)
            print("\n" + "=" * 60)
            print("EXECUTION TIMES PER SUBPIPELINE (within __default__ or All)")
            print("=" * 60)
            print(df.to_string(index=False, float_format="%.2f"))
            print("=" * 60 + "\n")
Problems:
◦ *Namespace Issue (Without Explicit Namespaces):* When I do not explicitly define namespaces in my pipelines or nodes, execution times are aggregated under the name "no_namespace", indicating that nodes are not being correctly associated with their inferred namespaces.
◦ *Catalog Issue (With Namespaces):* However, when I do use namespaces in my pipelines, I encounter a "dataset not found" error when executing kedro run, even though the dataset is listed in my catalog.yml.
ValueError: Pipeline input(s) {'generate_constraints.constraints_US8',...
### generate_constraints in this case is the name of the namespace.
Questions:
• How can I resolve the "dataset not found" problem in catalog.yml when using namespaces?
• Are there more robust approaches to handling dynamic namespaces and time measurement in different environments?
• Any help or suggestions would be greatly appreciated!
kedro 0.19.5
kedro-datasets 3.0.1

Huong Nguyen
04/02/2025, 4:30 PM
I suspect the issue is in this part of your hook:
for node in pipeline.nodes:
    node_name = node.name
    self._node_to_pipeline[node_name] = namespace
    print(f"Node: {node_name}, Namespace: {namespace}")
Therefore it might have been grouped under "no_namespace".
Also, it's generally best to assign namespaces at the pipeline level rather than the node level to ensure consistency.
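A minimal sketch of a timing hook that follows this advice, assuming namespaces are assigned at the pipeline level so that Kedro's built-in node.namespace attribute is populated (it is None for nodes outside any namespace); the class name is illustrative and the timing logic mirrors the original hook:

import time
from collections import defaultdict

from kedro.framework.hooks import hook_impl


class NamespaceTimingHooks:
    """Aggregate node execution time per namespace using node.namespace."""

    def __init__(self):
        self._namespace_times = defaultdict(float)
        self._start_times = {}

    @hook_impl
    def before_node_run(self, node):
        self._start_times[node.name] = time.time()

    @hook_impl
    def after_node_run(self, node):
        start = self._start_times.pop(node.name, None)
        if start is not None:
            # node.namespace comes from pipeline(..., namespace=...); None otherwise
            namespace = node.namespace or "default"
            self._namespace_times[namespace] += time.time() - start

    @hook_impl
    def after_pipeline_run(self, run_params, pipeline, catalog):
        for namespace, seconds in sorted(
            self._namespace_times.items(), key=lambda kv: kv[1], reverse=True
        ):
            print(f"{namespace}: {seconds:.2f}s")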
Huong Nguyen
04/02/2025, 4:30 PM
When you add a namespace, Kedro renames the pipeline's datasets to namespace_name.dataset_name. This means you need to update your dataset names in your catalog.yml to reflect this change.
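For example, the constraints_US8 dataset from the error above would be keyed by its prefixed name inside the generate_constraints namespace (the type and filepath here are illustrative):

generate_constraints.constraints_US8:
  type: pandas.CSVDataset
  filepath: data/01_raw/constraints_US8.csv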
Gabriel Aguiar
04/02/2025, 4:39 PM

Huong Nguyen
04/02/2025, 4:43 PM

Huong Nguyen
04/02/2025, 4:46 PM
namespace="data_processing",
inputs={"companies", "shuttles", "reviews"}, # Inputs remain the same, without namespace prefix
outputs={"model_input_table"}, # Outputs remain the same, without namespace prefix
Huong Nguyen
04/02/2025, 4:46 PM
Otherwise it would become data_processing.model_input_table, for example.
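Putting these two messages together, a minimal sketch of the call the fragment comes from, Kedro's modular pipeline() helper; the node function and dataset names are borrowed from Kedro's spaceflights example and are illustrative:

from kedro.pipeline import node, pipeline


def create_model_input_table(companies, shuttles, reviews):
    """Illustrative stand-in for the real processing logic."""
    return companies.merge(shuttles, on="id").merge(reviews, on="id")


base = pipeline(
    [
        node(
            func=create_model_input_table,
            inputs=["companies", "shuttles", "reviews"],
            outputs="model_input_table",
        )
    ]
)

# Every dataset and node inside gets the "data_processing." prefix,
# except the datasets explicitly declared free in inputs/outputs below.
data_processing = pipeline(
    base,
    namespace="data_processing",
    inputs={"companies", "shuttles", "reviews"},  # stay un-prefixed
    outputs={"model_input_table"},  # stays un-prefixed
)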
Gabriel Aguiar
04/02/2025, 4:52 PM

Ankita Katiyar
04/03/2025, 8:43 AM