Fazil Topal
04/20/2025, 2:13 PM

Deepyaman Datta
04/20/2025, 2:34 PM
You can use add_feed_dict() to inject catalog entries into a catalog: https://docs.kedro.org/en/stable/api/kedro.io.DataCatalog.html#kedro.io.DataCatalog.add_feed_dict
This may be useful if you're doing something like batch inference. If you want to do inference on a single record, you may need to convert it into a dataframe of sorts (batch inference on one) or create a node that accepts something like JSON input.
Does that help? At the very least, I feel like this functionality should be more discoverable; maybe something to improve!
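For a concrete picture, a minimal sketch of that kind of injection (the dataset name "model_input" and the one-row dataframe are placeholders, not from this thread):

# Minimal sketch: inject a single-record input as a catalog entry before a run.
import pandas as pd
from kedro.io import DataCatalog

catalog = DataCatalog()
single_record = pd.DataFrame([{"text": "hello"}])  # a "batch" of one
catalog.add_feed_dict({"model_input": single_record}, replace=True)
assert catalog.load("model_input").equals(single_record)
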
Takieddine Kadiri
04/20/2025, 7:20 PM

Deepyaman Datta
04/20/2025, 8:19 PM

Fazil Topal
04/27/2025, 1:30 PM

Fazil Topal
04/28/2025, 1:45 PM
I call .add_feed_dict({"01_text.mytest#md": injected_text}, replace=True), but right after this, if I do a catalog load on this dataset, it reads the original data, not my injected text. Any idea @Deepyaman Datta?

Fazil Topal
04/28/2025, 7:05 PM
--> Here we ideally return the final dataset

Fazil Topal
04/28/2025, 7:17 PM
This is a KedroPipeline class which does the setup and simplifies the run:
from typing import Iterable, Any

from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.runner import AbstractRunner, SequentialRunner
from kedro.io.core import generate_timestamp


def get_pipeline(
    pipeline_name: str | None = None,
    tags: Iterable[str] | None = None,
    node_names: Iterable[str] | None = None,
    from_nodes: Iterable[str] | None = None,
    to_nodes: Iterable[str] | None = None,
    from_inputs: Iterable[str] | None = None,
    to_outputs: Iterable[str] | None = None,
    namespace: str | None = None,
):
    """Look up a registered pipeline by name and apply the usual run filters."""
    name = pipeline_name or "__default__"
    try:
        pipeline = pipelines[name]
    except KeyError as exc:
        raise ValueError(
            f"Failed to find the pipeline named '{name}'. "
            f"It needs to be generated and returned "
            f"by the 'register_pipelines' function."
        ) from exc

    filtered_pipeline = pipeline.filter(
        tags=tags,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        node_names=node_names,
        from_inputs=from_inputs,
        to_outputs=to_outputs,
        node_namespace=namespace,
    )
    return filtered_pipeline


def get_version(request_id: str) -> str:
    """Build a per-request save version: <request_id>/<timestamp>."""
    return f"{request_id}/{generate_timestamp()}"

class KedroPipeline:
    def __init__(
        self,
        session: KedroSession,
        runner: AbstractRunner | None = None,
        pipeline_name: str | None = None,
        tags: Iterable[str] | None = None,
        node_names: Iterable[str] | None = None,
        from_nodes: Iterable[str] | None = None,
        to_nodes: Iterable[str] | None = None,
        from_inputs: Iterable[str] | None = None,
        to_outputs: Iterable[str] | None = None,
        namespace: str | None = None,
        load_versions: dict[str, str] | None = None,
    ):
        self.session = session
        self.runner = runner or SequentialRunner()
        if not isinstance(self.runner, AbstractRunner):
            raise ValueError(
                "KedroPipeline expects an instance of a Runner instead of a class. "
                "Have you forgotten the `()` at the end of the statement?"
            )

        self.pipeline = get_pipeline(
            pipeline_name=pipeline_name,
            tags=tags,
            node_names=node_names,
            from_nodes=from_nodes,
            to_nodes=to_nodes,
            from_inputs=from_inputs,
            to_outputs=to_outputs,
            namespace=namespace,
        )
        self._params = {
            "tags": tags,
            "from_nodes": from_nodes,
            "to_nodes": to_nodes,
            "node_names": node_names,
            "from_inputs": from_inputs,
            "to_outputs": to_outputs,
            "pipeline_name": pipeline_name,
            "namespace": namespace,
            "load_versions": load_versions,
            "runner": getattr(runner, "__name__", str(runner)),
        }
        self.context = self.on_startup()

    def on_startup(self):
        """Load the Kedro context once and record the remaining run parameters."""
        session_id = self.session.store["session_id"]
        save_version = session_id
        extra_params = self.session.store.get("extra_params") or {}
        context = self.session.load_context()
        self._params.update(
            {
                "session_id": session_id,
                "project_path": self.session._project_path.as_posix(),
                "env": context.env,
                "extra_params": extra_params,
                "save_version": save_version,
            }
        )
        return context
    def run(self, request_id: str, new_datasets: dict | None = None) -> tuple[dict[str, Any], Any]:
        """Run the pipeline with the configured runner.

        Raises:
            ValueError: If the named or `__default__` pipeline is not
                defined by `register_pipelines`.
            Exception: Any uncaught exception during the run will be re-raised
                after being passed to the ``on_pipeline_error`` hook.

        Returns:
            The runner's result (any node outputs that cannot be processed by
            the ``DataCatalog``, keyed by node output name) together with the
            final dataset loaded back from the catalog.
        """
        # Dataset versioning: scope this run's saved data under the request id.
        self.context.config_loader.runtime_params.update(
            {"version": get_version(request_id)}
        )
        catalog = self.context._get_catalog(
            save_version=self._params["save_version"],
            load_versions=self._params["load_versions"],
        )

        # Inject per-request data, then run the pipeline through the hook manager.
        hook_manager = self.session._hook_manager
        if new_datasets is not None:
            catalog.add_feed_dict(new_datasets, replace=True)

        hook_manager.hook.before_pipeline_run(
            run_params=self._params, pipeline=self.pipeline, catalog=catalog
        )
        try:
            run_result = self.runner.run(
                self.pipeline, catalog, hook_manager, self._params["session_id"]
            )
        except Exception as error:
            hook_manager.hook.on_pipeline_error(
                error=error,
                run_params=self._params,
                pipeline=self.pipeline,
                catalog=catalog,
            )
            raise
        hook_manager.hook.after_pipeline_run(
            run_params=self._params,
            run_result=run_result,
            pipeline=self.pipeline,
            catalog=catalog,
        )

        # Load the final dataset again so the caller gets the pipeline output.
        out = catalog.load(list(self.pipeline.outputs())[0])
        return run_result, out
And this is the app.py file that does the FastAPI logic:
from fastapi import FastAPI, APIRouter, Body
from pydantic import BaseModel
from contextlib import asynccontextmanager

from .kedro import KedroPipeline
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from myproj import PROJECT_PATH
from myproj.pipelines import Datasets as ds

pipelines: dict[str, KedroPipeline] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):  # noqa
    bootstrap_project(PROJECT_PATH)
    session = KedroSession.create(PROJECT_PATH, env="staging")
    session._logger.info("Kedro project %s", session._project_path.name)
    for p in ["serving"]:  # Define pipelines to run
        pipelines[p] = KedroPipeline(session=session, pipeline_name=p)
    yield
    session.close()
    pipelines.clear()


app = FastAPI(lifespan=lifespan)
router_v1 = APIRouter(prefix="/v1")


@app.get("/")
async def home():
    return {"msg": "Hello World"}


class ReqInput(BaseModel):
    request_id: str
    text: str


@router_v1.post("/create_text")
async def create_text(request: ReqInput = Body(...)):
    _, output = pipelines["serving"].run(
        request.request_id, {ds.SERVING_DATASET_TEST: request.text}
    )
    return {
        "request_id": request.request_id,
        "output": output,
    }


app.include_router(router_v1)
When the application boots up, I create the session and context; then we invoke a Kedro run on each incoming request, and since we have data versioning, the data IO will also be fine.
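For reference, a hypothetical client call against that endpoint (host, port, and payload values are placeholders):

# Hypothetical request to the /v1/create_text endpoint defined above.
import requests

resp = requests.post(
    "http://localhost:8000/v1/create_text",
    json={"request_id": "req-001", "text": "some input text"},
)
print(resp.json())  # {"request_id": "req-001", "output": ...}
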
Deepyaman Datta
04/28/2025, 7:53 PM

Fazil Topal
04/28/2025, 8:06 PM

Deepyaman Datta
04/28/2025, 8:07 PM
"That would still require a dataset load since the previous node wrote into the catalog, no?" You could just write the previous node as a memory dataset, in which case it wouldn't.

Deepyaman Datta
04/28/2025, 8:13 PM
# nodes.py
def final_node(inp1, inp2, inp3):
    return inp1.join(inp2).join(inp3)

def identity(x):
    return x

# pipeline.py
from kedro.pipeline import Pipeline, node

def create_pipeline():
    return Pipeline(
        [
            node(final_node, ["inp1", "inp2", "inp3"], "out"),
            node(identity, "out", "out_disk"),
            node(identity, "out", "out_mem"),
        ]
    )

# catalog.yml
# out, out_mem are memory datasets by default
out_disk:
  type: pickle.PickleDataset

Fazil Topal
04/28/2025, 8:31 PM

Deepyaman Datta
04/28/2025, 8:51 PM
"Ah I see, yes that would work, a bit hacky but yes." You can potentially make it less hacky with a hook that does it, but honestly not sure it's worth it for just one output like this. 🤷
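For illustration, a rough sketch of that hook idea (the class name, the dataset name "out", and the registration step are assumptions, not from this project):

# Illustrative sketch: a hook that stashes one node output in memory so the
# caller can read it after the run, instead of adding an extra identity node.
from kedro.framework.hooks import hook_impl


class CaptureOutputHook:
    def __init__(self, dataset_name: str = "out"):
        self.dataset_name = dataset_name
        self.captured = None

    @hook_impl
    def after_node_run(self, node, outputs):
        # `outputs` maps output dataset names to the values the node returned.
        if self.dataset_name in outputs:
            self.captured = outputs[self.dataset_name]

The instance would then be registered (for example via HOOKS in settings.py) and its captured attribute read back after the run finishes.
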
Fazil Topal
04/28/2025, 9:13 PM

Fazil Topal
04/28/2025, 9:19 PM
So in after_node_run, I'd store the data in the catalog as a memory dataset (since hooks don't return anything), and then read this custom dataset after the pipeline is finished?

Deepyaman Datta
04/29/2025, 11:21 AM