inco
08/05/2024, 2:38 PM
node(
func=nd.check_intermediate,
inputs="adatasets#{params:experiment}",
outputs="intermediate_exists",
name="check_intermediate",
),
inco
08/05/2024, 3:02 PMmarrrcin
08/06/2024, 9:31 AMinco
08/06/2024, 10:17 AMmarrrcin
08/06/2024, 10:38 AMinco
08/06/2024, 12:11 PM
from kedro.config import OmegaConfigLoader
from kedro.pipeline import Pipeline, pipeline
def data_exists(project_name: str = "fresh", search_var: str = "experiment") -> bool:
    """Report whether the matching catalog dataset's file is already on disk.

    Loads the configuration found under ``../../conf/base`` (resolved against
    the current working directory), reads the ``search_var`` entry from the
    parameters, and looks for the first catalog entry whose name contains both
    ``search_var`` and ``project_name``. The literal ``{experiment}``
    placeholder in that entry's ``filepath`` is replaced with the parameter
    value and the resulting path is checked for existence.

    Args:
        project_name: Substring the catalog entry name must contain.
        search_var: Name of the parameter to read; also a substring the
            catalog entry name must contain.

    Returns:
        ``True`` if the resolved file path exists; ``False`` if it does not
        exist or if no catalog entry matches.

    Raises:
        KeyError: If ``search_var`` is missing from the parameters, or the
            matching catalog entry has no ``filepath`` key.
    """
    # Imported locally: the visible module imports do not bring Path into scope.
    from pathlib import Path

    # NOTE(review): the relative path depends on the process working directory;
    # confirm this is always invoked from the expected location.
    conf_path = str(Path("../../conf/base").absolute())
    conf_loader = OmegaConfigLoader(conf_source=conf_path)

    # Coerce to str so a numeric experiment id does not break str.replace.
    experiment = str(conf_loader["parameters"][search_var])

    for ds_name, ds in conf_loader["catalog"].items():
        if search_var in ds_name and project_name in ds_name:
            filepath = Path(ds["filepath"].replace("{experiment}", experiment))
            return filepath.exists()

    # No catalog entry matched: answer False explicitly instead of falling
    # through with an implicit None (the signature promises a bool).
    return False
def create_pipeline(**kwargs: dict) -> Pipeline:
    """Assemble the pipeline, leaving out the data stage when its output exists.

    Both sub-pipelines are always constructed. If ``data_exists()`` reports
    that the data is already present, only the feature pipeline is returned;
    otherwise the data pipeline followed by the feature pipeline is returned.

    Args:
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The feature pipeline alone, or the data pipeline plus the feature
        pipeline.
    """
    data_stage = pipeline([...])
    feature_stage = pipeline([...])
    # The combined pipeline is only built when the data still has to be produced.
    return feature_stage if data_exists() else data_stage + feature_stage
marrrcin
08/06/2024, 12:56 PMinco
08/06/2024, 2:47 PMinco
08/06/2024, 2:59 PMinco
08/06/2024, 7:15 PM