Viktoriia
03/24/2025, 8:41 AM
kedro run command when creating a pipeline, i.e. within the function def create_pipeline(**kwargs)? I'm most interested in conf-source and env.
Hall
03/24/2025, 8:42 AM
datajoely
03/24/2025, 9:21 AM
datajoely
03/24/2025, 9:22 AM
datajoely
03/24/2025, 9:23 AM
before_pipeline_run / after_context_created to retrieve the env.
I'm not sure we expose the --conf-source though, that's something that is consumed by settings.py before kedro really starts up
Viktoriia
03/24/2025, 10:36 AM
env somehow further into the pipeline?
Viktoriia
03/24/2025, 10:49 AM
--env and --conf-source to access my parameters via OmegaConfigLoader like this:
def create_pipeline(**kwargs) -> Pipeline:
    conf_path = str(project_path / settings.CONF_SOURCE)
    conf_loader = OmegaConfigLoader(conf_source=conf_path, env=env)
The problem is that settings.CONF_SOURCE doesn't get updated with --conf-source, so I don't know where my config lies
datajoely
03/24/2025, 12:10 PM
Viktoriia
03/24/2025, 12:54 PM
kedro run --env dev --conf-source my_conf_path
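One possible workaround for reading these two options at pipeline-creation time is to look at the command line directly. The sketch below is not a Kedro API and is not something proposed in the thread; `extract_run_options` is a hypothetical helper that only picks `--env` and `--conf-source` out of an argument list shaped like the command above.

```python
import argparse
from typing import List, Optional, Tuple


def extract_run_options(argv: List[str]) -> Tuple[Optional[str], Optional[str]]:
    """Pick --env and --conf-source out of an argument list, ignoring the rest."""
    # allow_abbrev=False so that unrelated options are never matched by prefix.
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--env", default=None)
    parser.add_argument("--conf-source", dest="conf_source", default=None)
    # parse_known_args leaves unknown arguments (such as "run") untouched.
    known, _unknown = parser.parse_known_args(argv)
    return known.env, known.conf_source


# Argument list equivalent to: kedro run --env dev --conf-source my_conf_path
env, conf_source = extract_run_options(
    ["run", "--env", "dev", "--conf-source", "my_conf_path"]
)
print(env, conf_source)
```

Inside create_pipeline this would be called with `sys.argv[1:]`. That only helps when the pipeline is built from a kedro run command line, so it is an assumption that does not hold for notebooks or programmatic sessions.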
datajoely
03/24/2025, 1:26 PM
datajoely
03/24/2025, 1:26 PM
Viktoriia
03/24/2025, 1:50 PM
# settings.py
CONF_SOURCE = "CONF_SOURCE_DEFINED_IN_SETTINGS"
# pipeline.py
def create_pipeline(**kwargs) -> Pipeline:
    conf_path = str(settings.CONF_SOURCE)
    print("Config path", conf_path)
    conf_loader = OmegaConfigLoader(conf_source=conf_path, env='local')
    print("Config parameters", conf_loader['parameters'])
    return Pipeline([])
Traceback:
$ kedro run --env dev --conf-source conf
[03/24/25 14:49:30] INFO Using 'C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\rich_logging.yml' as logging configuration. __init__.py:270
Config path CONF_SOURCE_DEFINED_IN_SETTINGS
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Scripts\kedro-script.py:9 in <module> │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\cli\cli.py:263 │
│ in main │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1161 in __call__ │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\cli\cli.py:163 │
│ in main │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1082 in main │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1697 in invoke │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1443 in invoke │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:788 in invoke │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\cli\project.py │
│ :228 in run │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\session\sessio │
│ n.py:346 in run │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:166 in inner │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:211 in _load_data │
│ │
│ C:\Users\vila3\Projects\Restwert-Kompetenz\src\rwk_analytics\pipeline_registry.py:13 in │
│ register_pipelines │
│ │
│ 10 │ Returns: │
│ 11 │ │ A mapping from pipeline names to ``Pipeline`` objects. │
│ 12 │ """ │
│ ❱ 13 │ pipelines = find_pipelines() │
│ 14 │ pipelines["__default__"] = sum(pipelines.values()) │
│ 15 │ return pipelines │
│ 16 │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:457 in find_pipelines │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:354 in _create_pipeline │
│ │
│ C:\Users\vila3\Projects\Restwert-Kompetenz\src\rwk_analytics\pipelines\preprocess\pipeline.py:20 │
│ in create_pipeline │
│ │
│ 17 │ conf_path = str(settings.CONF_SOURCE) │
│ 18 │ print("Config path", conf_path) │
│ 19 │ conf_loader = OmegaConfigLoader(conf_source=conf_path, env='local') │
│ ❱ 20 │ print("Config parameters", conf_loader['parameters']) │
│ 21 │ return Pipeline([]) │
│ 22 │ │
│ 23 │ logger = Logger() │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\config\omegaconf_config. │
│ py:212 in __getitem__ │
│ │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\config\omegaconf_config. │
│ py:302 in load_and_merge_dir_config │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
MissingConfigException: Given configuration path either does not exist or is not a valid directory: CONF_SOURCE_DEFINED_IN_SETTINGS
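The run above shows the value from settings.py being used even though --conf-source conf was passed, and the loader then failing because that path is not a directory. A minimal sketch of checking a config location before handing it to a loader follows. `resolve_conf_source` is a hypothetical helper, and resolving relative paths against the project root is an assumption taken from the earlier `project_path / settings.CONF_SOURCE` snippet.

```python
import tempfile
from pathlib import Path


def resolve_conf_source(project_path: Path, conf_source: str) -> Path:
    """Resolve conf_source against project_path and check that it is a directory."""
    candidate = Path(conf_source)
    if not candidate.is_absolute():
        candidate = project_path / candidate
    if not candidate.is_dir():
        raise FileNotFoundError(f"Config directory not found: {candidate}")
    return candidate


# Demonstration in a throwaway project folder that contains only "conf".
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp)
    (project / "conf").mkdir()

    resolved = resolve_conf_source(project, "conf")
    print("found:", resolved.name)

    try:
        resolve_conf_source(project, "CONF_SOURCE_DEFINED_IN_SETTINGS")
    except FileNotFoundError as err:
        missing_error = str(err)
        print("missing:", missing_error)
```

Failing early like this gives an error that names the full resolved path, which makes it easier to see which of the two locations was actually used.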
datajoely
03/24/2025, 1:51 PM
datajoely
03/24/2025, 1:51 PM
datajoely
03/24/2025, 1:51 PM
Viktoriia
03/24/2025, 1:57 PM
parameters.yaml, like:
# parameters.yaml
data1:
  source: source1
  query_file: data1.sql
data2:
  source: source2
  query_file: data2.sql
Instead of making a long list of nodes, I just do a for loop iterating over these entries, like
def create_pipeline(**kwargs) -> Pipeline:
    conf_loader = OmegaConfigLoader(conf_source=conf_path, env='local')
    entities = conf_loader['parameters']
    node_list = []
    for entity in entities:
        node_list.append(
            node(
                func=extract,
                inputs=[
                    f"params:{entity}.query_file",
                    f"params:{entity}.source",
                ],
                outputs=entity,
            )
        )
    return pipeline(node_list)
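The loop above can be tried out without a Kedro project by building plain dictionaries in place of nodes. In this sketch `build_node_specs` and the dict layout are hypothetical stand-ins for the node(...) call, and `extract` is referenced by name only. The parameters mapping mirrors the parameters.yaml example from the thread.

```python
from typing import Any, Dict, List


def build_node_specs(entities: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one node specification per top-level entry of the parameters mapping."""
    return [
        {
            "func": "extract",  # name of the function each node would call
            "inputs": [
                f"params:{name}.query_file",
                f"params:{name}.source",
            ],
            "outputs": name,
        }
        for name in entities
    ]


# Same shape as the parameters.yaml shown in the thread.
parameters = {
    "data1": {"source": "source1", "query_file": "data1.sql"},
    "data2": {"source": "source2", "query_file": "data2.sql"},
}

specs = build_node_specs(parameters)
for spec in specs:
    print(spec["outputs"], spec["inputs"])
```

Only the top-level keys drive the loop; the nested values are looked up later through the params: references, which is why the function never reads them.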
datajoely
03/24/2025, 1:58 PM
datajoely
03/24/2025, 1:59 PM