[SOLVED] :white_check_mark: How can I update (replace) catalog entries from an existing Kedro session?
# questions
r
[SOLVED] Hi all. How can I update (replace) catalog entries from an existing Kedro session? I've read in the docs that the `catalog` property of the `KedroContext` is read-only. Does that mean `catalog.add` does not work when called from the context? It seems like I'm missing something pretty basic here. I'm trying to replace the catalog entry `data_set_1` with `data_frame_1`, and then run the predict pipeline with the updated catalog. It does not work (no errors, the dataset is simply not updated):
```python
import pandas as pd
from kedro.framework.startup import bootstrap_project
from kedro.framework.session import KedroSession

bootstrap_project(project_root)

data_frame_1 = pd.DataFrame(...)  # dataframe data here

with KedroSession.create(project_path=project_root, env="prd") as session:
    session.load_context().catalog.add(
        data_set_name="data_set_1",
        data_set=data_frame_1,
        replace=True,
    )
    session.run(pipeline_name="predict")
```
When comparing `data_frame_1` with the catalog's `data_set_1` (after running `catalog.add`) I get two different dataframes.
j
Hi @Rennan Haro, you might be hitting https://github.com/kedro-org/kedro/issues/2728. So yes, you can do `catalog.add`, but it won't be reflected in the rest of the application. Am I understanding your issue correctly?
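A minimal diagnostic sketch of what that issue implies (assuming the same `project_root` placeholder and `prd` environment as in the snippet above): on the affected Kedro versions, each access to `context.catalog` appears to build a fresh `DataCatalog` from the configuration, so an `add()` on one instance never reaches the catalog that `session.run()` materializes later.

```python
from kedro.framework.startup import bootstrap_project
from kedro.framework.session import KedroSession

bootstrap_project(project_root)

with KedroSession.create(project_path=project_root, env="prd") as session:
    context = session.load_context()

    # Two accesses of the read-only property; if these are distinct objects,
    # a .add() on one of them cannot affect the catalog the run will use.
    catalog_a = context.catalog
    catalog_b = context.catalog
    print(catalog_a is catalog_b)  # expected: False on the affected versions
```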
r
Yep. I'm building a FastAPI application that exposes a couple of endpoints which manipulate parameters/datasets based on the requests' payloads and then run a Kedro pipeline with the updated catalog. Is it possible at all to inject datasets "on the fly" into an existing catalog, and run a pipeline with this updated catalog?
t
`Session.run` creates/materializes the `DataCatalog` from your `catalog.yml`. In your code it is executed after your data injection, so the injected dataset is discarded. You can try the `kedro-boot` plugin; it aims at exactly your use case: injecting data/params into Kedro pipelines from an external application and performing multiple pipeline runs with low latency. Here is an example repo that demonstrates, among other things, the integration of a FastAPI app with Kedro using kedro-boot.
r
> `Session.run` creates/materializes the `DataCatalog` from your `catalog.yml`. In your code it is executed after your data injection.
@Takieddine Kadiri the plugin looks awesome. Is it possible to materialize it before running the session, and ensure we're running the session with the injected data?
y
If I understand your question correctly, when you instantiate the `BootSession` you can:
• declare some inputs as `artifacts` to "preload" them and ensure the `BootSession` runs fast;
• declare the data you want to inject at runtime as `inputs`.
t
Here is a minimal example of using kedro-boot for your use case.
• Create a kedro-boot session and use it in your FastAPI app:
```python
from fastapi import FastAPI
from kedro_boot.booter import boot_session

# Create the kedro-boot session once, at application startup
kedro_boot_session = boot_session(project_path)

app = FastAPI()

@app.get("/your_endpoint")
def your_endpoint(your_data):
    # Inject the request payload as "input_dataset" and run the pipeline
    return kedro_boot_session.run(name="your_pipeline", inputs={"input_dataset": your_data.dict()})
```
_If `your_data` is JSON and your `input_dataset` is a pandas dataset, kedro-boot will automatically do the conversion._
• In your `pipeline_registry.py`, declare an AppPipeline and expose your input dataset:
```python
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from kedro_boot.pipeline import app_pipeline


def register_pipelines():
    your_pipeline = pipeline(
        [node(your_function, inputs=["input_dataset", "model"], outputs="output_dataset")]
    )

    # Expose "input_dataset" so it can be injected at run time
    your_app_pipeline = app_pipeline(
        your_pipeline,
        name="your_pipeline",
        inputs="input_dataset",
    )

    return {"__default__": your_app_pipeline}
```
Bonus: if you want to preload some dataset as a MemoryDataset at startup time (your ML model, for example), so that it is not loaded on each request, you can declare it as `artifacts` in your `app_pipeline`:
```python
your_app_pipeline = app_pipeline(
    your_pipeline,
    name="your_pipeline",
    inputs="input_dataset",
    artifacts="model",  # preloaded once at startup, reused across runs
)
```
r
Thanks a lot for the suggestions. I wish we had come across kedro-boot earlier hahaha. As we have already implemented a bunch of things and are short on time, migrating to kedro-boot might come in a future iteration. To solve our problem, what we did was:
1. Assign the session catalog to a variable: `catalog_ = session.load_context().catalog`
2. Overwrite datasets as MemoryDatasets in this catalog: `dataset_to_update = MemoryDataset(data)`; `catalog_.add(data_set_name="existing_dataset", data_set=dataset_to_update, replace=True)`
3. Run the pipeline with `runner.run` (instead of `session.run`), passing the created catalog as a parameter: `SequentialRunner().run(pipeline=pipelines["predict"], catalog=catalog_)`
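Putting the three steps together, a minimal sketch of this workaround (assuming the same `project_root`, `prd` environment and `predict` pipeline used earlier in the thread; `data` stands for the payload to inject, and the `catalog.add` keyword names follow the Kedro version used above):

```python
from kedro.framework.startup import bootstrap_project
from kedro.framework.session import KedroSession
from kedro.framework.project import pipelines
from kedro.io import MemoryDataset
from kedro.runner import SequentialRunner

bootstrap_project(project_root)

with KedroSession.create(project_path=project_root, env="prd") as session:
    # 1. Materialize the catalog once and keep a handle on it
    catalog_ = session.load_context().catalog

    # 2. Overwrite the existing entry with an in-memory dataset built from the payload
    dataset_to_update = MemoryDataset(data)
    catalog_.add(data_set_name="existing_dataset", data_set=dataset_to_update, replace=True)

    # 3. Run the pipeline through a runner so the modified catalog is actually used
    SequentialRunner().run(pipeline=pipelines["predict"], catalog=catalog_)
```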