# questions
g
Has anyone run into a case where using ThreadPoolExecutor in a node (for example, to run queries in parallel) causes the Kedro process to hang and not terminate after execution?
d
So, Kedro has several node-level runners:
• ParallelRunner uses multiprocessing
• ThreadRunner uses threads (designed for external execution engines like Spark / Ibis)
If you do parallelism within a node itself, you have to use SequentialRunner, because your own parallelism will conflict with the two runners above.
g
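I am using:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit one task per query and map each future back to its query name
    future_to_query = {
        executor.submit(execute_single_query, q_name, q_value): q_name
        for q_name, q_value in queries.items()
    }
    # Collect results in completion order, keeping only non-empty ones
    for future in as_completed(future_to_query):
        query_name, data = future.result()
        if data is not None:
            results[query_name] = data
# Leaving the `with` block already calls executor.shutdown(wait=True), so an
# explicit shutdown(wait=False) and `del executor` afterwards are redundant
```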
As you can see, the Kedro process doesn't complete. When I don't use ThreadPoolExecutor, this doesn't happen.
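When a process finishes its work but doesn't exit, a common first check is which non-daemon threads are still alive, since Python waits for those before the interpreter shuts down. Below is a minimal, self-contained sketch of the same ThreadPoolExecutor pattern plus a small diagnostic helper. It assumes a hypothetical `execute_single_query` stand-in that sleeps briefly instead of talking to Denodo over JDBC, and `run_queries` / `lingering_non_daemon_threads` are illustrative names, not Kedro APIs. Note that the helper only sees threads known to Python's `threading` module; threads started natively inside a library (for example, a JVM behind a JDBC driver) won't necessarily show up.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def execute_single_query(q_name, q_value):
    """Hypothetical stand-in for the real JDBC query; returns (name, data)."""
    time.sleep(0.01)  # simulate a short I/O wait
    return q_name, f"rows for {q_value}"


def run_queries(queries, max_workers=10):
    """Run every query on a thread pool and collect non-None results by name."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_query = {
            executor.submit(execute_single_query, name, sql): name
            for name, sql in queries.items()
        }
        for future in as_completed(future_to_query):
            query_name, data = future.result()
            if data is not None:
                results[query_name] = data
    # Exiting the `with` block has already waited for and joined the workers.
    return results


def lingering_non_daemon_threads():
    """Names of live non-daemon threads other than the main thread.

    Python waits for these before the interpreter exits, so anything listed
    here is a candidate for a process that never terminates. Only threads
    known to the `threading` module are visible.
    """
    return [
        t.name
        for t in threading.enumerate()
        if t is not threading.main_thread() and not t.daemon
    ]


if __name__ == "__main__":
    out = run_queries({"q1": "SELECT 1", "q2": "SELECT 2"})
    print(sorted(out))  # ['q1', 'q2']
    print(lingering_non_daemon_threads())
```

Logging `lingering_non_daemon_threads()` at the end of the node, or just before the process should exit, is a cheap way to see whether something left a Python thread behind. If the list is empty but the process still hangs, the culprit is more likely outside Python's own threads.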
d
What are you executing your queries against?
g
A Denodo DB, via JDBC.
d
Got it
In general, this is already outside the pattern Kedro encourages, where I/O lives in the catalog.
I need to think about what's the best option here.
Are you just looking to pull data out of denodo and dump it somewhere for processing?
g
I get the same problem with the sequential runner.
Pretty much, yes.
d
I'm tempted to say you're getting none of the benefits of Kedro by doing this step here.
Once the data is out, you're in a great place to transform it with Kedro.
But I think you're going to be banging your head against the wall trying to do this within a node itself.
g
It works perfectly fine without multithreading; I just have to find out why Kedro hangs.
The pipeline aggregates these results into a catalog entry.
d
Which version of Kedro are you on?
There was a regression recently that may have introduced this
g
kedro, version 0.19.10
d
Can you try 0.19.12? It should be safe to upgrade with no changes.
g
sure
It will probably be available tomorrow in my org's internal repo. From the release notes, this seems promising: "Changed the execution of SequentialRunner to not use an executor pool to ensure it's single threaded." Thanks for the tip!
Okay, same issue on 0.19.12.
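The release note quoted above is about which thread the node code runs on. Here is a rough stdlib illustration of the difference; it is not Kedro's actual SequentialRunner implementation, and `run_inline`, `run_via_single_worker_pool` and `which_thread` are hypothetical names. Both functions run tasks strictly one at a time, but only the first keeps them on the caller's thread; the second moves every task onto a pool worker thread.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_inline(tasks):
    """Call each task directly on the calling thread, one after another."""
    return [task() for task in tasks]


def run_via_single_worker_pool(tasks):
    """Submit each task to a one-worker pool and wait for it in turn.

    Still sequential, but every task now executes on the pool's worker
    thread instead of the caller's thread.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        return [pool.submit(task).result() for task in tasks]


def which_thread():
    """Hypothetical stand-in for a node: reports the thread it ran on."""
    return threading.get_ident()


if __name__ == "__main__":
    caller = threading.get_ident()
    tasks = [which_thread, which_thread]
    print(all(t == caller for t in run_inline(tasks)))                  # True
    print(any(t == caller for t in run_via_single_worker_pool(tasks)))  # False
```

The practical difference is that with the inline style, any thread pool a node creates is started from the main thread rather than from inside another pool's worker.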
d
hmm
@Merel who's the wizard this week?
m
@Rashida Kanchwala 🙂
r
@Gauthier Pierard, do you mind raising this as a GitHub issue? We will look into it.
g
Sure, thanks.
Doing the same with joblib solves the issue:
```python
from joblib import Parallel, delayed
# ...
# One call per query; Parallel returns the results in the same order as the inputs
results = Parallel(n_jobs=nb_executors)(
    delayed(execute_single_query)(q_name, q_value, input_url, denodo_user, denodo_password, driverpath, namespace)
    for q_name, q_value in queries.items()
)
```
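For comparison with the threaded version, here is a self-contained version of the joblib call. It uses a hypothetical `execute_single_query` stand-in (the real one also takes connection details such as `input_url` and `driverpath`, dropped here) and assumes it returns a `(name, data)` pair, as in the ThreadPoolExecutor snippet, so the list from `Parallel` can be turned back into the same `{query_name: data}` dict. One detail that may matter, though nobody in the thread confirmed it as the cause: with no backend specified, joblib uses its default loky backend, which runs the calls in separate worker processes rather than in threads inside the Kedro process.

```python
from joblib import Parallel, delayed


def execute_single_query(q_name, q_value):
    """Hypothetical stand-in for the real JDBC query; returns (name, data)."""
    return q_name, f"rows for {q_value}"


def run_queries(queries, n_jobs=2):
    # No backend given, so joblib uses its default (loky) backend,
    # which runs each call in a separate worker process.
    pairs = Parallel(n_jobs=n_jobs)(
        delayed(execute_single_query)(q_name, q_value)
        for q_name, q_value in queries.items()
    )
    # Parallel returns results in input order; rebuild the dict the
    # threaded version produced, dropping queries that returned None.
    return {name: data for name, data in pairs if data is not None}


if __name__ == "__main__":
    print(run_queries({"q1": "SELECT 1", "q2": "SELECT 2"}))
    # {'q1': 'rows for SELECT 1', 'q2': 'rows for SELECT 2'}
```

For a closer like-for-like comparison with ThreadPoolExecutor, `Parallel` also accepts `prefer="threads"`; if the hang came back with that setting, it would point at threads rather than at the joblib API itself.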
d
Simpler code too