# questions
v
Hi Team, is there a way to access the CLI arguments passed to the `kedro run` command when creating a pipeline, i.e. within the function `def create_pipeline(**kwargs)`? I'm most interested in `conf-source` and `env`.
d
so hooks are the way to do this
you can definitely do stuff with `before_pipeline_run` / `after_context_created` to retrieve the `env`. I'm not sure we expose the `--conf-source` though, that's something that is consumed by `settings.py` before kedro really starts up
v
so if I use hooks, can I pass the `env` somehow further into the pipeline?
I need both `--env` and `--conf-source` to access my parameters via OmegaConfigLoader like this:
def create_pipeline(**kwargs) -> Pipeline:
    conf_path = str(project_path / settings.CONF_SOURCE)
    conf_loader = OmegaConfigLoader(conf_source=conf_path, env=env)
The problem is that `settings.CONF_SOURCE` doesn't get updated with `--conf-source`, so I don't know where my config lies.
d
as in you don't think Kedro is working or you're doing it a different way?
v
I don't know how to make the snippet above work with `kedro run --env dev --conf-source my_conf_path`
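One possible workaround, sketched under assumptions: since `create_pipeline` is called inside the same process as `kedro run`, the raw command line is still visible in `sys.argv`. The helper `read_run_args` below is hypothetical and not a Kedro API. It uses only the standard library, falls back to the `KEDRO_ENV` environment variable for the environment, and returns `None` for anything it cannot find. It only helps when the project is launched through the CLI.

```python
import argparse
import os
import sys


def read_run_args(argv=None):
    """Best-effort read of --env and --conf-source from a command line."""
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--env", "-e", default=os.environ.get("KEDRO_ENV"))
    parser.add_argument("--conf-source", dest="conf_source", default=None)
    # parse_known_args skips every argument this parser does not declare,
    # so the "run" subcommand and other kedro options are ignored.
    args, _unknown = parser.parse_known_args(sys.argv[1:] if argv is None else argv)
    return args.env, args.conf_source


env, conf_source = read_run_args(["run", "--env", "dev", "--conf-source", "my_conf_path"])
print(env, conf_source)
```

The returned values could then be passed to `OmegaConfigLoader(conf_source=..., env=...)`, keeping in mind that this bypasses Kedro's own argument handling.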
d
are you getting an error?
what logs do you see when you run the command
v
I get something like this. My code:
# settings.py
CONF_SOURCE = "CONF_SOURCE_DEFINED_IN_SETTINGS"

# pipeline.py
def create_pipeline(**kwargs) -> Pipeline:
    conf_path = str(settings.CONF_SOURCE)
    print("Config path", conf_path)
    conf_loader = OmegaConfigLoader(conf_source=conf_path, env='local')
    print("Config parameters", conf_loader['parameters'])
    return Pipeline([])
Traceback:
$ kedro run --env dev --conf-source conf
[03/24/25 14:49:30] INFO     Using 'C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\rich_logging.yml' as logging configuration.                               __init__.py:270

Config path CONF_SOURCE_DEFINED_IN_SETTINGS
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Scripts\kedro-script.py:9 in <module>            │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\cli\cli.py:263 │
│ in main                                                                                          │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1161 in __call__ │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\cli\cli.py:163 │
│ in main                                                                                          │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1082 in main     │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1697 in invoke   │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:1443 in invoke   │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\click\core.py:788 in invoke    │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\cli\project.py │
│ :228 in run                                                                                      │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\session\sessio │
│ n.py:346 in run                                                                                  │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:166 in inner                                                                               │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:211 in _load_data                                                                          │
│                                                                                                  │
│ C:\Users\vila3\Projects\Restwert-Kompetenz\src\rwk_analytics\pipeline_registry.py:13 in          │
│ register_pipelines                                                                               │
│                                                                                                  │
│   10 │   Returns:                                                                                │
│   11 │   │   A mapping from pipeline names to ``Pipeline`` objects.                              │
│   12 │   """                                                                                     │
│ ❱ 13 │   pipelines = find_pipelines()                                                            │
│   14 │   pipelines["__default__"] = sum(pipelines.values())                                      │
│   15 │   return pipelines                                                                        │
│   16                                                                                             │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:457 in find_pipelines                                                                      │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\framework\project\__init │
│ __.py:354 in _create_pipeline                                                                    │
│                                                                                                  │
│ C:\Users\vila3\Projects\Restwert-Kompetenz\src\rwk_analytics\pipelines\preprocess\pipeline.py:20 │
│ in create_pipeline                                                                               │
│                                                                                                  │
│   17 │   conf_path = str(settings.CONF_SOURCE)                                                   │
│   18 │   print("Config path", conf_path)                                                         │
│   19 │   conf_loader = OmegaConfigLoader(conf_source=conf_path, env='local')                     │
│ ❱ 20 │   print("Config parameters", conf_loader['parameters'])                                   │
│   21 │   return Pipeline([])                                                                     │
│   22 │                                                                                           │
│   23 │   logger = Logger()                                                                       │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\config\omegaconf_config. │
│ py:212 in __getitem__                                                                            │
│                                                                                                  │
│ C:\ProgramData\Miniconda3\envs\rwk-dev-py11-env\Lib\site-packages\kedro\config\omegaconf_config. │
│ py:302 in load_and_merge_dir_config                                                              │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
MissingConfigException: Given configuration path either does not exist or is not a valid directory: CONF_SOURCE_DEFINED_IN_SETTINGS
d
oh you're trying to access config directly in pipeline.py?
that's very much not encouraged
what are you trying to achieve, we can work back from there
v
Say I have a list of data coming from different sources that I want to access. I want to specify those in my `parameters.yaml`, like:
# parameters.yaml
data1:
    source: source1
    query_file: data1.sql
data2:
    source: source2
    query_file: data2.sql
Instead of making a long list of nodes, I just do a for loop iterating over these entries, like
def create_pipeline(**kwargs) -> Pipeline:
    conf_loader = OmegaConfigLoader(conf_source=conf_path, env='local')
    entities = conf_loader['parameters']

    node_list = []
    for entity in entities:
        node_list.append(
            node(
                func=extract,
                inputs=[
                    f"params:{entity}.query_file",
                    f"params:{entity}.source",
                ],
                outputs=entity,
            )
        )
    return pipeline(node_list)
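The loop above can be split so that building the node arguments does not depend on how the entries were loaded. The sketch below uses a hypothetical helper `node_specs` and an inline dict that mirrors the `parameters.yaml` example. It deliberately does not import Kedro, and the node names it generates are an illustrative choice.

```python
def node_specs(entities):
    """Turn parameter entries into keyword arguments for one node per entity."""
    specs = []
    for name in entities:
        specs.append(
            {
                # Dataset names that refer to nested parameters of this entity.
                "inputs": [f"params:{name}.query_file", f"params:{name}.source"],
                "outputs": name,
                "name": f"extract_{name}",
            }
        )
    return specs


# Same shape as the parameters.yaml example, written inline for the sketch.
entities = {
    "data1": {"source": "source1", "query_file": "data1.sql"},
    "data2": {"source": "source2", "query_file": "data2.sql"},
}

for spec in node_specs(entities):
    print(spec["name"], spec["inputs"], "->", spec["outputs"])
```

Inside `create_pipeline`, each dict could be expanded as `node(func=extract, **spec)`, where `extract` is the function from the snippet above.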
d
Okay, dynamic pipelines
So this pattern fights a lot of Kedro's design ideas, which is why you're finding it hard. This guide is the closest thing we have to an endorsed approach: https://getindata.com/blog/kedro-dynamic-pipelines/