Thomas d'Hooghe
02/21/2024, 9:09 AM
Thomas d'Hooghe
02/21/2024, 9:10 AM
@retry(
wait=wait_exponential_jitter(initial=5, max=180),
stop=stop_after_attempt(4),
)
def _inner_loop(row: pd.Series, context: Dict[str, str]) -> str:
    """Run the chain on one row and return its prediction.

    ``chain``, ``input_col`` and ``conditional_col`` are read from the
    enclosing scope.

    Args:
        row: One row of the input frame; its columns become keyword
            arguments of ``chain.predict``.
        context: Extra keyword arguments shared by every row. Values here
            override same-named row columns. The dict is NOT modified.

    Returns:
        The result of ``chain.predict``, or ``None`` when ``conditional_col``
        is set and the row's value in that column is falsy.
    """
    # Keep the per-row meta variable in a local dict. The same `context`
    # object is handed to every worker thread, so writing `_input_val` into
    # it would let one row's value be picked up by another row's call.
    per_row = {"_input_val": row[input_col]} if input_col else {}
    if conditional_col and not row[conditional_col]:
        return None
    return chain.predict(**{**row.to_dict(), **context, **per_row})
# FUTURE: Can we use asyncio?
# Configure the joblib runner first, then hand it one delayed call per row.
threaded_runner = Parallel(n_jobs=n_threads, backend="threading")
row_calls = (delayed(_inner_loop)(row, context) for _, row in input_df.iterrows())
outputs = threaded_runner(row_calls)
With ThreadPoolExecutor:
def _inner_loop(row: pd.Series, context: Dict[str, str], chain, conditional_col, input_col) -> str:
# Add additional meta variable based on input col
if input_col:
context["_input_val"] = row[1][input_col]
if conditional_col:
return chain.predict(**{**row[1].to_dict(), **context}) if row[1][conditional_col] else None
return chain.predict(**{**row[1].to_dict(), **context})
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:

    def _predict_row(indexed_row):
        # `indexed_row` is one item of input_df.iterrows(); the remaining
        # arguments are the same for every row.
        return _inner_loop(indexed_row, context, chain, conditional_col, input_col)

    mapped_results = executor.map(_predict_row, input_df.iterrows())
    outputs = list(mapped_results)
With ProcessPoolExecutor:
def process_row(row, context, chain, conditional_col, input_col):
    """Forward one row and its shared arguments to ``_inner_loop``.

    All five arguments are passed through positionally and the result of
    ``_inner_loop`` is returned unchanged.
    """
    prediction = _inner_loop(row, context, chain, conditional_col, input_col)
    return prediction
def _inner_loop(row: pd.Series, context: Dict[str, str], chain, conditional_col, input_col) -> str:
# Add additional meta variable based on input col
if input_col:
context["_input_val"] = row[input_col]
if conditional_col:
return chain.predict(**{**row.to_dict(), **context}) if row[conditional_col] else None
return chain.predict(**{**row.to_dict(), **context})
def generate_in_parallel(
chain: LLMChain,
input_df: pd.DataFrame,
List of generated strings.
"""
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    # Queue one task per row, keeping the futures in submission order.
    pending = [
        executor.submit(process_row, row, context, chain, conditional_col, input_col)
        for _, row in input_df.iterrows()
    ]
    # Resolve the futures in that same order so outputs line up with the rows.
    outputs = [task.result() for task in pending]
Juan Luis
02/21/2024, 9:11 AM
Juan Luis
02/21/2024, 9:12 AM