# questions
t
Hi all, I have a Kedro project where the hooks and the pipelines currently require the config loader to load parameter and spark files. In the hooks I currently use:
Copy code
from kedro.config import OmegaConfigLoader
from kedro.framework.project import settings
from kedro.framework.session import KedroSession
from pyspark import SparkConf

session = KedroSession("example_project")
context = session.load_context()

# explicitly define the project path
project_path = context.project_path
conf_path = str(project_path / settings.CONF_SOURCE)

conf_loader = OmegaConfigLoader(
    conf_source=conf_path,
    base_env="s3_develop",
    default_run_env="s3_develop",
    config_patterns={
        "spark": ["spark*", "spark*/**"],
    },
)

# Load spark configuration settings from spark.yml
parameters = conf_loader.get("spark")
spark_conf = SparkConf().setAll(parameters.items())
And in one of the pipelines I use:
Copy code
session = KedroSession("example_project")
context = session.load_context()
logger.info(f"Context is: {context}")
project_path = context.project_path
logger.info(f"Project path is {project_path}")

conf_path = f"{project_path}/{settings.CONF_SOURCE}"
conf_loader = OmegaConfigLoader(
    conf_source=conf_path,
    base_env="s3_develop",
    default_run_env="s3_develop",
)
logger.info(f"Config Loader {conf_loader}")
params = conf_loader.get("parameters")

if params["forecasting_params"]["retrain_model"]:
    return pipeline
I want to avoid hardcoding base_env and default_run_env, so that if I use the following run commands:
Copy code
kedro run --env s3_test -p example_pipeline
Or
Copy code
kedro run --env s3_dev -p example_pipeline
the config loader would load from the s3_test or s3_dev folders. Is there a simpler way to do what I am currently doing, or is there some clever logic where I can access the env set in the CLI command inside src/settings.py and create the config loader there? Thanks, and I hope this all makes sense.
m
How do you run your pipelines? Where is the code you've pasted coming from? Is it your script in your project?
t
I use PyCharm and run the pipelines from the configurations in there. Those commands are kedro run... (have updated). The code is from my Kedro project. The first block is from the hooks.py file in one of my hooks. The second is from a pipeline.py, as a conditional way of calling specific pipelines
My main question really is if there is a way to get the env you specify in the kedro run command at any point in the project
n
I think the simple solution here is: don't create your own OmegaConfigLoader
Copy code
context = session.load_context()
config_loader = context.config_loader
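For instance (a minimal sketch; the "spark" key only resolves if that pattern is registered, which is an assumption here, not something from this thread):
Copy code
# context.config_loader is already built for the env passed with
# `kedro run --env ...` (the env itself is available as context.env),
# so nothing needs to be hardcoded.
params = context.config_loader["parameters"]

# assumes settings.py registers the pattern, e.g.
# CONFIG_LOADER_ARGS = {"config_patterns": {"spark": ["spark*", "spark*/**"]}}
spark_params = context.config_loader["spark"]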
👍 1
m
Yeah, but it seems to me like @Tom McHale is trying to use config during pipeline creation 🙈
😢 1
n
But why do you need spark parameters to create the pipeline?
Btw, use conf_loader["parameters"] instead of conf_loader.get("parameters").
If you are somehow creating pipelines dynamically, a few things to consider (see the sketch below):
1. Move your pipeline parameters out to a separate file like pipeline.yml, and try to keep them separate from the "regular" parameters, since they are very different things.
2. Do you really need the config loader to create the pipeline? Are these parameters environment-specific, or can you just handle this with a simple yaml.load?
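A minimal sketch of option 2 (the file path and keys are illustrative, not from this thread):
Copy code
from pathlib import Path

import yaml

# Read pipeline-creation flags with plain yaml, so no config loader
# (and no second KedroSession) is needed at pipeline-creation time.
flags = yaml.safe_load(Path("conf/base/pipeline.yml").read_text())
retrain_model = flags["forecasting_params"]["retrain_model"]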
t
Thanks, it's not creating pipelines dynamically, more changing which pipelines are run based on flags in the parameters
Thanks guys, I agree I was making life difficult with the OmegaConfigLoader and the code below has done the trick
Copy code
context = session.load_context()
config_loader = context.config_loader
👍🏼 1
n
How are you doing this pipeline filtering based on parameters? The parameter is only available after pipeline creation. Did you create a separate session just to get access to the config (in other words, are there 2 sessions created for 1 run)?
t
Yes I did do that haha, you're making me think it may not be the best practice thing to do...
n
IIRC @marrrcin's way of doing that doesn't require multiple sessions, but I am not sure. It's not the end of the world to create the session twice, it just adds a little overhead, but in the case that you create the Spark connection with a hook, you are likely making 2 connections to Spark for 1 run. See: https://getindata.com/blog/kedro-dynamic-pipelines/
👍 1
t
yeah makes sense. Will read through the link and see if I can fine-tune how I'm doing it. The reason I create a session in my spark hooks is that I only wanted to create the spark session when pipelines with specific nodes are run, so I used the @hook_impl before_pipeline_run instead of after_context_created
n
ya that makes a lot of sense
Oh actually - if you are only doing this because the hook arguments don't have it, you can make use of the fact that hooks are stateful (mind their execution order):
Copy code
from kedro.framework.hooks import hook_impl


class Hook:
    @hook_impl
    def after_context_created(self):
        # set state on the hook instance in an earlier hook...
        self.nok = "nok"

    @hook_impl
    def before_pipeline_run(self):
        # ...and read it back in a later one
        print(self.nok)
👍 2
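Putting the two ideas together for the Spark case, a rough sketch (untested; the "spark" tag check and the "spark" config pattern are assumptions, not something from this thread):
Copy code
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context):
        # keep the context around instead of creating a second KedroSession later
        self._context = context

    @hook_impl
    def before_pipeline_run(self, pipeline):
        # only start Spark for pipelines that contain nodes tagged "spark"
        if not any("spark" in node.tags for node in pipeline.nodes):
            return
        parameters = self._context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())
        SparkSession.builder.config(conf=spark_conf).getOrCreate()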
Well, I really hope someone just puts this in the docs: https://github.com/kedro-org/kedro/issues/2690 It's prioritised as Low in our backlog, so it's maybe easier if you or someone from the community wants to make a PR for this.
t
wahey, hadn't thought of this. I've passed the context from one to the other using self.context. Looks like it's working nicely, cheers!
K 1
n
Awesome!
n
Hi, I have a similar use case. I'm trying to access the context of the session at pipeline creation in a pipeline.py. What I'm trying to do is access the context for some parameters that are environment-specific, as they are created in a hook after the context is created. I tried using:
Copy code
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path("."))

with KedroSession.create() as session:
    context = session.load_context()
But since these are runtime parameters (KedroSession.create -> extra_params), this approach won't do... Could you guide me on how to load the current session context in code? I'm really curious to know if there's a way. I tried using a namespace in a modular pipeline... Example catalog:
Copy code
"{namespace}.test_txt":
  type: "${_datasets.text}"
  filepath: "data/01_raw/{namespace}_test.txt"
Example pipeline, where the run_date (passed at runtime) is replaced by the delta run_date (t-1, t-2, ...), which evaluates to t-1.run_date and so on...
Copy code
from kedro.pipeline import Pipeline, node, pipeline

template = pipeline([
    node(
        write_txt,
        inputs="params:run_date",
        outputs="test_txt",
        name="write_txt",
    )
])

# t-1 version
t1_version = pipeline(pipe=template, namespace="t-1")
# t-2 version
t2_version = pipeline(pipe=template, namespace="t-2")


def create_pipeline(**kwargs) -> Pipeline:
    return t1_version + t2_version
My problem is I need the value of the parameter in the catalog for the filepath of the datasets, so I figured using the value of the parameter as a namespace would solve my problem.
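To make the namespacing explicit, one way to inspect what the catalog entry has to match (the printed names in the comment are what I'd expect, shown for illustration):
Copy code
# With namespace="t-1", the node's inputs/outputs are prefixed, so the
# dataset factory "{namespace}.test_txt" is what resolves the filepath.
for p in (t1_version, t2_version):
    print(p.all_inputs(), p.all_outputs())
# e.g. {'params:t-1.run_date'} {'t-1.test_txt'} for the t-1 version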
n
Sorry, I am not sure what the question is or how runtime params and namespaces come into the picture. Can you maybe write an example, even if that code doesn't exist at all?
n
Hey @Nok Lam Chan, here's a more detailed description of what we were trying to do and how we resolved it. Any feedback would be appreciated. https://github.com/kedro-org/kedro/issues/2627#issuecomment-2110628229