# questions
f
Hi everyone, I had a question around serving Kedro pipelines. If I have a pipeline x that has some nodes, what's the best way to serve this pipeline? As of now, the pipeline reads and writes files, but during serving you ideally want to send the received input to the pipeline (rather than reading the data from somewhere else) and return the output directly (perhaps also without writing it to storage). If there are some pointers I can have a look at, that would be awesome.
d
You can use `add_feed_dict()` to inject catalog entries into a catalog: https://docs.kedro.org/en/stable/api/kedro.io.DataCatalog.html#kedro.io.DataCatalog.add_feed_dict This may be useful if you're doing something like batch inference. If you want to do inference on a single record, you may need to convert it into a dataframe of sorts (batch inference on one) or create a node that accepts something like JSON input. Does that help? At the very least, I feel like this functionality should be more discoverable; maybe something to improve!
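For reference, a minimal sketch of that injection pattern (the dataset name and payload here are made up):
```python
# Minimal sketch: inject a raw payload into a catalog so that nodes declaring
# "model_input" as an input read the injected value instead of a file on disk.
from kedro.io import DataCatalog

catalog = DataCatalog()  # in practice this comes from the project context
catalog.add_feed_dict({"model_input": {"text": "hello"}}, replace=True)

print(catalog.load("model_input"))  # {'text': 'hello'}
```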
t
You can check out kedro-boot
d
(I completely forgot about `kedro-boot`; @Fazil Topal definitely check it out, since it's a much more tailored solution to what you're likely looking for!)
f
Thanks for the links, I'm now working on this and will get back if they don't solve my issue!
~So I took a look at kedro-boot and it looks a bit complicated to me. I couldn't really understand how to pass data from the serving endpoint to my pipeline while keeping the rest of the logic the same, so I'm writing custom code for now. I have dataset factories in my catalog, so how does `add_feed_dict` work with them? I tried to simply add this
`.add_feed_dict({"01_text.mytest#md": injected_text}, replace=True)`
but right after this, if I do a catalog load on this dataset, it reads the original data, not my injected text. Any idea @Deepyaman Datta?~
Okay, so I'm almost done and there is one thing missing. When I run a Kedro pipeline, I don't get the outputs, since they're processed by the catalog and saved. Is there a way to make sure we also get the outputs regardless? This works only if the catalog entry is not defined, in which case Kedro returns a memory dataset. I still want Kedro to save the dataset as is, but with an option to somehow get the outputs in the pipeline output without reloading them. Is that somehow possible? This is the flow I imagine: Kedro pipeline run + injected input -> node1 (saves intermediate to S3) -> node2 (saves intermediate to S3)
--> Here we ideally return the final dataset
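For illustration, a condensed sketch of that flow (pipeline and dataset names are assumptions; it is essentially what the fuller solution below packages into a class):
```python
# Condensed sketch, assuming the Kedro project is already bootstrapped.
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.runner import SequentialRunner

with KedroSession.create() as session:
    context = session.load_context()
    catalog = context.catalog
    pipeline = pipelines["serving"]  # hypothetical pipeline name

    # Inject the request payload in place of the on-disk input dataset.
    catalog.add_feed_dict({"serving_input": {"text": "hello"}}, replace=True)

    # Catalog-backed intermediates are still saved (e.g. to S3) as configured.
    SequentialRunner().run(pipeline, catalog, session._hook_manager)

    # Re-load the terminal dataset so it can be returned to the caller.
    output_name = list(pipeline.outputs())[0]
    result = catalog.load(output_name)
```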
Attaching my solution here for reference. I have created a `KedroPipeline` class which handles the setup and simplifies the run:
```python
from typing import Iterable, Any
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.runner import AbstractRunner, SequentialRunner
from kedro.io.core import generate_timestamp


def get_pipeline(
    pipeline_name: str | None = None,
    tags: Iterable[str] | None = None,
    node_names: Iterable[str] | None = None,
    from_nodes: Iterable[str] | None = None,
    to_nodes: Iterable[str] | None = None,
    from_inputs: Iterable[str] | None = None,
    to_outputs: Iterable[str] | None = None,
    namespace: str | None = None,
):
    name = pipeline_name or "__default__"

    try:
        pipeline = pipelines[name]
    except KeyError as exc:
        raise ValueError(
            f"Failed to find the pipeline named '{name}'. "
            f"It needs to be generated and returned "
            f"by the 'register_pipelines' function."
        ) from exc

    filtered_pipeline = pipeline.filter(
        tags=tags,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        node_names=node_names,
        from_inputs=from_inputs,
        to_outputs=to_outputs,
        node_namespace=namespace,
    )

    return filtered_pipeline


def get_version(request_id: str) -> str:
    return f"{request_id}/{generate_timestamp()}"


class KedroPipeline:
    def __init__(
        self,
        session: KedroSession,
        runner: AbstractRunner | None = None,
        pipeline_name: str | None = None,
        tags: Iterable[str] | None = None,
        node_names: Iterable[str] | None = None,
        from_nodes: Iterable[str] | None = None,
        to_nodes: Iterable[str] | None = None,
        from_inputs: Iterable[str] | None = None,
        to_outputs: Iterable[str] | None = None,
        namespace: str | None = None,
        load_versions: dict[str, str] | None = None
    ):
        self.session = session
        self.runner = runner or SequentialRunner()
        if not isinstance(self.runner, AbstractRunner):
            raise ValueError(
                "KedroPipeline expect an instance of Runner instead of a class."
                "Have you forgotten the `()` at the end of the statement?"
            )

        self.pipeline = get_pipeline(
            pipeline_name=pipeline_name,
            tags=tags,
            node_names=node_names,
            from_nodes=from_nodes,
            to_nodes=to_nodes,
            from_inputs=from_inputs,
            to_outputs=to_outputs,
            namespace=namespace
        )
        self._params = {
            "tags": tags,
            "from_nodes": from_nodes,
            "to_nodes": to_nodes,
            "node_names": node_names,
            "from_inputs": from_inputs,
            "to_outputs": to_outputs,
            "pipeline_name": pipeline_name,
            "namespace": namespace,
            "load_versions": load_versions,
            "runner": getattr(runner, "__name__", str(runner)),
        }

        self.context = self.on_startup()

    def on_startup(self):
        session_id = self.session.store["session_id"]
        save_version = session_id
        extra_params = self.session.store.get("extra_params") or {}
        context = self.session.load_context()

        self._params.update({
            "session_id": session_id,
            "project_path": self.session._project_path.as_posix(),
            "env": context.env,
            "extra_params": extra_params,
            "save_version": save_version
        })

        return context

    def run(self, request_id: str, new_datasets: dict[str, Any] | None = None) -> tuple[dict[str, Any], Any]:
        """Runs the pipeline with a runner.

        Raises:
            ValueError: If the named or `__default__` pipeline is not
                defined by `register_pipelines`.
            Exception: Any uncaught exception during the run will be re-raised
                after being passed to ``on_pipeline_error`` hook.
        Returns:
            A tuple of (1) any node outputs that cannot be processed by the
            ``DataCatalog``, keyed by node output name, and (2) the re-loaded
            final pipeline output.
        """
        # Dataset versioning
        self.context.config_loader.runtime_params.update(
            {"version": get_version(request_id)}
        )

        catalog = self.context._get_catalog(
            save_version=self._params["save_version"],
            load_versions=self._params["load_versions"],
        )

        # Run the runner
        hook_manager = self.session._hook_manager

        if new_datasets is not None:
            catalog.add_feed_dict(new_datasets, replace=True)

        hook_manager.hook.before_pipeline_run(
            run_params=self._params, pipeline=self.pipeline, catalog=catalog
        )
        try:
            run_result = self.runner.run(
                self.pipeline, catalog, hook_manager, self._params["session_id"]
            )
        except Exception as error:
            hook_manager.hook.on_pipeline_error(
                error=error,
                run_params=self._params,
                pipeline=self.pipeline,
                catalog=catalog,
            )
            raise

        hook_manager.hook.after_pipeline_run(
            run_params=self._params,
            run_result=run_result,
            pipeline=self.pipeline,
            catalog=catalog,
        )

        # Load the final dataset again
        out = catalog.load(list(self.pipeline.outputs())[0])
        return run_result, out
```
And this is the `app.py` file that implements the FastAPI logic:
```python
from fastapi import FastAPI, APIRouter, Body
from pydantic import BaseModel
from contextlib import asynccontextmanager
from .kedro import KedroPipeline
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from myproj import PROJECT_PATH
from myproj.pipelines import Datasets as ds


pipelines: dict[str, KedroPipeline] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):   # noqa
    bootstrap_project(PROJECT_PATH)
    session = KedroSession.create(PROJECT_PATH, env="staging")
    session._logger.info("Kedro project %s", session._project_path.name)

    for p in ["serving"]:   # Define pipelines to run
        pipelines[p] = KedroPipeline(session=session, pipeline_name=p)

    yield

    session.close()
    pipelines.clear()


app = FastAPI(lifespan=lifespan)
router_v1 = APIRouter(prefix="/v1")


@app.get("/")
async def home():
    return {"msg": "Hello World"}


class ReqInput(BaseModel):
    request_id: str
    text: str


@router_v1.post("/create_text")
async def create_text(request: ReqInput = Body(...)):

    _, output = pipelines["serving"].run(
        request.request_id, {ds.SERVING_DATASET_TEST: request.text}
    )

    return {
        "request_id": request.request_id,
        "output": output,
    }


app.include_router(router_v1)
```
When the application boots up, I create the session and the context; then we invoke a Kedro run on each incoming request, and since the datasets are versioned per request, the data IO is also fine.
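For completeness, a hypothetical way to exercise the endpoint locally (the import path and payload are made up; the `TestClient` context manager is what runs the lifespan startup and shutdown):
```python
from fastapi.testclient import TestClient

from myproj.app import app  # hypothetical import path for the file above

with TestClient(app) as client:  # runs the lifespan startup/shutdown
    resp = client.post(
        "/v1/create_text",
        json={"request_id": "req-123", "text": "hello world"},
    )
    print(resp.status_code, resp.json())
```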
d
Can you simply fork your output? Basically, whatever your final output is, create one no-op node that passes it to a catalog entry that persists it, and another no-op node that still returns it as a memory dataset?
f
That would still require a dataset load, since the previous node wrote into the catalog, no? My workaround for now was to do this at the end of `run`: `out = catalog.load(list(self.pipeline.outputs())[0])` and then `return run_result, out`. Basically, at the end, I load the pipeline output again and return it.
d
> That would still require a dataset load, since the previous node wrote into the catalog, no?
You could just write the previous node's output as a memory dataset, in which case it wouldn't:
```
# nodes.py

def final_node(inp1, inp2, inp3):
    return inp1.join(inp2).join(inp3)

def identity(x):
    return x

# pipeline.py

from kedro.pipeline import Pipeline, node

def create_pipeline():
    return Pipeline(
        [
            node(final_node, ["inp1", "inp2", "inp3"], "out"),
            node(identity, "out", "out_disk"),
            node(identity, "out", "out_mem"),
        ]
    )

# catalog.yml

# out, out_mem are memory datasets by default

out_disk:
  type: pickle.PickleDataset
```
f
Ah I see, yes that would work; a bit hacky, but yes. I'll definitely keep this in mind if I need to remove my reloading logic, thanks!
d
> Ah I see, yes that would work; a bit hacky, but yes.
You can potentially make it less hacky with a hook that does it, but honestly not sure it's worth it for just one output like this. 🤷
f
If we could do a hook that is somehow executed only during serving, that would be nice, but how would this behavior be replicated there to return the data? 🤔
I guess we could use `after_node_run` to store the data in the catalog as a memory dataset, since hooks don't return anything, and then read this custom dataset after the pipeline is finished?
d
This is very old code that I'm sure would need modification to work, but, essentially, you can (1) add a catalog entry before the pipeline run and (2) add another save call after each node run: https://github.com/deepyaman/kedro-accelerator/blob/develop/src/kedro_accelerator/plugins/__init__.py#L49-L73 Again, it's a lot more worthwhile for a situation like that, where you're updating a lot of entries; not sure it's as helpful if it's just for one entry.
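As a rough, simplified sketch of that idea (this is not the linked kedro-accelerator code; the class and names are made up), an `after_node_run` hook can keep an in-memory copy of selected outputs so the serving layer can return them without re-loading:
```python
from kedro.framework.hooks import hook_impl


class CacheOutputsHook:
    """Keep an in-memory copy of selected datasets as nodes produce them."""

    def __init__(self, dataset_names):
        self.dataset_names = set(dataset_names)
        self.cached: dict = {}

    @hook_impl
    def after_node_run(self, node, outputs):
        # `outputs` maps each output dataset name to the object the node just
        # returned; the catalog still saves those datasets as configured.
        for name, data in outputs.items():
            if name in self.dataset_names:
                self.cached[name] = data
```
Registering an instance of this hook (e.g. via `HOOKS` in `settings.py`, or on the session's hook manager in the serving code) and reading `cached` after the run would remove the extra `catalog.load`, at the cost of holding the output in memory.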