# questions
s
Hello everyone, Is there a way to pass a variable to the create_pipeline() function? My use case is initializing several KedroSessions in a single run, where each session runs an ETL on a table. The ETL can change according to the input. For example, if I have several inputs for an ETL, I want to dynamically initialize a node per input that loads the table. Is there a way to dynamically create a pipeline based on the input? The input is available only at runtime, so I can't use parameters.yaml. Also, I wanted to know the best practice for running multiple pipelines in parallel. I am currently creating a session per pipeline and running each in a separate thread.
👀 1
e
Hi @Shohatl
1. Passing runtime variables to create_pipeline()
• Directly passing variables into create_pipeline() is not supported, because Kedro expects the pipeline definition to be static and reproducible. Pipelines are built once (when the session loads the project), not dynamically per run.
• If your input is only available at runtime, you have two main options:
◦ a. Use Hooks - You can use the before_pipeline_run or before_node_run hooks to inject runtime context, such as extra inputs (see the hook sketch after the example below).
◦ b. Generate the pipeline programmatically before session.run() - Your create_pipeline() function can be written to accept arguments, but Kedro itself won’t call it with arguments. Instead, you can build the pipeline dynamically in your custom script before creating the KedroSession.
◦ For example:
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.pipeline import Pipeline, node


def build_dynamic_pipeline(table_list):
    # load_table is your own node function; one loading node is created per table
    return Pipeline(
        [node(load_table, inputs=table, outputs=f"{table}_data") for table in table_list]
    )


with KedroSession.create("my_project") as session:
    dynamic_pipeline = build_dynamic_pipeline(runtime_tables)  # runtime_tables is only known at runtime
    # Add your pipeline to the project's pipelines mapping under a name, then run it by that name
    pipelines["dynamic_etl"] = dynamic_pipeline
    session.run(pipeline_name="dynamic_etl")
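For option (a), a minimal sketch of such a hook (the hook name and its run_params/pipeline/catalog arguments come from the Kedro hook spec; the environment variable and the dict-style catalog assignment are illustrative assumptions) might look like this - note it can only inject data, it cannot change the structure of an already-registered pipeline:

import os

from kedro.framework.hooks import hook_impl


class RuntimeInputHook:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # Illustrative assumption: the runtime input arrives via an environment variable
        table_list = os.environ.get("ETL_TABLES", "").split(",")
        # Dict-style assignment adds/overwrites a dataset in the catalog (recent Kedro DataCatalog);
        # nodes can then declare "runtime_tables" as an input
        catalog["runtime_tables"] = table_list

The hook would be registered via the HOOKS tuple in the project’s settings.py.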
2. Running multiple pipelines in parallel
• Best practice: Use the ParallelRunner in a single session rather than spinning up multiple sessions in threads.
from kedro.runner import ParallelRunner

session.run(pipeline_name="etl_pipeline", runner=ParallelRunner())
• Or, if you must isolate them (e.g., separate datasets/catalogs), then spinning up multiple sessions is fine - but you will need to use multiprocessing (concurrent.futures.ProcessPoolExecutor) or external orchestration.
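A minimal sketch of that multi-session approach (the project path and pipeline names are hypothetical; each worker process bootstraps the project and owns an isolated session):

from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

PROJECT_PATH = Path("/path/to/my_project")  # hypothetical project location


def run_in_own_session(pipeline_name: str):
    # Runs inside a worker process: bootstrap the project, then run one pipeline in its own session
    bootstrap_project(PROJECT_PATH)
    with KedroSession.create(project_path=PROJECT_PATH) as session:
        return session.run(pipeline_name=pipeline_name)


if __name__ == "__main__":
    names = ["etl_table_a", "etl_table_b"]  # hypothetical independent pipelines
    with ProcessPoolExecutor(max_workers=len(names)) as pool:
        results = list(pool.map(run_in_own_session, names))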
s
Hey @Elena Khaustova, thanks for the response. I tried session.run(pipeline), but in Kedro v1.0.0 passing the pipeline object is no longer supported, only a pipeline name. I found a hack where I can modify the pipelines dict that is being imported, but I wanted to know if there is a legitimate way to support it. I also tried initializing the dynamic pipeline via a hook, but I noticed that the hook doesn't have access to the pipelines dict because it is a constant in one of the Kedro .py files. Also, about the ParallelRunner: I read that it's used to run nodes in parallel, but the pipelines run one after the other, which isn't the behaviour I wanted. Is there a way to configure the ParallelRunner to also run pipelines in parallel?
e
Yeah, my bet - you’ll need to modify the pipeline dict as well
There’s no legitimate way, as Kedro does not support dynamic pipelines
So you kinda need a custom session for that
As for running the pipelines - a pipeline is just an abstraction to combine your nodes. When running, all the nodes from the target pipeline are used to build a DAG, and execution happens node by node (sequentially, in parallel or using threads) based on the selected runner.
s
How can I run multiple pipelines in parallel though? I saw that Kedro itself is coupled to the idea of one pipeline at a time per session. If I have pipelines that don't rely on each other, so they can run in parallel, and pipelines that need to run after other ones because they rely on the updates those make to the data, how can I run them in parallel and in the correct order? What I had until now is a layer generator which grouped all the pipelines that have no dependencies and ran them together, and then after they finished it would run all the pipelines that depend on the first ones, and so on...
e
As mentioned above:
1. Create a pipeline that combines the ones you need to run, and use ParallelRunner. Here’s an example of how it’s done on the Kedro side when you use it as a framework (see also the sketch after this message):
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines
2. Or, if you must isolate them - via multiple sessions (manually)
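To sketch point 1 in project terms (the names etl_a, etl_b and combined_etl are assumptions for illustration): register a combined pipeline and let the DAG built from all of its nodes decide the execution order:

from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    pipelines = find_pipelines()
    # Assumed pipeline names: combine the independent ETL pipelines into one target pipeline
    pipelines["combined_etl"] = pipelines["etl_a"] + pipelines["etl_b"]
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines

It can then be run with, for example, kedro run --pipeline combined_etl --runner ParallelRunner.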
s
You mean create a pipeline that contains all the pipelines combined? How does it work if I have multiple starting nodes and final nodes?
👍 1
e
Kedro expects a single pipeline to run a session, but it can be
target_pipe = pipe_1 + pipe_2 + ... + pipe_n
Kedro takes all the nodes and creates a DAG, which determines the execution order. At execution time, Kedro operates at the level of nodes, not pipelines.
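As a minimal sketch of how ordering falls out of the DAG (the node functions and dataset names are illustrative assumptions): a node that consumes a dataset produced by another pipeline will only run after it, while independent branches can run concurrently under ParallelRunner or ThreadRunner.

from kedro.pipeline import node, pipeline


def extract_a():
    return {"rows": 100}  # illustrative stand-in for loading table A


def extract_b():
    return {"rows": 200}  # illustrative stand-in for loading table B


def merge(a, b):
    return {"rows": a["rows"] + b["rows"]}


pipe_a = pipeline([node(extract_a, inputs=None, outputs="table_a", name="extract_a")])
pipe_b = pipeline([node(extract_b, inputs=None, outputs="table_b", name="extract_b")])
# Depends on both pipelines through the shared dataset names "table_a" and "table_b"
pipe_merge = pipeline([node(merge, inputs=["table_a", "table_b"], outputs="merged", name="merge")])

# extract_a and extract_b have no mutual dependency, so they can execute concurrently;
# the merge node only runs once both of its inputs exist
target_pipe = pipe_a + pipe_b + pipe_merge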
s
Hey @Elena Khaustova, thanks for all the help. I had one last question about dynamic pipeline creation. I wanted to try using a runner to run the pipeline instead of modifying the pipelines dict, and I wanted to know if you knew how to use the ThreadRunner. I saw that it uses a lot of the things that the session itself initializes, like the hook manager and catalog.
👍 1
e
See this example
import time
import random

import pandas as pd
import yaml
from kedro.io import DataCatalog
from kedro.pipeline import node
from kedro.pipeline import pipeline
from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner


# Simulate an I/O-bound task
def io_bound_task():
    time.sleep(3)  # Simulate an I/O wait (e.g., reading from a file)
    input_data = f"input_data_{str(random.randint(0, 100))}"
    data = {
        "label": ["A", "B", "C"],
        "country": ["GB", "ES", "FR"],
        input_data: [1, 2, 3]
    }
    df = pd.DataFrame(data)
    return df


# Simulate a compute-bound task (a tight pure-Python loop)
def compute_bound_task(input_data) -> str:
    # Simulate heavy compute that does not use multiple cores (not pandas/numpy etc.)
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return f"dummy_{str(random.randint(0, 100))}"


def create_data_catalog(shared_memory=False):
    """
    Build a DataCatalog (or a SharedMemoryDataCatalog for ParallelRunner) from a YAML config.
    """
    catalog_conf = """

output_1:
    type: pandas.CSVDataset
    filepath: data/test/output_1.csv
output_2:
    type: pandas.CSVDataset
    filepath: data/test/output_2.csv
# output_5:
#     type: kedro.io.MemoryDataset
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    if shared_memory:
        from kedro.io.data_catalog import SharedMemoryDataCatalog
        catalog = SharedMemoryDataCatalog.from_config(catalog_conf)
    else:
        catalog = DataCatalog.from_config(catalog_conf)
    return catalog


def create_pipeline():
    dummy_pipeline = pipeline(
        [
            node(io_bound_task, inputs=None, outputs="output_1", name="node_1"),
            node(io_bound_task, inputs=None, outputs="output_2", name="node_2"),
            node(compute_bound_task, inputs="output_1", outputs="output_3", name="node_3"),
            node(compute_bound_task, inputs="output_2", outputs="output_4", name="node_4"),
            node(io_bound_task, inputs=None, outputs="output_5", name="node_5")
        ]
    )
    return dummy_pipeline


def main():
    catalog = create_data_catalog(shared_memory=True)
    test_pipeline = create_pipeline()
    runner_obj = ParallelRunner()
    res = runner_obj.run(test_pipeline, catalog=catalog)
    return res


if __name__ == "__main__":
    main()  # the __main__ guard matters because ParallelRunner spawns worker processes
• For SequentialRunner or ThreadRunner you need a DataCatalog
• For ParallelRunner - a SharedMemoryDataCatalog
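If the goal is specifically the ThreadRunner, a minimal variation of main() (reusing the create_pipeline and create_data_catalog helpers from the example above; the max_workers value is just an illustration) would be:

from kedro.runner import ThreadRunner


def main_threaded():
    # ThreadRunner works with a plain DataCatalog; no shared-memory catalog is needed
    catalog = create_data_catalog(shared_memory=False)
    test_pipeline = create_pipeline()
    res = ThreadRunner(max_workers=4).run(test_pipeline, catalog=catalog)
    return res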