Tom McHale
04/09/2024, 3:25 PM
session = KedroSession("example_project")
context = session.load_context()

# explicitly define the project path
project_path = context.project_path
conf_path = str(project_path / settings.CONF_SOURCE)
conf_loader = OmegaConfigLoader(
    conf_source=conf_path,
    base_env="s3_develop",
    default_run_env="s3_develop",
    config_patterns={
        "spark": ["spark*", "spark*/**"],
    },
)

# Load spark configuration settings from spark.yml
parameters = conf_loader.get("spark")
spark_conf = SparkConf().setAll(parameters.items())
And in one of the pipelines I use:
session = KedroSession("example_project")
context = session.load_context()
logger.info(f"Context is: {context}")
project_path = context.project_path
logger.info(f"Project path is {project_path}")
conf_path = f"{project_path}/{settings.CONF_SOURCE}"
conf_loader = OmegaConfigLoader(
    conf_source=conf_path,
    base_env="s3_develop",
    default_run_env="s3_develop",
)
logger.info(f"Config Loader {conf_loader}")
params = conf_loader.get("parameters")
if params["forecasting_params"]["retrain_model"]:
    return pipeline
I want to avoid hardcoding the base_env and default_run_env, so that if I used one of the following run commands:
kedro run --env s3_test -p example_pipeline
or
kedro run --env s3_dev -p example_pipeline
the config loader would load from the s3_test or s3_dev folder.
Is there a simpler way to do what I am currently doing, or is there some clever logic where I can access the env set in the CLI command inside src/settings.py and create the config loader there? Thanks, and I hope this all makes sense.
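For the Spark part specifically, a common pattern from the Kedro docs is to build the SparkSession inside an after_context_created hook instead of creating a session by hand; the context handed to the hook already carries a config loader resolved for whatever --env was passed, so nothing is hardcoded. A minimal sketch (the SparkHooks name is illustrative, and it assumes the "spark" config pattern is registered via CONFIG_LOADER_ARGS in settings.py):

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession

class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        # config_loader is already pointed at the env chosen with --env
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise a single SparkSession for the whole run
        spark = (
            SparkSession.builder.appName(context.project_path.name)
            .config(conf=spark_conf)
            .getOrCreate()
        )
        spark.sparkContext.setLogLevel("WARN")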
marrrcin
04/09/2024, 3:29 PM
Tom McHale
04/09/2024, 3:32 PM
Tom McHale
04/09/2024, 3:33 PM
Nok Lam Chan
04/09/2024, 3:36 PM
Nok Lam Chan
04/09/2024, 3:36 PM
context = session.load_context()
config_loader = context.config_loader
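Sketched out, the point is that the config loader that comes with the context is already built for the environment the run was started with, so the env never has to be repeated in code (a sketch, assuming it runs where a project session can be created, e.g. a script or a hook):

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())  # only needed when running outside `kedro run`
with KedroSession.create(env="s3_test") as session:  # `kedro run --env s3_test` does this for you
    context = session.load_context()
    config_loader = context.config_loader  # reads conf/s3_test on top of conf/base
    params = config_loader["parameters"]
    retrain = params["forecasting_params"]["retrain_model"]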
marrrcin
04/09/2024, 3:36 PM
Nok Lam Chan
04/09/2024, 3:37 PM
Nok Lam Chan
04/09/2024, 3:38 PM
Use conf_loader["parameters"] instead of conf_loader.get("parameters"), btw.
Nok Lam Chan
04/09/2024, 3:41 PM
1. If you keep parameters that drive pipeline creation in something like pipeline.yml, try to separate it out from the "regular" parameters since they are very different things.
2. Do you really need the config loader to create the pipeline? Are these parameters environment specific, or can you just handle this with a simple yaml.load?
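A minimal sketch of option 2, assuming a conf/base/pipeline.yml that only controls wiring and is not environment specific (the file name, keys and the retrain node function are illustrative):

from pathlib import Path

import yaml
from kedro.pipeline import Pipeline, node, pipeline


def retrain_model(model_input):  # placeholder node function
    return model_input


def create_pipeline(**kwargs) -> Pipeline:
    # Read the wiring flags directly; no session or config loader needed
    flags = yaml.safe_load(Path("conf/base/pipeline.yml").read_text())

    retrain_nodes = [
        node(retrain_model, inputs="model_input", outputs="model", name="retrain_model"),
    ]
    if not flags["forecasting_params"]["retrain_model"]:
        retrain_nodes = []  # drop the retraining step entirely
    return pipeline(retrain_nodes)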
Tom McHale
04/09/2024, 3:44 PM
Tom McHale
04/09/2024, 4:02 PM
context = session.load_context()
config_loader = context.config_loader
Nok Lam Chan
04/09/2024, 4:06 PM
Why do you need parameters there? The parameters are only available after pipeline creation. Did you create a separate session just to get access to the config (in other words, are there 2 sessions created for 1 run)?
Tom McHale
04/09/2024, 4:16 PM
Nok Lam Chan
04/09/2024, 4:20 PM
Tom McHale
04/09/2024, 4:34 PM
Nok Lam Chan
04/09/2024, 4:35 PM
Nok Lam Chan
04/09/2024, 4:37 PM
from kedro.framework.hooks import hook_impl

class Hook:
    @hook_impl
    def after_context_created(self):
        self.nok = "nok"

    @hook_impl
    def before_pipeline_run(self):
        print(self.nok)
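For the hook to actually fire it also has to be registered in the project's settings.py (a sketch; the module path is illustrative):

# src/example_project/settings.py
from example_project.hooks import Hook

HOOKS = (Hook(),)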
Nok Lam Chan
04/09/2024, 4:39 PM
Tom McHale
04/09/2024, 4:56 PM
Nok Lam Chan
04/09/2024, 5:23 PM
Nelson Zambrano
05/10/2024, 10:09 PM
I'm trying to do something similar from pipeline.py.
What I'm trying to do is access the context for some parameters that are environment specific, as they are created in a hook after the context is created.
I tried using:
from pathlib import Path
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path("."))
with KedroSession.create() as session:
    context = session.load_context()
But since these are runtime parameters (passed through extra_params in KedroSession.create), this approach won't do...
Could you guide me on how to load the current session context in code? I'm really curious to know if there's a way.
I tried using a namespace in a modular pipeline...
Example catalog:
"{namespace}.test_txt":
type: "${_datasets.text}"
filepath: "data/01_raw/{namespace}_test.txt"
Example pipeline, where the run_date (passed at runtime) is replaced by the delta run_date (t-1, t-2, ...), which evaluates to t-1.run_date and so on...
template = pipeline([
    node(
        write_txt,
        inputs="params:run_date",
        outputs="test_txt",
        name="write_txt",
    )
])

# t-1 version
t1_version = pipeline(
    pipe=template,
    namespace="t-1",
)

# t-2 version
t2_version = pipeline(
    pipe=template,
    namespace="t-2",
)

def create_pipeline(**kwargs) -> Pipeline:
    return t1_version + t2_version
My problem is that I need the value of the parameter in the catalog for the filepath of the datasets, so I figured using the value of the parameter as a namespace would solve my problem.
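Depending on the Kedro version, another way to get a runtime value into a filepath is the OmegaConfigLoader's runtime_params resolver in the catalog rather than a namespace (a sketch, assuming run_date is supplied with --params run_date=...):

test_txt:
  type: "${_datasets.text}"
  filepath: "data/01_raw/${runtime_params:run_date}_test.txt"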
Nok Lam Chan
05/11/2024, 1:26 AM
Nelson Zambrano
05/14/2024, 4:18 PM