FlorianGD
11/28/2023, 10:11 AM
#!/usr/bin/env bash
# Run the pipelines in parallel, one background process per argument.
pids=()
for t in "$@"
do
    kedro run --pipeline my_pipeline --params my_param:"$t" &
    pids+=( "$!" )
done
# Wait for all background processes to finish before exiting
exit_status=0
for pid in "${pids[@]}"; do
    wait "$pid"; (( exit_status |= $? ))
done
exit "$exit_status"
I wanted more control (and I do not like writing bash at all), so I tried to port this code to Python. When I did, I ran into problems. I think the core of the problem is that the catalog is parametrized with OmegaConf's runtime_params and they somehow get mixed up between the threads.
Here is the Python script:
import argparse
import concurrent.futures
from logging import getLogger
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())
logger = getLogger("uc16_pipelines")


def run_pipeline(t: int) -> int:
    """Run my_pipeline for one value of my_param; return 0 on success, 1 on failure."""
    try:
        with KedroSession.create(extra_params={"my_param": t}) as session:
            session.run("my_pipeline")
        return 0
    except Exception as e:
        logger.exception(e)
        return 1


def batch(ts: list[int]) -> dict[int, int]:
    """Run the pipeline for every t in a thread pool and map each t to its status."""
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=10, thread_name_prefix="thread_id"
    ) as executor:
        results = executor.map(run_pipeline, ts)
    return dict(zip(ts, results))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("ts", type=int, nargs="+")
    args = parser.parse_args()
    statuses = batch(args.ts)
    failures = {t: status for t, status in statuses.items() if status > 0}
    successes = {t: status for t, status in statuses.items() if status == 0}
    if successes:
        logger.info(
            f"The ts {list(successes.keys())!r} were successfully updated"
        )
    if failures:
        logger.error(f"The ts {list(failures.keys())!r}'s update failed")
        raise SystemExit(1)
    raise SystemExit(0)
FlorianGD
11/28/2023, 10:19 AM
HTTPError: 404 Client Error: Not Found for url: http://<url>/app/api/v1/t/6/
The above exception was the direct cause of the following exception:
╭────────────────────────────────────────── Traceback (most recent call last) ──────────────────────────────────────────╮
│ /home/coder/uc16-pipelines/run_pipelines_parallel.py:18 in run_pipeline │
│ │
│ 16 │ try: │
│ 17 │ │ with KedroSession.create(extra_params={"my_param": t}) as session: │
│ ❱ 18 │ │ │ session.run("my_pipeline") │
│ 19 │ │ return 0 │
│ 20 │ except Exception as e: │
│ 21 │ │ logger.exception(e) │
│ │
│ ╭───────────────────────────────────────────────────── locals ──────────────────────────────────────────────────────╮ │
│ │ e = DatasetError('Failed to fetch data', HTTPError('404 Client Error: Not Found for url: │ │
│ │ <url>/app/api/v1/t/6/')) │ │
│ │ session = <kedro.framework.session.session.KedroSession object at 0x7ff757743e80> │ │
│ │ my_param = 123 │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
Notice the discrepancy between the value of my_param in the locals (123) and the URL that is printed (it ends in /t/6/).
Here is the corresponding entry in the catalog:
data_from_db:
  type: api.APIDataset
  url: <url>/app/api/v1/t/${runtime_params:my_param,0}/
  method: GET
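One possible explanation for the mismatch, offered as a guess rather than a confirmed diagnosis: OmegaConf custom resolvers are registered once per process, not per thread, so if each session re-registers `runtime_params` with its own parameters, the last registration would win for every thread. The sketch below imitates that with a plain dictionary. `_RESOLVERS`, `register_resolver`, `resolve_url`, `run` and `demo` are hypothetical names for illustration and are not Kedro or OmegaConf APIs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a process-wide resolver registry.
# Every thread in the process reads and writes this same dict.
_RESOLVERS = {}


def register_resolver(name, func):
    """Register a resolver, silently replacing any previous one."""
    _RESOLVERS[name] = func


def resolve_url(key):
    """Build the catalog URL using whichever resolver is registered right now."""
    value = _RESOLVERS["runtime_params"](key)
    return f"<url>/app/api/v1/t/{value}/"


def run(t, barrier):
    """Imitate one session: register its own params, then resolve the URL."""
    params = {"my_param": t}
    register_resolver("runtime_params", params.get)
    # Force the worst case: all threads register before any thread resolves.
    barrier.wait()
    return t, resolve_url("my_param")


def demo(ts):
    """Run one thread per value in ts (ts must be non-empty)."""
    barrier = threading.Barrier(len(ts))
    with ThreadPoolExecutor(max_workers=len(ts)) as executor:
        return list(executor.map(lambda t: run(t, barrier), ts))
```

Calling `demo([6, 123, 7])` returns one `(t, url)` pair per thread, and every URL carries the same value: the one from whichever thread registered last. That matches the symptom above, where the thread's own parameter and the URL it fetched disagree.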
FlorianGD
11/28/2023, 10:20 AM
If I set max_workers to 1, then it works as expected, but nothing runs in parallel anymore.
FlorianGD
11/28/2023, 10:26 AM
marrrcin
11/28/2023, 12:27 PM
datajoely
11/28/2023, 12:35 PM
FlorianGD
11/28/2023, 12:56 PM
I changed ThreadPoolExecutor to ProcessPoolExecutor and it seems to work. My mental model of the difference between the two is not good, so I am not sure this is exactly what I want...
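On the mental model: threads live inside one interpreter and share its module-level state, while each process has its own interpreter and its own copy of that state. The original bash script got its isolation from exactly that, since every `kedro run` was a separate process. A minimal sketch of the same process-per-run pattern from Python, using `subprocess` rather than `ProcessPoolExecutor` and assuming the `kedro` CLI is on the PATH. `kedro_command` and `run_in_processes` are hypothetical helper names, and the command line is copied from the bash script.

```python
import subprocess
from typing import Callable, Sequence


def kedro_command(t: int) -> list[str]:
    """Build the same command line the bash script runs for one value of t."""
    return ["kedro", "run", "--pipeline", "my_pipeline", "--params", f"my_param:{t}"]


def run_in_processes(
    ts: Sequence[int],
    build_command: Callable[[int], list[str]] = kedro_command,
) -> dict[int, int]:
    """Start one OS process per value of t, then collect the exit codes.

    Like the bash script, this starts every process at once and puts no
    limit on how many run at the same time.
    """
    # Popen returns immediately, so all processes are running after this line.
    procs = {t: subprocess.Popen(build_command(t)) for t in ts}
    # wait() blocks until the process exits and returns its exit code.
    return {t: proc.wait() for t, proc in procs.items()}
```

For example, `statuses = run_in_processes([1, 2, 3])` gives a dict from each value to its exit code, which can feed the same success and failure reporting as the script above. The `build_command` argument is only there so the launcher can be tried with a harmless command before pointing it at Kedro.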