https://kedro.org/ logo
#questions
Title
# questions
f

FlorianGD

11/28/2023, 10:11 AM
Hello, I have a question regarding running kedro pipelines in threads. I have a (working) bash script that runs several pipelines in parallel, it looks like this
Copy code
#!/usr/bin/env bash

# Run the pipelines in parallel.
for t in "$@"
do
    kedro run --pipeline my_pipeline --params my_param:"$t" &
    pids+=( "$!" )
done

# Wait for completion of the threads before exiting
exit_status=0
for pid in "${pids[@]}"; do
  wait "$pid"; (( exit_status |= $? ))
done

exit "$exit_status"
I wanted more control (and I do not like writing bash at all), so I wanted to port this code to python. But when doing so, I encounter problems. I think the core of the problem is that the catalog is parametrized with
OmegoConf
's
runtime_params
and they somehow get mixed up in the threads. Here is the python script
Copy code
import argparse
import concurrent.futures
from logging import getLogger
from pathlib import Path
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())
logger = getLogger("uc16_pipelines")


def run_pipeline(t: int) -> None:
    try:
        with KedroSession.create(extra_params={"my_param": t}) as session:
            session.run("my_pipeline")
        return 0
    except Exception as e:
        logger.exception(e)
        return 1


def batch(ts: list[int]) -> dict[int, int]:
    with concurrent.futures.ThreadPoolExecutor( max_workers=10, thread_name_prefix="thread_id"
    ) as executor:
        results = executor.map(run_pipeline, ts)
    return dict(zip(ts, results))



if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("ts", type=int, nargs="+")
    args = parser.parse_args()

    statuses = batch(args.ts)
    failures = {t: status for t, status in statuses.items() if status > 0}
    successes = {t: status for t, status in statuses.items() if status == 0}
    if successes:
        logger.info(
            f"The ts {list(successes.keys())!r} were successfully updated"
        )
    if failures:
        logger.error(f"The ts {list(failures.keys())!r}'s update failed")
        raise SystemExit(1)
    raise SystemExit(0)
When I ran with the locals in the traceback, I can see that the catalog does not have the correct value:
Copy code
HTTPError: 404 Client Error: Not Found for url: http://<url>/app/api/v1/t/6/ 

    The above exception was the direct cause of the following exception:

    ╭────────────────────────────────────────── Traceback (most recent call last) ──────────────────────────────────────────╮
    │ /home/coder/uc16-pipelines/run_pipelines_parallel.py:18 in run_pipeline                                             │
    │                                                                                                                       │
    │   16 │   try:                                                                                                         │
    │   17 │   │   with KedroSession.create(extra_params={"my_param": t}) as session:                                       │
    │ ❱ 18 │   │   │   session.run("my_pipeline")                                                                │
    │   19 │   │   return 0                                                                                                 │
    │   20 │   except Exception as e:                                                                                       │
    │   21 │   │   logger.exception(e)                                                                                      │
    │                                                                                                                       │
    │ ╭───────────────────────────────────────────────────── locals ──────────────────────────────────────────────────────╮ │
    │ │          e = DatasetError('Failed to fetch data', HTTPError('404 Client Error: Not Found for url:                 │ │
    │ │              <url>/app/api/v1/t/6/'))                                                                             │ │
    │ │    session = <kedro.framework.session.session.KedroSession object at 0x7ff757743e80>                              │ │
    │ │ my_param   = 123                                                                                                  │ │
    │ ╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
Notice the discrepancy between the value in
my_param
from the locals and the url that is printed. Here is the corresponding entry in the catalog
Copy code
data_from_db:
  type: api.APIDataset
  url: <url>/app/api/v1/t/${runtime_params:my_param,0}/
  method: GET
Another thing, if I set
max_workers
to 1, then it works as expected, but I do not run anything in parallel anymore
So my question, is it expected that the catalog would be "shared" somehow between threads?
m

marrrcin

11/28/2023, 12:27 PM
That’s an interesting question - @Merel / @datajoely maybe you can chip in here?
d

datajoely

11/28/2023, 12:35 PM
So without delving too deep into python concurrency - I don’t actually think your bash approach is too bad, you perhaps just need a more expressive orchestrator than the shell
f

FlorianGD

11/28/2023, 12:56 PM
I changed
ThreadPoolExecutor
to
ProcessPoolExecutor
and it seems to work. My mental model for the difference between those is not good, so I am not sure if this is exactly what I want...
2 Views