Ismail Ahmady
12/20/2023, 2:17 PM
marrrcin
12/20/2023, 2:25 PM
Ismail Ahmady
12/20/2023, 2:34 PM
for namespace, variants in settings.DYNAMIC_PIPELINES_MAPPING.items() loop?
• If so, does that mean I would have a kind of recurring input/output that gets updated on each iteration? I'm not sure what that would look like.
marrrcin
12/20/2023, 2:38 PM
from kedro.pipeline import node, pipeline


def aggregator_pipeline():
    # Build a one-node pipeline for a given version (returns a dummy metric).
    p = lambda version: pipeline(
        [
            node(
                func=lambda: print(version) or 666,
                inputs=None,
                outputs="metrics",
                name="calculate_metrics",
            )
        ]
    )
    root_namespace = "model_training"
    versions = ["v1", "v2", "v3"]
    pipes = []
    for version in versions:
        # Namespacing prefixes the datasets, so each copy writes "<root>.<version>.metrics".
        pipes.append(pipeline(p(version), namespace=f"{root_namespace}.{version}"))
    # A single fan-in node takes every namespaced "metrics" output as an input.
    aggregation = pipeline(
        [
            node(
                func=lambda *args: print(args) or sum(args),
                inputs=[f"{root_namespace}.{version}.metrics" for version in versions],
                outputs="aggregated_metrics",
                name="aggregate_metrics",
            )
        ]
    )
    return sum(pipes) + aggregation
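To make the data flow explicit: each namespaced copy writes its own dataset (model_training.v1.metrics, model_training.v2.metrics, and so on), so nothing is overwritten between loop iterations, and the aggregation node just lists all of those names as inputs. Below is a rough plain-Python sketch of that idea with no Kedro involved; the `catalog` dict and the `calculate_metrics` stand-in are hypothetical and only mimic what the runner would do.

```python
root_namespace = "model_training"
versions = ["v1", "v2", "v3"]


def calculate_metrics(version: str) -> int:
    # Hypothetical stand-in for the per-version node; returns a dummy metric.
    return 666


# Stand-in for the data catalog: one separate entry per namespaced output.
catalog = {}
for version in versions:
    catalog[f"{root_namespace}.{version}.metrics"] = calculate_metrics(version)

# The aggregation step receives every namespaced output as its own input.
aggregation_inputs = [f"{root_namespace}.{version}.metrics" for version in versions]
aggregated_metrics = sum(catalog[name] for name in aggregation_inputs)

print(aggregation_inputs)
print(aggregated_metrics)
```

The loop only decides how many distinct outputs exist; the aggregation itself happens once, after all of them are available.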
Ismail Ahmady
12/20/2023, 5:12 PM
Julian Nowak
02/09/2024, 10:18 AM
train_model_node
Do I then have to specify each sub-pipeline manually, or is there an automated way to do this?
For example, in my case I get the message:
You can resume the pipeline run from the nearest nodes with persisted inputs by adding the following argument to your previous command:
--from-nodes "sap_number.123.categorize_fails,sap_number.234.categorize_fails,sap_number.345.categorize_fails [...]
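One possible way to avoid typing every sub-pipeline by hand is to generate the value for --from-nodes from the same list of keys that drives the namespaces. The snippet below is only a sketch: `from_nodes_argument` is a hypothetical helper, not a Kedro API, and the key list contains just the three values visible in the message above.

```python
def from_nodes_argument(root: str, keys: list[str], node_name: str) -> str:
    """Join namespaced node names into a comma-separated --from-nodes value."""
    return ",".join(f"{root}.{key}.{node_name}" for key in keys)


# Hypothetical key list; in practice it would come from the same source
# that is used to create the namespaced sub-pipelines.
sap_numbers = ["123", "234", "345"]
arg = from_nodes_argument("sap_number", sap_numbers, "categorize_fails")

# Print a command line that can be copied into the shell.
print(f'kedro run --from-nodes "{arg}"')
```

Since the node names follow the namespace pattern, the same key list can serve both pipeline creation and the resume command.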