# questions
g
Troubleshooting `ValueError: Pipeline contains no nodes after applying all provided filters` When Running a Specific Pipeline
Hello everyone, I'm currently working on a Kedro project and encountered an issue when attempting to run a specific pipeline using the command `kedro run --pipeline=data_cleaning`. The command results in the following error message: `ValueError: Pipeline contains no nodes after applying all provided filters`.

In my project, I've defined multiple pipelines including `data_processing`, `data_organization`, and `data_cleaning`. Each pipeline is dynamically registered using a custom hook that replaces the `register_pipelines()` method based on the catalog's content. The `data_organization` pipeline is meant to contain several nodes, and I've verified the following:
- The pipeline is correctly constructed and returns a `Pipeline` instance with the expected nodes.
- The pipeline's name matches exactly between the command line argument and the name used in the dynamic registration process within `register_dynamic_pipelines`.
- The hook for dynamic pipeline registration is correctly configured and should be overriding the default `register_pipelines()`.

Despite these checks, running the `data_organization` pipeline results in the aforementioned error, suggesting that no nodes are available to run after applying the specified filters. This issue does not occur with other pipelines in the project.

I'm seeking insights or suggestions on further troubleshooting steps. Could this be related to how the nodes are tagged or an issue with the pipeline registration process? Any advice on what might be causing this discrepancy and how to resolve it would be greatly appreciated. Thank you for your help!

catalog.yml code:
# 02 Intermediate
"df_{name_us}_raw":
  type: pickle.PickleDataset
  filepath: data/02_intermediate/df_{name_us}_raw.joblib
  backend: joblib

# 03 Primary
"df_{name_us}_filtered":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_filtered.pq

"df_{name_us}_{base}_raw":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_{base}_raw.pq

# 04 Feature
"df_{name_us}_{base}_clean":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_{base}_clean.pq
  
# 05 Models Input

# 06 Models

# 07 Models Output

# 08 Reporting
Pipeline registry:
"""Project pipelines."""
from typing import Dict
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline
from kedro.io import DataCatalog
from peloptmize.pipelines import data_processing, data_organization, data_cleaning

def register_pipelines() -> dict[str, Pipeline]:
    # """Register the project's pipelines.

    # Returns:
    #     A mapping from pipeline names to ``Pipeline`` objects.
    # """
    # pipelines = find_pipelines()
    # pipelines["__default__"] = sum(pipelines.values())
    # return pipelines

    """Method that will be assigned to the callable returned by register_dynamic_pipelines(...), by a Hook."""
    raise NotImplementedError("""
        register_pipelines() is expected to be overwritten by ProjectHooks.
        Make sure the hooks is found in peloptimize/hooks and enabled in settings.py
        """)

def register_dynamic_pipelines(catalog: DataCatalog) -> dict[str, Pipeline]:
    """Register the project's pipelines depending on the catalog.

    Create pipelines dynamically based on parameters and datasets defined in the catalog.
    The function must return a callable without any arguments that will replace the
    `register_pipelines()` method in this same module, using an `after_catalog_created_hook`.

    Args:
        catalog: The DataCatalog loaded from the KedroContext.

    Returns:
        A callable that returns a mapping from pipeline names to ``Pipeline`` objects.
    """
    # create pipelines with access to catalog

    data_processing_pp = data_processing.create_pipeline(catalog = catalog)
    data_organization_pp = data_organization.create_pipeline(catalog = catalog)
    data_cleaning_pp = data_cleaning.create_pipeline(catalog = catalog)

    def register_pipelines():
        """Register the project's pipelines.

        Returns:
            A mapping from pipeline names to ``Pipeline`` objects.
        """
        pipelines = {
            "data_processing": data_processing_pp,
            "data_organization": data_organization_pp,
            "data_cleaning": data_cleaning_pp,
        }
        pipelines["__default__"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        pipelines["All"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        return pipelines
    
    return register_pipelines
Data_organization pipeline:
from kedro.pipeline import Pipeline, node, pipeline
from typing import Dict, List
import pandas as pd

from .nodes import (
    split_df_to_base
    )


def create_pipeline(**kwargs) -> Pipeline:
    catalog = kwargs['catalog']
    params = catalog.datasets.parameters.load()
    
    nodes = []
    us_to_solve = params.get("us_to_solve", [])

    for name_us in us_to_solve:
        base_params = params.get(f"tags_{name_us}.", {})
        for param_name, columns in base_params.items():
            _, base = param_name.split('.', 1) 
            
            nodes.extend([
                node(
                    func=split_df_to_base,
                    inputs=[f"df_{name_us}_filtered", f"params:{param_name}"],
                    outputs=f"model_{name_us}_{base}_raw",
                    name=f"split_{name_us}_{base}"
                ),
            ])

    return pipeline(nodes)
a
Hello, sometimes this error shows up due to missing packages. Are all the requirements properly installed in your environment (e.g. `pandas`)?
The tracebacks are quite long and the real error message tends to get a bit lost in the noise, so it would be helpful if you could share your stacktrace. (related: https://github.com/kedro-org/kedro/issues/2401)
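One quick way to act on the missing-packages suggestion is to check, from inside the project's environment, whether the modules the pipelines import can be found at all. This is a minimal sketch using only the standard library; the `required` list is a hypothetical example, not taken from the project's actual requirements, and should be replaced with the top-level modules your own nodes import.

```python
from importlib.util import find_spec


def missing_packages(names: list[str]) -> list[str]:
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if find_spec(name) is None]


# Hypothetical list: replace with the top-level modules your pipelines import.
required = ["pandas", "joblib", "pyarrow"]
print(missing_packages(required))
```

An empty list means every listed module is importable from the active environment. Anything printed is worth installing before re-running `kedro run`.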
g
Thanks for the answer @Ankita Katiyar 🙂
(peloptimize) C:\Dev\kedro_pelopt\sentinela-palletizing\peloptmize>kedro run --pipeline=data_cleaning
[04/10/24 15:14:11] INFO     Kedro project peloptmize                                                                                                                                              session.py:321
Traceback (most recent call last):
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\Scripts\kedro.exe\__main__.py", line 7, in <module>
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\cli.py", line 198, in main
    cli_collection()
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\cli.py", line 127, in main
    super().main(
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\project.py", line 225, in run
    session.run(
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\session\session.py", line 346, in run
    filtered_pipeline = pipeline.filter(
  File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\pipeline\pipeline.py", line 768, in filter
    raise ValueError(
ValueError: Pipeline contains no nodes after applying all provided filters
a
Do you have/can you post the project to github so I can try it out? 😄
g
I have some private data in there, so I will open a repo with some changes, okay? @Ankita Katiyar 😄
a
Yes ofc, just a minimal example would help!
n
What is `kwargs['catalog']`? How are you passing this into the function?
Can you put a breakpoint there to check whether your code actually enters the loop, or whether `nodes` is just an empty list?
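As an alternative to a breakpoint, the loop from `create_pipeline` can be replayed on a plain dictionary to see which node names it would produce. The sketch below is only an illustration: it assumes the parameters behave like a regular `dict`, and the keys `us8` and `base_a` are hypothetical. It shows one way a loop of this shape can silently yield no nodes (a lookup key that does not match exactly); it does not claim that this is the cause in the original project.

```python
from typing import Any


def planned_node_names(params: dict[str, Any]) -> list[str]:
    """Mirror the loop in create_pipeline, collecting node names only."""
    names = []
    for name_us in params.get("us_to_solve", []):
        # Same lookup as the pipeline code, including the trailing dot in the key.
        base_params = params.get(f"tags_{name_us}.", {})
        for param_name in base_params:
            _, base = param_name.split(".", 1)
            names.append(f"split_{name_us}_{base}")
    return names


# Hypothetical parameters: the key is "tags_us8", without a trailing dot,
# so the lookup falls back to {} and the inner loop never runs.
mismatched_params = {
    "us_to_solve": ["us8"],
    "tags_us8": {"tags_us8.base_a": ["col_1", "col_2"]},
}

# Hypothetical parameters whose key matches the lookup exactly.
matching_params = {
    "us_to_solve": ["us8"],
    "tags_us8.": {"tags_us8.base_a": ["col_1", "col_2"]},
}

print(planned_node_names(mismatched_params))
print(planned_node_names(matching_params))
```

If the first call's result is what your real parameters produce, the pipeline is built from an empty list, which would be consistent with the "no nodes" error.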
g
I think I already found the reason @Nok Lam Chan
s
@Gabriel Aguiar Could you share what the reason was? I am having a similar problem 😅 Edit: Figured out it was due to a missing library installation