# questions
g
Hello Kedro community, I'm encountering issues while trying to measure the execution time of my pipelines and manage namespaces dynamically. I'm facing two distinct problems depending on whether or not I use namespaces.
Context:
◦ *Project Structure:* My pipelines are organized into folders within src/peloptmize/pipelines/, where the folder name corresponds to the desired namespace.
• Example:
▪︎ src/peloptmize/pipelines/data_processing/pipeline.py -> namespace: data_processing
▪︎ src/peloptmize/pipelines/data_science/pipeline.py -> namespace: data_science
◦ *Goal:* I want Kedro to dynamically infer the namespace of each pipeline based on the project's folder structure, without explicitly defining namespaces in nodes or pipelines. I also want to measure the execution time of each pipeline.
Hook Code: To measure execution time and infer namespaces, I've implemented the following hook:
Copy code
import os
import time
from collections import defaultdict
from pathlib import Path

import pandas as pd

from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.framework.project import pipelines
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline

from peloptmize import pipeline_registry  # provides register_dynamic_pipelines (used below)

class ProjectHooks:
    def __init__(self):
        self._pipeline_times = defaultdict(float)
        self._start_node_time = {}
        self._node_to_pipeline = {}
        self._printed = False

    @hook_impl
    def after_context_created(self, context: KedroContext) -> None:
        # ... (your databricks code) ...
        context.catalog  # touching the catalog forces its creation, so after_catalog_created fires

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, conf_catalog) -> None:
        # Swap in a catalog-aware register_pipelines and re-point the
        # pipeline registry at it so dynamically built pipelines are used.
        pipeline_registry.register_pipelines = pipeline_registry.register_dynamic_pipelines(catalog)
        pipelines.configure("peloptmize.pipeline_registry")

    @hook_impl
    def before_pipeline_run(self, pipeline: Pipeline, run_params, catalog):
        # Try to infer the namespace from the pipeline's source file location
        filepath = pipeline.filepath
        path = Path(filepath)
        parts = path.parts
        if "pipelines" in parts:
            namespace_index = parts.index("pipelines") + 1
            if namespace_index < len(parts) - 1:
                namespace = parts[namespace_index]
            else:
                namespace = "default"
        else:
            namespace = "default"

        for node in pipeline.nodes:
            node_name = node.name
            self._node_to_pipeline[node_name] = namespace
            print(f"Node: {node_name}, Namespace: {namespace}") # Added logs

    @hook_impl
    def before_node_run(self, node, catalog, inputs):
        self._start_node_time[node.name] = time.time()

    @hook_impl
    def after_node_run(self, node, catalog, inputs, outputs):
        start_time = self._start_node_time.get(node.name)
        if start_time is not None:
            duration = time.time() - start_time
            subpipeline_name = self._node_to_pipeline.get(node.name, "unknown")
            self._pipeline_times[subpipeline_name] += duration

    @hook_impl
    def after_pipeline_run(self, pipeline, run_params, catalog):
        if not self._printed:
            self._printed = True
            df = pd.DataFrame.from_dict(
                self._pipeline_times, orient="index", columns=["execution_time_seconds"]
            ).reset_index(names="subpipeline")
            df = df.sort_values("execution_time_seconds", ascending=False)

            print("\n" + "=" * 60)
            print("TEMPOS DE EXECUÇÃO POR SUBPIPELINE (dentro de __default__ ou All)")
            print("=" * 60)
            print(df.to_string(index=False, float_format="%.2f"))
            print("=" * 60 + "\n")
Problems:
◦ *Namespace Issue (Without Explicit Namespaces):* When I do not explicitly define namespaces in my pipelines or nodes, execution times are aggregated under the name "no_namespace", indicating that nodes are not being correctly associated with their inferred namespaces.
◦ *Catalog Issue (With Namespaces):* However, when I do use namespaces in my pipelines, I encounter a "dataset not found" error when executing kedro run, even though the dataset is listed in my catalog.yml.
Copy code
ValueError: Pipeline input(s) {'generate_constraints.constraints_US8',...
### generate_constraints in this case is the name of the namespace.
Questions:
• How can I resolve the "dataset not found" problem in catalog.yml when using namespaces?
• Are there more robust approaches to handling dynamic namespaces and time measurement in different environments?
Any help or suggestions would be greatly appreciated!
kedro 0.19.5
kedro-datasets 3.0.1
h
hey @Gabriel Aguiar,
1. Regarding the Namespace Issue (Without Explicit Namespaces)
Looking at your code snippet, I can't really see where you actually assign the namespace; it seems like you're just creating an empty object:
Copy code
for node in pipeline.nodes:
    node_name = node.name
    self._node_to_pipeline[node_name] = namespace
    print(f"Node: {node_name}, Namespace: {namespace}")
Therefore everything might have been grouped under "no_namespace".
Also, it's generally best to assign namespaces at the pipeline level rather than at the node level to ensure consistency; see the sketch below.
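For example, the usual pattern is to wrap the whole modular pipeline once (a minimal sketch; create_pipeline stands in for your own pipeline factory):
Copy code
from kedro.pipeline import pipeline

from peloptmize.pipelines.data_processing import create_pipeline  # assumed factory

# Wrapping once at the pipeline level prefixes every node, input and
# output inside it with "data_processing."
data_processing = pipeline(create_pipeline(), namespace="data_processing")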
2. Catalog Issue (With Namespaces)
When you namespace your pipeline, Kedro prefixes all of its inputs, outputs, and datasets, for example namespace_name.dataset_name. This means you need to update the dataset names in your catalog.yml to reflect this change.
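Taking the constraints_US8 dataset from your error message as an example, the catalog entry would need the namespace prefix (the dataset type and filepath here are illustrative assumptions):
Copy code
generate_constraints.constraints_US8:
  type: pandas.CSVDataset
  filepath: data/01_raw/constraints_US8.csv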
g
@Huong Nguyen Is there any way to avoid having to update all dataset names in the catalog when using namespaces? I have a large number of datasets across two catalogs — one for production and another for testing — so manually editing them would be very inefficient.
h
Currently, I'm not aware of a way to avoid updating the catalog, but let me double check with my colleagues.
❤️ 1
ah actually you can explicitly override this in the namespaced pipeline's inputs and outputs, so you don't need to modify your catalog. Here's an example (base_pipeline is a placeholder for your unnamespaced pipeline):
Copy code
namespace="data_processing",
inputs={"companies", "shuttles", "reviews"},  # Inputs remain the same, without namespace prefix
outputs={"model_input_table"},  # Outputs remain the same, without namespace prefix
🥳 1
if you don't explicitly define it like the above, the dataset will be data_processing.model_input_table, for example
❤️ 1
g
I get it, thanks 😄
np 1
a
Hey, you could also use dataset factories to template namespaced datasets if you don't want to explicitly specify the inputs and outputs mapping in each pipeline: https://docs.kedro.org/en/stable/data/kedro_dataset_factories.html A sketch follows below.
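A single factory pattern in catalog.yml resolves every namespaced variant of a dataset (a minimal sketch; the dataset name, type, and filepath are illustrative assumptions):
Copy code
"{namespace}.model_input_table":
  type: pandas.ParquetDataset
  filepath: data/03_primary/{namespace}_model_input_table.parquet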
thankyou 1