Shohatl
09/15/2025, 5:07 AM
Elena Khaustova
09/15/2025, 2:22 PM
Elena Khaustova
09/15/2025, 2:24 PM
Regarding create_pipeline():
• Directly passing variables into create_pipeline() is not supported, because Kedro expects the pipeline definition to be static and reproducible. Pipelines are built once (when the session loads the project), not dynamically per run.
• If your input is only available at runtime, you have two main options:
    ◦ a. Use hooks - you can use the before_pipeline_run or before_node_run hooks to inject runtime context, such as extra inputs (a hook sketch is shown after the example below).
    ◦ b. Generate the pipeline programmatically before session.run() - Your create_pipeline() function can be written to accept arguments, but Kedro itself won’t call it with arguments. Instead, you can build the pipeline dynamically in your custom script before creating the KedroSession.
    ◦ For example:
from kedro.framework.project import pipelines  # the project's pipeline registry
from kedro.framework.session import KedroSession
from kedro.pipeline import Pipeline, node

def build_dynamic_pipeline(table_list):
    # load_table is your own loading function; table_list is only known at runtime
    return Pipeline(
        [node(load_table, inputs=table, outputs=f"{table}_data") for table in table_list]
    )

with KedroSession.create("my_project") as session:
    dynamic_pipeline = build_dynamic_pipeline(runtime_tables)
    # Register dynamic_pipeline under a name in the pipelines registry,
    # then run it by that name:
    session.run(pipeline_name=pipeline_name)
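For option (a), a minimal hook sketch (the class name and the injected runtime_tables value are illustrative, and the exact catalog call may differ slightly between Kedro versions):

from kedro.framework.hooks import hook_impl
from kedro.io import MemoryDataset

class RuntimeInputsHook:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # Compute or fetch the runtime value here (illustrative: a list of table names)
        runtime_tables = ["customers", "orders"]
        # Expose it to downstream nodes as an in-memory dataset
        catalog.add("runtime_tables", MemoryDataset(runtime_tables))

Register the hook in your project's settings.py via HOOKS = (RuntimeInputsHook(),) so Kedro picks it up.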
Elena Khaustova
09/15/2025, 2:28 PM
• Use ParallelRunner in a single session rather than spinning up multiple sessions in threads.
from kedro.runner import ParallelRunner
session.run(pipeline_name="etl_pipeline", runner=ParallelRunner())
• Or, if you must isolate them (e.g., separate datasets/catalogs), then spinning up multiple sessions is fine, but you will need to use multiprocessing (concurrent.futures.ProcessPoolExecutor) or external orchestration; a minimal sketch follows.
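A minimal multiprocessing sketch, assuming each worker bootstraps the project and runs its own isolated session (the project path, pipeline names, and the run_isolated helper are illustrative):

from concurrent.futures import ProcessPoolExecutor
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

PROJECT_PATH = "."  # assumption: the script is run from the project root

def run_isolated(pipeline_name):
    # Each process gets its own session, and therefore its own catalog and config
    bootstrap_project(PROJECT_PATH)
    with KedroSession.create(project_path=PROJECT_PATH) as session:
        return session.run(pipeline_name=pipeline_name)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(run_isolated, ["pipe_1", "pipe_2"]))

Because the workers are separate processes, the sessions stay fully isolated and do not share catalogs or in-memory state.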
Shohatl
09/15/2025, 2:55 PM
Elena Khaustova
09/15/2025, 3:04 PM
Elena Khaustova
09/15/2025, 3:04 PM
Elena Khaustova
09/15/2025, 3:05 PM
Elena Khaustova
09/15/2025, 3:10 PM
Shohatl
09/15/2025, 3:14 PM
Elena Khaustova
09/15/2025, 3:23 PM
1. Run them together in one session via ParallelRunner. An example of how it's done on the Kedro side, when you're using it as a framework (a session-run sketch follows after this list):
def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines.
    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines
2. Or, if you must isolate them - via multiple sessions (manually), as in the ProcessPoolExecutor sketch above.
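For route 1, a minimal sketch of running the combined __default__ pipeline with ParallelRunner through a session (the project path is an assumption):

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.runner import ParallelRunner

bootstrap_project(".")  # assumption: executed from the project root
with KedroSession.create(project_path=".") as session:
    # Runs the "__default__" pipeline registered by register_pipelines() above
    session.run(runner=ParallelRunner())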
Shohatl
09/15/2025, 3:24 PM
Elena Khaustova
09/15/2025, 3:25 PM
target_pipe = pipe_1 + pipe_2 + ... + pipe_n
Elena Khaustova
09/15/2025, 3:26 PM
Shohatl
09/16/2025, 11:03 AM
Elena Khaustova
09/16/2025, 11:11 AM
import time
import random
import pandas as pd
import yaml
from kedro.io import DataCatalog
from kedro.pipeline import node
from kedro.pipeline import pipeline
from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner
# Simulate an I/O-bound task
def io_bound_task():
    time.sleep(3)  # Simulate an I/O wait (e.g., reading from a file)
    input_data = f"input_data_{str(random.randint(0, 100))}"
    data = {
        "label": ["A", "B", "C"],
        "country": ["GB", "ES", "FR"],
        input_data: [1, 2, 3]
    }
    df = pd.DataFrame(data)
    return df
# Simulate a compute-bound task
def compute_bound_task(input_data) -> str:
    # Simulate heavy compute that does not use multiple cores (i.e., not pandas/numpy, etc.)
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return f"dummy_{str(random.randint(0, 100))}"
def create_data_catalog(shared_memory=False):
    """
    Use the dataset factory pattern to make sure the benchmark covers the slowest path.
    """
    catalog_conf = """
output_1:
    type: pandas.CSVDataset
    filepath: data/test/output_1.csv
output_2:
    type: pandas.CSVDataset
    filepath: data/test/output_2.csv
# output_5:
#     type: kedro.io.MemoryDataset
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    if shared_memory:
        from kedro.io.data_catalog import SharedMemoryDataCatalog
        catalog = SharedMemoryDataCatalog.from_config(catalog_conf)
    else:
        catalog = DataCatalog.from_config(catalog_conf)
    return catalog
def create_pipeline():
    dummy_pipeline = pipeline(
        [
            node(io_bound_task, inputs=None, outputs="output_1", name="node_1"),
            node(io_bound_task, inputs=None, outputs="output_2", name="node_2"),
            node(compute_bound_task, inputs="output_1", outputs="output_3", name="node_3"),
            node(compute_bound_task, inputs="output_2", outputs="output_4", name="node_4"),
            node(io_bound_task, inputs=None, outputs="output_5", name="node_5")
        ]
    )
    return dummy_pipeline
def main():
    catalog = create_data_catalog(shared_memory=True)
    test_pipeline = create_pipeline()
    runner_obj = ParallelRunner()
    res = runner_obj.run(test_pipeline, catalog=catalog)
    print(res)

# The __main__ guard is needed because ParallelRunner spawns worker processes
if __name__ == "__main__":
    main()
Elena Khaustova
09/16/2025, 11:12 AM
• For SequentialRunner or ThreadRunner you need DataCatalog
• For ParallelRunner - SharedMemoryDataCatalog
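For example, reusing create_data_catalog and create_pipeline from the script above (a sketch mirroring the runner.run call in main()):

from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner

test_pipeline = create_pipeline()

# SequentialRunner and ThreadRunner work with a regular DataCatalog
SequentialRunner().run(test_pipeline, catalog=create_data_catalog(shared_memory=False))
ThreadRunner().run(test_pipeline, catalog=create_data_catalog(shared_memory=False))

# ParallelRunner needs a SharedMemoryDataCatalog so data can be shared across processes
ParallelRunner().run(test_pipeline, catalog=create_data_catalog(shared_memory=True))

As in the script above, keep the ParallelRunner call behind an if __name__ == "__main__" guard when running it as a standalone script.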