# questions
a
Hi, I have a question (and very likely what I'm trying to do is a Kedro anti-pattern). Basically, I'd like to have a node pipeline with a diamond shape: EntryNode --> [IntermediateNode X for X in list] --> OutputNode. Doing this requires each intermediate node to have a runtime (in code, not in static catalog.yml files) generation of datasets. I don't want to use them to store any data; those datasets would just be dummy ones in order to keep the dependency/ordering of the nodes. Any ideas on how I could deal with that (ideally within the create_pipeline file)? Thank you
d
Sorry, I meant to create an example and get back to you, but I didn't have time yet. 😅 But, in short, this should be quite easy. Here's a quick example (with stubbed-out functionality):
```python
"""
This is a boilerplate pipeline 'diamond'
generated using Kedro 0.18.4
"""

import random
from functools import partial, update_wrapper
from operator import itemgetter

import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline


def prepare_training_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data


def train_model(
    model: str, training_data: pd.DataFrame
) -> tuple["sklearn.BaseEstimator", tuple[str, float]]:
    trained_model, accuracy = "...", random.random()
    ...
    return trained_model, (model, accuracy)


def print_best_model(*accuracies: tuple[str, float]) -> None:
    print(max(accuracies, key=itemgetter(1)))


def create_pipeline(**kwargs) -> Pipeline:
    random.seed(42)

    names = [
        "nearest_neighbors",
        "linear_svm",
        "rbf_svm",
        "gaussian_process",
        "decision_tree",
        "random_forest",
        "neural_net",
        "adaboost",
        "naive_bayes",
        "qda",
    ]

    return pipeline(
        [
            node(prepare_training_data, "raw_data", "training_data"),
            *[
                node(
                    update_wrapper(partial(train_model, model=name), train_model),
                    {"training_data": "training_data"},
                    [f"trained_{name}_model", f"{name}_accuracy"],
                )
                for name in names
            ],
            node(print_best_model, [f"{name}_accuracy" for name in names], None),
        ]
    )
```
If I run `kedro-viz`: [screenshot of the resulting pipeline graph]
(I created that example by just doing `kedro pipeline create diamond` in a new project, adding a dummy `raw_data` catalog entry that loads some CSV file, and adding the above contents to `src/${PROJECT_NAME}/pipelines/diamond/pipeline.py`, if you want to play with it. I also used some Python 3.9/3.10 annotation syntax; if it doesn't work for you, you can add `from __future__ import annotations` up top.)
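A quick aside on the `update_wrapper(partial(train_model, model=name), train_model)` trick above: a bare `functools.partial` object has no `__name__`, and Kedro derives the default node name from the function's `__name__`, so `update_wrapper` copies that metadata over from the wrapped function. A minimal sketch of the difference, standalone outside Kedro:
```python
from functools import partial, update_wrapper


def train_model(model: str, training_data=None):
    """Stub standing in for the real training function."""


bare = partial(train_model, model="qda")
named = update_wrapper(partial(train_model, model="qda"), train_model)

print(hasattr(bare, "__name__"))  # False: a plain partial carries no function metadata
print(named.__name__)             # "train_model": copied from the wrapped function
```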
a
Hi @Deepyaman Datta, thank you so much for answering me and taking the time to create an example! Very glad, the kedro viz output is exactly what I was looking for. Question regarding the data catalog entries: did you create the entries in a specific file, or are they implicitly MemoryDataSet if not specified? Usually when I run a pipeline and a node input doesn't exist in the data catalog, it raises an error.
d
They're implicitly `MemoryDataSet` if not specified. You only need to create catalog entries for inputs that are not produced elsewhere in the pipeline (which is why I added the `raw_data` entry).
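For the example above, that means the whole catalog can be a single entry; every other dataset stays an implicit `MemoryDataSet`. A minimal sketch (the filepath is just a placeholder):
```yaml
# conf/base/catalog.yml
raw_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/raw_data.csv
```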
a
@Deepyaman Datta Thanks again 🙏. In the unfortunate case where I would need each train node to output its result to an S3 CSV dataset (because `MemoryDataSet` won't work for runners that isolate nodes), is there a way to work around that? Some kind of templated data catalog entry, so each train node would write to S3 like the following:
```
model/train_models/{trainer}_result.csv
```
Ideally I would create the data catalog entries at the beginning of the create_pipeline function, but it seems that's the wrong way to do it, and env-based credentials are out of reach there.
d
Yeah, this is where it gets hairy. There are several ways to achieve this in some form or another: maintaining another loop, e.g. using Jinja in the catalog itself to generate these entries; using a `before_pipeline_run` hook; an `after_node_run` hook that applies to this particular set of nodes and also writes them to S3, plus a `PartitionedDataSet` configured to read from there; etc. But none of them lead to a very clean solution.
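As a rough illustration of the hook route, something along these lines would register an S3-backed CSV dataset per trainer just before the run starts. The bucket name, the trainer list, and the assumption that each per-trainer output is a DataFrame (the stubbed example above returns tuples, so you would adapt that) are all illustrative; the hook instance would be registered in `settings.py` under `HOOKS`:
```python
from kedro.extras.datasets.pandas import CSVDataSet  # kedro 0.18.x location
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class DynamicTrainerDatasetsHook:
    """Sketch: add one S3-backed CSV dataset per trainer at runtime."""

    def __init__(self, trainers: list[str]):
        self.trainers = trainers

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog: DataCatalog) -> None:
        for trainer in self.trainers:
            catalog.add(
                f"{trainer}_accuracy",  # must match the dataset names used in the pipeline
                CSVDataSet(
                    filepath=f"s3://my-bucket/model/train_models/{trainer}_result.csv"
                ),
                replace=True,
            )
```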
> Some kind of templated data catalog entry, so each train node would write to S3 like the following: `model/train_models/{trainer}_result.csv`
This is what `PartitionedDataSet` is best for. However, the standard way to use `PartitionedDataSet` is to write to it from a single node.
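That single-node pattern would look something like this: one node gathers the per-trainer results and returns a dict keyed by partition name, and `PartitionedDataSet` saves each value with its underlying dataset. The function name and the DataFrame shape of each result are illustrative:
```python
import pandas as pd


def collect_results(*accuracies: tuple[str, float]) -> dict[str, pd.DataFrame]:
    """Return one partition per trainer; each dict value is saved under its key."""
    return {
        f"{model}_result": pd.DataFrame({"model": [model], "accuracy": [accuracy]})
        for model, accuracy in accuracies
    }
```
The output of that node would be a catalog entry along the lines of `type: PartitionedDataSet`, `path: s3://<bucket>/model/train_models`, `dataset: pandas.CSVDataSet`, `filename_suffix: .csv`.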
a
I see, thanks for listing these ideas 🙏. I'll investigate and pick the best/cleanest one for us. Is it something that Kedro would like to be able to handle at some point, or an edge case/misuse that is not planned to be covered?
d
I believe it's on our radar, but I'm not sure what the timeline/priority is. https://github.com/kedro-org/kedro/issues/1963 covers an approach in a bit more depth. In case @Nok Lam Chan, @datajoely, @Merel, or @Yetunde are interested in weighing in: I know this is something that comes up quite often within QuantumBlack as well, but I'm no longer privy to those conversations. :)
n
Thanks @Deepyaman Datta. Personally, I still prefer the Jinja way, from the point of view that static pipelines are easier to understand and debug. Another side benefit is that it will be shown correctly in Viz; I believe Viz doesn't capture dynamic changes made from hooks. I guess this issue also falls into the category of Creating a Dynamic Pipeline.
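For reference, the Jinja approach looks roughly like this in the catalog, with `TemplatedConfigLoader` (which adds Jinja2 support) set as the project's config loader in `settings.py`. The bucket and the trainer list are illustrative and, as a known drawback, duplicate the list already kept in `create_pipeline`:
```yaml
# conf/base/catalog.yml (Jinja2 syntax, rendered by TemplatedConfigLoader)
{% for trainer in ["nearest_neighbors", "linear_svm", "rbf_svm"] %}
{{ trainer }}_accuracy:
  type: pandas.CSVDataSet
  filepath: s3://my-bucket/model/train_models/{{ trainer }}_result.csv
{% endfor %}
```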
๐Ÿ‘ 2