Gabriel Aguiar
04/10/2024, 6:00 PM
ValueError: Pipeline contains no nodes after applying all provided filters when running a specific pipeline
Hello everyone,
I'm currently working on a Kedro project and encountered an issue when attempting to run a specific pipeline using the command kedro run --pipeline=data_cleaning. The command results in the following error message: ValueError: Pipeline contains no nodes after applying all provided filters.

In my project, I've defined multiple pipelines, including data_processing, data_organization, and data_cleaning. Each pipeline is dynamically registered using a custom hook that replaces the register_pipelines() method based on the catalog's contents.

The data_organization pipeline is meant to contain several nodes, and I've verified the following:
- The pipeline is correctly constructed and returns a Pipeline instance with the expected nodes.
- The pipeline's name matches exactly between the command-line argument and the name used in the dynamic registration process within register_dynamic_pipelines.
- The hook for dynamic pipeline registration is correctly configured and should be overriding the default register_pipelines().

Despite these checks, running the data_organization pipeline results in the aforementioned error, suggesting that no nodes are available to run after applying the specified filters. This issue does not occur with the other pipelines in the project.
I'm seeking insights or suggestions on further troubleshooting steps. Could this be related to how the nodes are tagged or an issue with the pipeline registration process? Any advice on what might be causing this discrepancy and how to resolve it would be greatly appreciated.
Thank you for your help!
catalog.yml code:
# 02 Intermediate
"df_{name_us}_raw":
  type: pickle.PickleDataset
  filepath: data/02_intermediate/df_{name_us}_raw.joblib
  backend: joblib

# 03 Primary
"df_{name_us}_filtered":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_filtered.pq

"df_{name_us}_{base}_raw":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_{base}_raw.pq

# 04 Feature
"df_{name_us}_{base}_clean":
  type: pandas.ParquetDataset
  filepath: data/03_primary/df_{name_us}_{base}_clean.pq

# 05 Models Input
# 06 Models
# 07 Models Output
# 08 Reporting
Pipeline registry:
"""Project pipelines."""
from typing import Dict

from kedro.framework.project import find_pipelines
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline

from peloptmize.pipelines import data_processing, data_organization, data_cleaning


def register_pipelines() -> dict[str, Pipeline]:
    # """Register the project's pipelines.
    #
    # Returns:
    #     A mapping from pipeline names to ``Pipeline`` objects.
    # """
    # pipelines = find_pipelines()
    # pipelines["__default__"] = sum(pipelines.values())
    # return pipelines
    """Method that will be assigned to the callable returned by register_dynamic_pipelines(...), by a Hook."""
    raise NotImplementedError("""
        register_pipelines() is expected to be overwritten by ProjectHooks.
        Make sure the hooks are found in peloptimize/hooks and enabled in settings.py
    """)


def register_dynamic_pipelines(catalog: DataCatalog) -> dict[str, Pipeline]:
    """Register the project's pipelines depending on the catalog.

    Create pipelines dynamically based on parameters and datasets defined in the catalog.
    The function must return a callable without any arguments that will replace the
    `register_pipelines()` method in this same module, using an `after_catalog_created` hook.

    Args:
        catalog: The DataCatalog loaded from the KedroContext.

    Returns:
        A callable that returns a mapping from pipeline names to ``Pipeline`` objects.
    """
    # create pipelines with access to the catalog
    data_processing_pp = data_processing.create_pipeline(catalog=catalog)
    data_organization_pp = data_organization.create_pipeline(catalog=catalog)
    data_cleaning_pp = data_cleaning.create_pipeline(catalog=catalog)

    def register_pipelines():
        """Register the project's pipelines.

        Returns:
            A mapping from pipeline names to ``Pipeline`` objects.
        """
        pipelines = {
            "data_processing": data_processing_pp,
            "data_organization": data_organization_pp,
            "data_cleaning": data_cleaning_pp,
        }
        pipelines["__default__"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        pipelines["All"] = data_processing_pp + data_organization_pp + data_cleaning_pp
        return pipelines

    return register_pipelines
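The hook that performs this swap isn't shown in the thread. For context, here is a minimal sketch of the usual pattern; the hooks module location, class name, and registry module path are assumptions, not the author's actual code:

# hooks.py -- a minimal sketch, names assumed from the docstrings above
from kedro.framework.hooks import hook_impl
from kedro.framework.project import pipelines
from kedro.io import DataCatalog

from peloptmize import pipeline_registry


class ProjectHooks:
    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        # Replace the NotImplementedError placeholder with the closure that
        # has access to the catalog, then make Kedro re-read the registry.
        pipeline_registry.register_pipelines = (
            pipeline_registry.register_dynamic_pipelines(catalog)
        )
        pipelines.configure("peloptmize.pipeline_registry")

For this to take effect, ProjectHooks must be enabled via HOOKS in settings.py; anything that reads the pipeline registry before the catalog is created will still hit the NotImplementedError placeholder.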
Data_organization pipeline:
from typing import Dict, List

import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import split_df_to_base


def create_pipeline(**kwargs) -> Pipeline:
    catalog = kwargs["catalog"]
    params = catalog.datasets.parameters.load()
    nodes = []
    us_to_solve = params.get("us_to_solve", [])
    for name_us in us_to_solve:
        base_params = params.get(f"tags_{name_us}.", {})
        for param_name, columns in base_params.items():
            _, base = param_name.split(".", 1)
            nodes.extend([
                node(
                    func=split_df_to_base,
                    inputs=[f"df_{name_us}_filtered", f"params:{param_name}"],
                    outputs=f"model_{name_us}_{base}_raw",
                    name=f"split_{name_us}_{base}",
                ),
            ])
    return pipeline(nodes)
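Note that if us_to_solve is empty, or the tags_{name_us}. lookup (the key ends with a literal dot) returns an empty dict, the loop body never executes and create_pipeline returns a pipeline with zero nodes, which is exactly what the error reports. A quick sanity check, as a sketch to run in kedro ipython, where catalog is already loaded:

# Sketch: probe each stage of the dynamic node generation.
from peloptmize.pipelines import data_organization

params = catalog.datasets.parameters.load()
print(params.get("us_to_solve", []))  # an empty list means no nodes at all
for name_us in params.get("us_to_solve", []):
    # an empty dict here means no nodes are generated for this name_us
    print(name_us, params.get(f"tags_{name_us}.", {}))

pp = data_organization.create_pipeline(catalog=catalog)
print(len(pp.nodes))  # 0 reproduces the ValueError on filtering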
Ankita Katiyar
04/10/2024, 6:08 PM
pandas

Ankita Katiyar
04/10/2024, 6:12 PM

Gabriel Aguiar
04/10/2024, 6:14 PM
(peloptimize) C:\Dev\kedro_pelopt\sentinela-palletizing\peloptmize>kedro run --pipeline=data_cleaning
[04/10/24 15:14:11] INFO Kedro project peloptmize session.py:321
Traceback (most recent call last):
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\runpy.py", line 86, in _run_code
exec(code, run_globals)
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\Scripts\kedro.exe\__main__.py", line 7, in <module>
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\cli.py", line 198, in main
cli_collection()
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1157, in __call__
return self.main(*args, **kwargs)
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\cli.py", line 127, in main
super().main(
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1078, in main
rv = self.invoke(ctx)
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1688, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 1434, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\click\core.py", line 783, in invoke
return __callback(*args, **kwargs)
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\cli\project.py", line 225, in run
session.run(
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\framework\session\session.py", line 346, in run
filtered_pipeline = pipeline.filter(
File "C:\Users\gabriel.gomes\AppData\Local\anaconda3\envs\peloptimize\lib\site-packages\kedro\pipeline\pipeline.py", line 768, in filter
raise ValueError(
ValueError: Pipeline contains no nodes after applying all provided filters
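The traceback shows the failure comes from Pipeline.filter() inside session.run(), meaning the pipeline registered under the requested name is already empty before any node filters are applied. One way to inspect what actually got registered, as a sketch assuming Kedro ~0.19 and the dynamic-hook setup above:

# Sketch: list registered pipelines and their node counts.
from pathlib import Path

from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())
with KedroSession.create() as session:
    # Accessing the catalog fires after_catalog_created, which swaps in
    # the dynamic registry before the pipelines mapping is read.
    _ = session.load_context().catalog
    for name, pipe in pipelines.items():
        print(name, len(pipe.nodes))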
Ankita Katiyar
04/10/2024, 6:20 PM

Gabriel Aguiar
04/10/2024, 6:21 PM

Ankita Katiyar
04/10/2024, 6:22 PM

Nok Lam Chan
04/10/2024, 6:39 PM

Nok Lam Chan
04/10/2024, 6:40 PM

Gabriel Aguiar
04/10/2024, 6:43 PM

Shubham Agrawal
06/27/2024, 11:49 AM