#questions

Thomas d'Hooghe

02/21/2024, 9:09 AM
Hey all, I am building a message generation pipeline in Kedro (based on GenAI, of course) to generate personalized messages for my client's customers. I am making tons of LLM calls because the model I am using is not that performant, so I split the tasks into very small pieces. To execute these small pieces I use parallelization (currently joblib Parallel with the threading backend), which worked well.

Problem:
• After running the same pipeline on Python 3.11.7 (instead of 3.10.13), the pipeline gets stuck and I am also unable to kill it
• This is caused by one or more threads that get stuck
• I am really bound to 3.11.7 on the client side
• 3.11.4 also seems to get stuck

Tried:
• Using ThreadPoolExecutor instead of Parallel --> also stuck
• Using ProcessPoolExecutor --> works on 3.11.7, but takes 60s per iteration instead of 20s per iteration with multithreading

This makes me think there is some problem with Python 3.11.7 (and 3.11.4) in combination with Kedro 0.19.2 (the version I am running now). Unfortunately, I am really bound to the Python version on the client side, so I am hoping to fix the issues that seem to be around multithreading. Does anyone have suggestions on how to proceed (i.e. fix multithreading, make multiprocessing faster, etc.)? Code snippets of the different implementations are in the thread 🙂
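One way to narrow down where the threads get stuck is to have Python dump the stack of every thread once a block has been running for too long. The sketch below uses the standard-library faulthandler module for this. The helper name hang_watchdog and the timeout values are illustrative assumptions, not part of the pipeline above, and the dump only shows where threads are waiting; it does not unblock them.

```python
import faulthandler
import sys
from contextlib import contextmanager


@contextmanager
def hang_watchdog(timeout_s: float, file=None):
    """Dump the traceback of every thread if the block runs longer than timeout_s.

    hang_watchdog is a hypothetical helper name. `file` must expose fileno();
    it defaults to sys.stderr, resolved when the block is entered.
    """
    target = file if file is not None else sys.stderr
    # Arms a watchdog thread; exit=False means the process keeps running after the dump.
    faulthandler.dump_traceback_later(timeout_s, exit=False, file=target)
    try:
        yield
    finally:
        # Disarm the watchdog once the block finishes (normally or with an error).
        faulthandler.cancel_dump_traceback_later()


# Usage sketch (the 120 second timeout is an arbitrary example value):
# with hang_watchdog(120):
#     outputs = run_parallel_generation(...)  # hypothetical call into the node
```

If the dump shows the workers sitting inside a socket read or an HTTP client call, the hang is more likely in the LLM client than in the executor itself.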
With Joblib:
    # Fragment from inside the node function: chain, input_col, conditional_col,
    # n_threads and input_df come from the enclosing scope.
    @retry(
        wait=wait_exponential_jitter(initial=5, max=180),
        stop=stop_after_attempt(4),
    )
    def _inner_loop(row: pd.Series, context: Dict[str, str]) -> str | None:
        # Build a per-call copy: every thread receives the same context dict,
        # so writing "_input_val" into it in place would race between rows.
        row_context = {**context, "_input_val": row[input_col]} if input_col else context

        # Skip the LLM call for rows where the conditional column is falsy
        if conditional_col and not row[conditional_col]:
            return None

        return chain.predict(**{**row.to_dict(), **row_context})

    # FUTURE: Can we use asyncio?
    outputs = Parallel(n_jobs=n_threads, backend="threading")(
        delayed(_inner_loop)(row, context) for _, row in input_df.iterrows()
    )
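On the asyncio question in the comment above: a minimal sketch of what that could look like, assuming the chain offers an async counterpart of chain.predict (an assumption to check against the installed LangChain version). Here fake_llm_call is a hypothetical stand-in for that call, and generate_all, max_concurrency and timeout_s are illustrative names. A semaphore caps the number of calls in flight, and each call gets its own timeout so a single stuck request yields None instead of blocking the whole batch.

```python
import asyncio


async def fake_llm_call(prompt: str) -> str:
    # Hypothetical stand-in for an async LLM call
    await asyncio.sleep(0.01)
    return f"reply to {prompt}"


async def generate_all(prompts, max_concurrency: int = 8, timeout_s: float = 5.0):
    # Limit how many calls run at the same time
    semaphore = asyncio.Semaphore(max_concurrency)

    async def one(prompt: str):
        async with semaphore:
            try:
                # Cancel the call if it takes longer than timeout_s
                return await asyncio.wait_for(fake_llm_call(prompt), timeout=timeout_s)
            except asyncio.TimeoutError:
                return None

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(one(p) for p in prompts))


outputs = asyncio.run(generate_all(["a", "b", "c"]))
```

Inside a Kedro node this would still be driven by a single asyncio.run call, so the node itself stays synchronous.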
With ThreadPoolExecutor:
def _inner_loop(row: pd.Series, context: Dict[str, str], chain, conditional_col, input_col) -> str | None:
    # Build a per-call copy: all worker threads share the same context dict,
    # so writing "_input_val" into it in place would race between rows.
    row_context = {**context, "_input_val": row[input_col]} if input_col else context

    # Skip the LLM call for rows where the conditional column is falsy
    if conditional_col and not row[conditional_col]:
        return None

    return chain.predict(**{**row.to_dict(), **row_context})


# Inside the node function:
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
    # iterrows() yields (index, row) tuples; only the row is passed on
    outputs = list(
        executor.map(
            lambda item: _inner_loop(item[1], context, chain, conditional_col, input_col),
            input_df.iterrows(),
        )
    )
With ProcessPoolExecutor:
def process_row(row, context, chain, conditional_col, input_col):
    return _inner_loop(row, context, chain, conditional_col, input_col)


def _inner_loop(row: pd.Series, context: Dict[str, str], chain, conditional_col, input_col) -> str:
    # Add additional meta variable based on input col
    if input_col:
        context["_input_val"] = row[input_col]

    if conditional_col:
        return chain.predict(**{**row.to_dict(), **context}) if row[conditional_col] else None

    return chain.predict(**{**row.to_dict(), **context})

def generate_in_parallel(
    chain: LLMChain,
    input_df: pd.DataFrame,
    List of generated strings.
    """

    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        # Submit tasks and gather futures
        futures = []
        for _, row in input_df.iterrows():
            future = executor.submit(process_row, row, context, chain, conditional_col, input_col)
            futures.append(future)

    # Collect results in order
    outputs = [future.result() for future in futures]

Juan Luis

02/21/2024, 9:11 AM
hello Thomas, thanks for the detailed writeup. we'll have a look at this soon
if you're making lots of LLM calls, sounds like in theory the ThreadPoolExecutor should be your best bet
👍 1
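Since the LLM calls are I/O-bound, a thread pool fits the workload, but the with-block form waits for every worker on exit, so one stuck call blocks the whole node. A minimal sketch of a variant that stops waiting on a future after a timeout follows. fake_llm_call is a hypothetical stand-in for chain.predict, and generate_with_timeout and timeout_s are illustrative names. Note that the timeout bounds how long the caller waits for each result, not how long the call runs: a truly hung worker thread keeps running in the background and can still delay interpreter exit.

```python
import concurrent.futures
import time


def fake_llm_call(prompt: str) -> str:
    # Hypothetical stand-in for chain.predict(...); "slow" simulates a stalled request
    time.sleep(1.0 if prompt == "slow" else 0.01)
    return f"reply to {prompt}"


def generate_with_timeout(prompts, n_threads: int = 4, timeout_s: float = 0.3):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=n_threads)
    futures = [executor.submit(fake_llm_call, p) for p in prompts]

    outputs = []
    for future in futures:
        try:
            # Wait at most timeout_s for this result, then move on
            outputs.append(future.result(timeout=timeout_s))
        except concurrent.futures.TimeoutError:
            outputs.append(None)

    # Do not block on unfinished workers; drop tasks that have not started yet
    executor.shutdown(wait=False, cancel_futures=True)
    return outputs


outputs = generate_with_timeout(["a", "slow", "b"])
```

Rows that come back as None can then be retried or logged separately, which also shows which inputs trigger the hang.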