Rennan Haro
11/14/2023, 4:51 PM
The catalog property from the KedroContext is read-only. Does that mean catalog.add does not work when called from the context? Seems like I’m missing something pretty basic here.
I’m trying to replace the catalog entry data_set_1 with data_frame_1, and run the predict pipeline with the updated catalog. It does not work (no errors, the dataset is simply not updated):
import pandas as pd

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(project_root)
data_frame_1 = pd.DataFrame()  # dataframe data here...

with KedroSession.create(project_path=project_root, env="prd") as session:
    session.load_context().catalog.add(
        data_set_name="data_set_1",
        data_set=data_frame_1,
        replace=True,
    )
    session.run(pipeline_name="predict")
Comparing data_frame_1 with the catalog’s data_set_1 (after running catalog.add), I have two different dataframes.
Juan Luis
11/14/2023, 5:14 PM
So you can call catalog.add, but it won't be reflected on the rest of the application. Am I understanding your issue correctly?
Rennan Haro
11/14/2023, 5:19 PM

Takieddine Kadiri
11/14/2023, 5:33 PM
Session.run creates/materializes the DataCatalog based on your catalog.yml. In your code it is executed after your data injection.
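_A minimal sketch of why the injection is lost, assuming Kedro 0.18 behaviour (project_root as in the snippet above): each access to the context’s catalog property materializes a fresh DataCatalog from catalog.yml, so the instance you called .add() on is not the one session.run uses._
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(project_root)
with KedroSession.create(project_path=project_root, env="prd") as session:
    context = session.load_context()
    catalog_a = context.catalog  # built from catalog.yml on access
    catalog_b = context.catalog  # a second, independent materialization
    # Mutating catalog_a cannot affect catalog_b, nor the catalog that
    # session.run later builds internally for the actual run.
    assert catalog_a is not catalog_b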
You can try the kedro-boot plugin; it tries to help achieve your exact use case: injecting data/params into Kedro pipelines from an external application and performing multiple pipeline runs with low latency. Here is an example repo that demonstrates, among other things, the integration of a FastAPI app with Kedro using kedro-boot.
Rennan Haro
11/14/2023, 5:43 PM
> Session.run creates/materializes the DataCatalog based on your catalog.yml. In your code it is executed after your data injection.
@Takieddine Kadiri the plugin looks awesome.
Is it possible to materialize it before running the session, and ensure we’re running the session with the injected data?
Yolan Honoré-Rougé
11/14/2023, 6:09 PM
With a BootSession you can:
• declare some inputs as artifacts to "preload" them and ensure the BootSession runs fast
• declare your data to inject at runtime as inputs
Takieddine Kadiri
11/14/2023, 6:39 PM
from fastapi import FastAPI
from kedro_boot.booter import boot_session

kedro_boot_session = boot_session(project_path)

app = FastAPI()

@app.get("/your_endpoint")
def your_endpoint(your_data):
    return kedro_boot_session.run(name="your_pipeline", inputs={"input_dataset": your_data.dict()})
_If your_data is JSON and your input_dataset a pandas DataFrame, kedro-boot will automatically do the conversion._
• In your pipeline_registry.py, declare an AppPipeline and expose your input dataset:
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from kedro_boot.pipeline import app_pipeline

def register_pipelines():
    your_pipeline = pipeline([node(your_function, inputs=["input_dataset", "model"], outputs="output_dataset")])
    your_app_pipeline = app_pipeline(
        your_pipeline,
        name="your_pipeline",
        inputs="input_dataset",
    )
    return {"__default__": your_app_pipeline}
Bonus: if you want to preload some dataset as a MemoryDataset at startup time (your ML model, for example), so it is not loaded at each request, you can declare it as artifacts in your app_pipeline:
your_app_pipeline = app_pipeline(
    your_pipeline,
    name="your_pipeline",
    inputs="input_dataset",
    artifacts="model",
)
Rennan Haro
11/14/2023, 8:41 PM
Ended up solving it like this:
1. Load the catalog from the context: catalog_ = session.load_context().catalog
2. Overwrite datasets as MemoryDatasets in this catalog: dataset_to_update = MemoryDataset(data); catalog_.add(data_set_name="existing_dataset", data_set=dataset_to_update, replace=True)
3. Run the pipeline with runner.run (instead of session.run), passing the created catalog as a parameter: SequentialRunner().run(pipeline=pipelines["predict"], catalog=catalog_) (full sketch below)
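_A consolidated, runnable version of those three steps, assuming Kedro 0.18-style APIs; project_root and data are placeholders, and the dataset/pipeline names are the ones used in this thread:_
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import MemoryDataset
from kedro.runner import SequentialRunner

bootstrap_project(project_root)
with KedroSession.create(project_path=project_root, env="prd") as session:
    # 1. Materialize the catalog once, up front.
    catalog_ = session.load_context().catalog
    # 2. Swap the existing entry for an in-memory dataset holding the injected data.
    catalog_.add(
        data_set_name="existing_dataset",
        data_set=MemoryDataset(data),
        replace=True,
    )
    # 3. Run with a runner directly instead of session.run, so this pre-built,
    # modified catalog is the one actually used for the run.
    SequentialRunner().run(pipeline=pipelines["predict"], catalog=catalog_)
_Note that calling the runner directly skips some of session.run’s bookkeeping (e.g. lifecycle hooks), so it’s worth checking this against your Kedro version._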