Alexis Eutrope
02/18/2023, 8:22 PM

Deepyaman Datta
02/20/2023, 10:28 PM
02/20/2023, 10:28 PM"""
This is a boilerplate pipeline 'diamond'
generated using Kedro 0.18.4
"""
import random
from functools import partial, update_wrapper
from operator import itemgetter
import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline
def prepare_training_data(raw_data: pd.DataFrame) -> pd.DataFrame:
return raw_data
def train_model(
model: str, training_data: pd.DataFrame
) -> tuple["sklearn.BaseEstimator", tuple[str, float]]:
trained_model, accuracy = "...", random.random()
...
return trained_model, (model, accuracy)
def print_best_model(*accuracies: dict) -> None:
print(max(accuracies, key=itemgetter(1)))
def create_pipeline(**kwargs) -> Pipeline:
random.seed(42)
names = [
"nearest_neighbors",
"linear_svm",
"rbf_svm",
"gaussian_process",
"decision_tree",
"random_forest",
"neural_net",
"adaboost",
"naive_bayes",
"qda",
]
return pipeline(
[
node(prepare_training_data, "raw_data", "training_data"),
*[
node(
update_wrapper(partial(train_model, model=name), train_model),
{"training_data": "training_data"},
[f"trained_{name}_model", f"{name}_accuracy"],
)
for name in names
],
node(print_best_model, [f"{name}_accuracy" for name in names], None),
]
)
(This was made by running kedro pipeline create diamond in a new project, adding a dummy raw_data catalog entry that loads some CSV file, and adding the above contents to src/${PROJECT_NAME}/pipelines/diamond/pipeline.py, if you want to play with it.)
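For reference, the dummy raw_data entry in conf/base/catalog.yml might look something like this (the filepath is just a placeholder):

# Hypothetical catalog entry; point filepath at any CSV you have handy.
raw_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/raw_data.csv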
(I did also use some Python 3.9 or 3.10 annotation syntax; if it doesn't work for you, you can add from __future__ import annotations up top.)

Alexis Eutrope
02/22/2023, 10:03 PM
Deepyaman Datta
02/22/2023, 10:15 PM
Datasets default to MemoryDataSet if not specified. You only need to create catalog entries for inputs that are not produced elsewhere in the pipeline (hence the raw_data entry).

Alexis Eutrope
02/22/2023, 10:32 PM
Some kind of templated data catalog entry so each train node would write in S3 like the following:
model/train_models/{trainer}_result.csv
Deepyaman Datta
02/22/2023, 10:44 PM
There are a few options--e.g. a before_pipeline_run hook; an after_node_run hook that applies to this particular set of nodes and also writes them to S3 + a PartitionedDataSet configured to read from there; etc.--but none of them lead to a very clean solution.
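A minimal sketch of that after_node_run idea, assuming s3fs is available for s3:// paths; the bucket name and hook class are made up, and the hook would be registered via the HOOKS tuple in the project's settings.py:

import pandas as pd
from kedro.framework.hooks import hook_impl


class SaveTrainResultsHook:
    """Sketch only: persist each *_accuracy output to S3 after its node runs."""

    @hook_impl
    def after_node_run(self, node, outputs):
        for name, value in outputs.items():
            if name.endswith("_accuracy"):  # only outputs from the train nodes
                trainer, accuracy = value
                # "my-bucket" is a placeholder; s3:// paths require s3fs
                pd.DataFrame([{"trainer": trainer, "accuracy": accuracy}]).to_csv(
                    f"s3://my-bucket/model/train_models/{trainer}_result.csv",
                    index=False,
                )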
"Some kind of templated data catalog entry so each train node would write in S3 like the following: model/train_models/{trainer}_result.csv" -- this is what PartitionedDataSet is best for. However, the standard way to use PartitionedDataSet is to write to it from a single node.
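A sketch of that single-node approach: fan every accuracy tuple into one node that returns a dict mapping partition IDs to dataframes, which a PartitionedDataSet then writes out. The dataset name, node wiring, and bucket are placeholders:

import pandas as pd


def collect_train_results(*accuracies: tuple[str, float]) -> dict[str, pd.DataFrame]:
    # Each key becomes one partition, saved as <path>/<key><filename_suffix>.
    return {
        trainer: pd.DataFrame([{"trainer": trainer, "accuracy": accuracy}])
        for trainer, accuracy in accuracies
    }


# Wire it in after the training nodes, e.g.:
#     node(collect_train_results, [f"{name}_accuracy" for name in names], "train_results")
#
# with a catalog entry along these lines ("my-bucket" is a placeholder):
#     train_results:
#       type: PartitionedDataSet
#       path: s3://my-bucket/model/train_models
#       dataset: pandas.CSVDataSet
#       filename_suffix: "_result.csv"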
Alexis Eutrope
02/22/2023, 11:05 PM

Deepyaman Datta
02/23/2023, 12:38 AM

Nok Lam Chan
02/23/2023, 6:34 AM