Gabriel Aguiar
04/10/2024, 6:00 PM
ValueError: Pipeline contains no nodes after applying all provided filters when running a specific pipeline
Hello everyone,
I'm currently working on a Kedro project and ran into an issue when trying to run a specific pipeline with kedro run --pipeline=data_cleaning. The command fails with the following error message:
ValueError: Pipeline contains no nodes after applying all provided filters
In my project, I've defined multiple pipelines including data_processing, data_organization, and data_cleaning. Each pipeline is dynamically registered using a custom hook that replaces the register_pipelines() method based on the catalog's content.
The data_organization pipeline is meant to contain several nodes, and I've verified the following:
- The pipeline is correctly constructed and returns a Pipeline instance with the expected nodes.
- The pipeline's name matches exactly between the command line argument and the name used in the dynamic registration process within register_dynamic_pipelines.
- The hook for dynamic pipeline registration is correctly configured and should be overriding the default register_pipelines().
Despite these checks, running the data_organization pipeline results in the aforementioned error, suggesting that no nodes are available to run after applying the specified filters. This issue does not occur with other pipelines in the project.
I'm seeking insights or suggestions on further troubleshooting steps. Could this be related to how the nodes are tagged or an issue with the pipeline registration process? Any advice on what might be causing this discrepancy and how to resolve it would be greatly appreciated.
Thank you for your help!
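For reference, a quick way to check what actually got registered (assuming Kedro 0.18+, where the registry CLI is available):
kedro registry list                     # names of all registered pipelines
kedro registry describe data_cleaning   # nodes inside that pipeline
If describe shows an empty node list, the pipeline was registered but built with zero nodes, which would produce exactly this error.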
catalog.yml code:
# 02 Intermediate
"df_{name_us}_raw":
  type: pickle.PickleDataset
  filepath: data/02_intermediate/df_{name_us}_raw.joblib
  backend: joblib
# 03 Primary
"df_{name_us}_filtered":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_filtered.pq
"df_{name_us}_{base}_raw":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_{base}_raw.pq
# 04 Feature
"df_{name_us}_{base}_clean":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_{base}_clean.pq
  
# 05 Models Input
# 06 Models
# 07 Models Output
# 08 Reporting
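For context, the quoted entries above are dataset factory patterns (available since Kedro 0.18.12): the {name_us} and {base} placeholders are filled in from the concrete dataset names that nodes use. For a hypothetical name_us of sales, a node input df_sales_raw would resolve as if the catalog contained:
"df_sales_raw":
  type: pickle.PickleDataset
  filepath: data/02_intermediate/df_sales_raw.joblib
  backend: joblib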
Pipeline registry:
"""Project pipelines."""
from typing import Dict
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline
from kedro.io import DataCatalog
from peloptmize.pipelines import data_processing, data_organization, data_cleaning
def register_pipelines() -> dict[str, Pipeline]:
    # """Register the project's pipelines.
    # Returns:
    #     A mapping from pipeline names to ``Pipeline`` objects.
    # """
    # pipelines = find_pipelines()
    # pipelines["__default__"] = sum(pipelines.values())
    # return pipelines
    """Method that will be assigned to the callable returned by register_dynamic_pipelines(...), by a Hook."""
    raise NotImplementedError("""
        register_pipelines() is expected to be overwritten by ProjectHooks.
        Make sure the hook is found in peloptimize/hooks and enabled in settings.py
        """)
def register_dynamic_pipelines(catalog: DataCatalog) -> dict[str, Pipeline]:
    """Register the project's pipelines depending on the catalog.
    Create pipelines dynamically based on parameters and datasets defined in the catalog.
    The function must return a callable without any arguments that will replace the
    `register_pipelines()` method in this same module, via an `after_catalog_created` hook.
    Args:
        catalog: The DataCatalog loaded from the KedroContext.
    Returns:
        A callable that returns a mapping from pipeline names to ``Pipeline`` objects.
    """
    # create pipelines with access to catalog
    data_processing_pp = data_processing.create_pipeline(catalog=catalog)
    data_organization_pp = data_organization.create_pipeline(catalog=catalog)
    data_cleaning_pp = data_cleaning.create_pipeline(catalog=catalog)
    def register_pipelines():
        """Register the project's pipelines.
        Returns:
            A mapping from pipeline names to ``Pipeline`` objects.
        """
        pipelines = {
            "data_processing": data_processing_pp,
            "data_organization": data_organization_pp,
            "data_cleaning": data_cleaning_pp,
        }
        pipelines["__default__"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        pipelines["All"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        return pipelines
    
    return register_pipelines
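The hook that performs the swap is not shown in the thread; here is a minimal sketch of what it presumably looks like, following the after_catalog_created approach the docstrings describe (the class name and module path are assumptions, not the poster's actual code):
# hooks.py -- illustrative sketch only
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog

class ProjectHooks:
    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        import peloptmize.pipeline_registry as registry

        # Swap the NotImplementedError placeholder for the catalog-aware
        # callable returned by register_dynamic_pipelines().
        registry.register_pipelines = registry.register_dynamic_pipelines(catalog)
For the swap to take effect, the hook must be listed in HOOKS in settings.py and must fire before Kedro first evaluates the pipeline registry.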
"""Project pipelines."""
from typing import Dict
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline
from <http://kedro.io|kedro.io> import DataCatalog
from peloptmize.pipelines import data_processing, data_organization, data_cleaning
def register_pipelines() -> dict[str, Pipeline]:
    # """Register the project's pipelines.
    # Returns:
    #     A mapping from pipeline names to ``Pipeline`` objects.
    # """
    # pipelines = find_pipelines()
    # pipelines["__default__"] = sum(pipelines.values())
    # return pipelines
    """Method that will be assigned to the callable returned by register_dynamic_pipelines(...), by a Hook."""
    raise NotImplementedError("""
        register_pipelines() is expected to be overwritten by ProjectHooks.
        Make sure the hooks is found in peloptimize/hooks and enabled in settings.py
        """)
def register_dynamic_pipelines(catalog: DataCatalog) -> dict[str, Pipeline]:
    """Register the project's pipelines depending on the catalog.
    Create pipelines dynamically based on parameters and datasets defined in the catalog.
    The function must return a callable without any arguments that will replace the
    `register_pipelines()` method in this same module, using an `after_catalog_created_hook`.
    Args:
        catalog: The DataCatalog loaded from the KedroContext.
    Returns:
        A callable that returns a mapping from pipeline names to ``Pipeline`` objects.
    """
    # create pipelines with access to catalog
    data_processing_pp = data_processing.create_pipeline(catalog = catalog)
    data_organization_pp = data_organization.create_pipeline(catalog = catalog)
    data_cleaning_pp = data_cleaning.create_pipeline(catalog = catalog)
    def register_pipelines():
        """Register the project's pipelines.
        Returns:
            A mapping from pipeline names to ``Pipeline`` objects.
        """
        pipelines = {
            "data_processing": data_processing_pp,
            "data_organization": data_organization_pp,
            "data_cleaning": data_cleaning_pp,
        }
        pipelines["__default__"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        pipelines["All"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        return pipelines
    
    return register_pipelines
Data_organization pipeline:
from kedro.pipeline import Pipeline, node, pipeline
from typing import Dict, List
import pandas as pd
from .nodes import split_df_to_base
def create_pipeline(**kwargs) -> Pipeline:
    catalog = kwargs['catalog']
    params = catalog.datasets.parameters.load()
    
    nodes = []
    us_to_solve = params.get("us_to_solve", [])
    for name_us in us_to_solve:
        base_params = params.get(f"tags_{name_us}.", {})
        for param_name, columns in base_params.items():
            _, base = param_name.split('.', 1) 
            
            nodes.extend([
                node(
                    func=split_df_to_base,
                    inputs=[f"df_{name_us}_filtered", f"params:{param_name}"],
                    outputs=f"model_{name_us}_{base}_raw",
                    name=f"split_{name_us}_{base}"
                ),
            ])
    return pipeline(nodes)
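Independently of the hook, the dynamic registry can be exercised by hand to see which pipelines come out empty; a hypothetical check from a kedro ipython session (where catalog is already loaded):
# Build the registry directly and count the nodes in each pipeline.
from peloptmize.pipeline_registry import register_dynamic_pipelines

pipes = register_dynamic_pipelines(catalog)()
for name, pipe in pipes.items():
    print(name, len(pipe.nodes))
Note that if us_to_solve or the tags_* parameters resolve to empty collections, the loop in create_pipeline() builds no nodes and the pipeline is registered empty.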
Ankita Katiyar
04/10/2024, 6:08 PM
pandas)
Ankita Katiyar
04/10/2024, 6:12 PM
Gabriel Aguiar
04/10/2024, 6:14 PM
(peloptimize) C:\Dev\kedro_pelopt\sentinela-palletizing\peloptmize>kedro run --pipeline=data_cleaning
[04/10/24 15:14:11] INFO     Kedro project peloptmize                                                                                                                                              session.py:321
Traceback (most recent call last):
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\Scripts\kedro.exe\__main__.py", line 7, in <module>
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\cli.py", line 198, in main
    cli_collection()
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\cli.py", line 127, in main
    super().main(
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\project.py", line 225, in run
    session.run(
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\session\session.py", line 346, in run
    filtered_pipeline = pipeline.filter(
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\pipeline\pipeline.py", line 768, in filter
    raise ValueError(
ValueError: Pipeline contains no nodes after applying all provided filters
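Worth noting: the traceback shows the error raised by Pipeline.filter() inside session.run(), and no tag or node filters were passed on the command line, so the registered data_cleaning pipeline most likely contained zero nodes to begin with. A minimal sketch that reproduces the message (assuming filter()'s empty-pipeline check is unconditional, as the traceback suggests):
from kedro.pipeline import Pipeline

# Filtering an empty pipeline raises the same ValueError,
# even when no filters are applied.
Pipeline([]).filter()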
Ankita Katiyar
04/10/2024, 6:20 PM
Gabriel Aguiar
04/10/2024, 6:21 PM
Ankita Katiyar
04/10/2024, 6:22 PM
Nok Lam Chan
04/10/2024, 6:39 PM
Nok Lam Chan
04/10/2024, 6:40 PM
Gabriel Aguiar
04/10/2024, 6:43 PM
Shubham Agrawal
06/27/2024, 11:49 AM