# questions
s
👋 I am just starting with Kedro. I am putting together an example pipeline to make sure I understand all the concepts before I build a larger project with it. One concept I have not been able to fully figure out is how to work with APIs. The simple project I am trying to build is:
1. get the top N articles on Hacker News
2. fetch the items, to get the URLs
3. use an API to summarize each URL
4. save the summary, URL, etc.
I am able to easily build the HackerNewsTopAPIDataset for getting the top items. However, I am not able to figure out how to get those item IDs into the HackerNewsItemsAPIDataset. I am of course able to fill up the node function with all kinds of I/O and get it to work, which is what I did. However, everything I read says this is the wrong approach and node functions should be purely functional. I stumbled onto the Stack Overflow question https://stackoverflow.com/questions/73430557/dynamic-parameters-on-datasets-in-kedro, which talks about adding kwargs to _load(). However, I do not see how to pass arguments into the load without explicitly pulling datasets into the node function and calling it directly, which brings me back to doing I/O in node functions. Now I am left scratching my head on how to link datasets that require input to be able to function. Any insights or pointers would be greatly appreciated.
The other idea I played with is to have HackerNewsItemsAPIDataset.load() return self, and then have the node call get_items():
Copy code
def hacker_news_items(api: HackerNewsItemsAPIDataset, item_ids: list[int]) -> list[dict]:
  return api.get_items(item_ids)
e
What you’re running into is a pattern with dynamic inputs: APIs where the second call depends on the result of the first. Kedro datasets are designed to be declarative and static (configured in the catalog), so they don’t directly support passing runtime values into _load(). That’s why it feels awkward.
The recommended approach in Kedro is:
• Keep API I/O inside datasets, not in node functions.
• If you need runtime parameters (like the list of HackerNews IDs), capture them as data flowing through the pipeline, not as dataset arguments.
So in your example:
1. Create one dataset (HackerNewsTopAPIDataset) that loads the top N IDs.
2. Write a node function that takes those IDs and transforms them into a format that your next dataset can understand (for example, a list of IDs or a table).
3. Create another dataset (HackerNewsItemsAPIDataset) that is parametrized to fetch items for a given list of IDs. Instead of trying to pass the IDs directly into the dataset config, you make the dataset read from an intermediate file or memory dataset produced by the previous node.
In other words, rather than calling .load() on a dataset inside your node, you let Kedro orchestrate:
• Node 1 produces the IDs, written to a dataset (MemoryDataset or JSONDataSet)
• Node 2 consumes that dataset (the IDs) and calls your items API
• Node 3 consumes the items and calls the summarization API
• Node 4 saves the outputs
Alternatively, if you really need runtime kwargs: Kedro does have a hook (before_dataset_loaded) where you can inject arguments dynamically at runtime (like changing the url or params of an API dataset).
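To make that wiring concrete, here is a rough sketch of the node layout. All of the names (extract_ids, fetch_items, summarize, hn_top, hn_item_ids, hn_items_raw, hn_summaries) are placeholders rather than anything from your project, the requests library is an assumption on my side, and the item fetch is shown as a plain node function purely to illustrate the data flow (moving it into a dataset is discussed further below).
Copy code
# pipeline.py -- illustrative sketch only, not a drop-in implementation
import requests
from kedro.pipeline import Pipeline, node, pipeline


def extract_ids(top_ids: list[int]) -> list[int]:
    # Node 1: pure transformation; the output is persisted by whatever
    # dataset "hn_item_ids" maps to in the catalog (MemoryDataset or JSON)
    return top_ids[:10]


def fetch_items(item_ids: list[int]) -> list[dict]:
    # Node 2: consumes the IDs and calls the items API
    # (in the dataset-based variant this call lives in a custom dataset's _load())
    url = "https://hacker-news.firebaseio.com/v0/item/{id}.json"
    return [requests.get(url.format(id=item_id)).json() for item_id in item_ids]


def summarize(items: list[dict]) -> list[dict]:
    # Node 3: would call the summarization API; this stand-in just keeps url/title
    return [{"url": item.get("url"), "title": item.get("title")} for item in items]


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(extract_ids, inputs="hn_top", outputs="hn_item_ids"),
            node(fetch_items, inputs="hn_item_ids", outputs="hn_items_raw"),
            # Node 4: Kedro saves "hn_summaries" through whatever dataset it maps to
            node(summarize, inputs="hn_items_raw", outputs="hn_summaries"),
        ]
    )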
s
Oh! The mistake in my thinking was equating a dataset with a single data source, and assuming I should only pass information through the pipeline/node functions, when a dataset class can pull in N other datasets to create a single view! This was very helpful, thank you!
I am stuck on step 3. I am not able to figure out how to access the catalog in HackerNewsItemsDataset in order to load a dataset created in step 2. The internet is full of obsolete documentation solving this problem with get_current_session and other solutions which no longer exist. I am not able to figure out how to obtain access to the current catalog instance within the dataset. I am assuming that I need to get access to the dataset within HackerNewsItemsDataset:
Copy code
from kedro.io import AbstractDataset


class HackerNewsItemsDataset(AbstractDataset):
    def __init__(self, url: str):
        self._url = url

    def _describe(self):
        return {
            "url": self._url
        }

    def _load(self):
        # how do I get catalog in the dataset?
        top = catalog.load("hn_top")
        # do other things
I am going to look into hooks to see if I can do something like set_catalog, but again this feels off and janky. I am starting to wonder if I am trying to force a square peg into a round hole with Kedro.
e
Datasets should not depend on the catalog. They’re meant to be small, declarative wrappers around I/O (load/save a file, call an API, etc.), configured in the catalog. If a dataset reaches into the catalog to load another dataset, you end up with hidden dependencies and break reproducibility. That’s why you won’t find an official catalog object in datasets.
In your case you can do the following:
• A node writes the IDs to an intermediate dataset (e.g. a JSON file).
• A custom dataset, HackerNewsItemsDataset, loads those IDs and fetches the articles in its _load().
s
I think the thing I am missing is how HackerNewsItemsDataset gets the JSON file. If it’s not meant to be available through the catalog and the IDs should not be passed in via a function in the node, then I am at a loss how to refer to the IDs dataset.
Is the recommendation to hardcode a relative path in the dataset?
HN_TOP_IDS_JSON="path/to/top_ids.json"
As far as I can see there is no config object available in the dataset either.
e
Your use case is a bit tricky because you want to configure the dataset dynamically, whereas normally configuration is static and done via the catalog. That’s why I suggest a workaround here.
Copy code
# catalog.yml

hn_ids_file:
  type: kedro.extras.datasets.json.JSONDataSet
  filepath: data/01_raw/hn_ids.json

hn_items:
  type: path.to.HackerNewsItemsDataset
  ids_filepath: data/01_raw/hn_ids.json
  ...
Copy code
# nodes.py
def identity_save_ids(ids):
    # simply return the ids; Kedro will save them to hn_ids_file because that
    # dataset is configured as the node's output
    return ids
Copy code
# pipeline.py

from kedro.pipeline import Pipeline, node, pipeline
from .nodes import identity_save_ids

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            # 1) load top ids, 2) save them to JSON dataset (hn_ids_file),
            node(identity_save_ids, inputs="hn_top", outputs="hn_ids_file", name="save_ids"),
            # 3) when some node needs hn_items, Kedro will call hn_items.load(),
            #    which reads ids_filepath and fetches articles
        ]
    )
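For completeness, the custom dataset referenced as path.to.HackerNewsItemsDataset could look roughly like this. Treat it as a sketch: the url argument, the JSON structure (a plain list of IDs, as written by the JSON dataset above), and the use of requests are assumptions on my side.
Copy code
# hacker_news_items_dataset.py -- sketch of the ids_filepath-based dataset
import json

import requests
from kedro.io import AbstractDataset


class HackerNewsItemsDataset(AbstractDataset):
    def __init__(self, ids_filepath: str, url: str = "https://hacker-news.firebaseio.com/v0/item/{id}.json"):
        # ids_filepath points at the file written by the save_ids node (hn_ids_file)
        self._ids_filepath = ids_filepath
        self._url = url

    def _describe(self):
        return {"url": self._url, "ids_filepath": self._ids_filepath}

    def _load(self) -> list[dict]:
        # read the intermediate IDs produced earlier in the pipeline...
        with open(self._ids_filepath) as f:
            item_ids = json.load(f)
        # ...then fetch each item from the API
        return [requests.get(self._url.format(id=item_id)).json() for item_id in item_ids]

    def _save(self, data) -> None:
        raise NotImplementedError("HackerNewsItemsDataset is a read-only API dataset")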
s
This is what I came up with:
Copy code
from typing import Optional
import aiohttp
import asyncio
import pandas as pd
from kedro.io import AbstractDataset
from kedro.io import DataCatalog

# Uses the API defined at https://github.com/HackerNews/API?tab=readme-ov-file
class HackerNewsUrlsDataset(AbstractDataset):
    # dependencies is a list of other datasets defined in the catalog
    def __init__(self, url: str, dependencies: list[str]):
        self._url = url
        self._catalog: Optional[DataCatalog] = None
        self._dependencies = dependencies
        self._datasets = {}

    def _describe(self):
        return {
            "url": self._url
        }

    def inject_catalog(self, catalog):
        self._catalog = catalog

    def load(self):
        if not self._catalog:
            raise RuntimeError("catalog has not been injected into HackerNewsUrlsDataset")

        datasets = self._load_datasets()

        return self.get_items(datasets['hn_top'])

    def save(self, data: pd.DataFrame):
        raise NotImplementedError("HackerNewsUrlsDataset is a read-only API")

    def _load_datasets(self):
        datasets = {}
        for dependency in self._dependencies:
            datasets[dependency] = self._catalog.load(dependency)
        return datasets


    def get_items(self, item_ids: pd.DataFrame) -> pd.DataFrame:
        # https://hacker-news.firebaseio.com/v0/item/{id}.json -> https://hacker-news.firebaseio.com/v0/item/1234567.json
        urls = [self._url.format(id=item_id["item_id"]) for _, item_id in item_ids.iterrows()]

        items_df = asyncio.run(self._fetch_all(urls))
        return items_df

    async def _fetch_all(self, urls: list[str]) -> pd.DataFrame:
        async with aiohttp.ClientSession() as session:
            tasks = [self._fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            return pd.DataFrame(results)

    async def _fetch_url(self, session: aiohttp.ClientSession, url: str) -> dict:
        async with session.get(url) as response:
            return await response.json()
I am not exactly happy with it because of the 'inject_catalog' function. The reason I want to rely on the catalog is that at some point there will be APIs that require credentials. And yes, this means that the dependency on hn_top is not seen in the pipeline 😞 Here is the pipeline:
Copy code
def create_pipeline(**kwargs) -> Pipeline:

    return Pipeline(
        [
            node(
                func=current_collection,
                inputs=None,
                outputs="current_collection",
                name="get_current_collection_node"
            ),
            node(
                func=hacker_news_get_top,
                inputs=["current_collection", "hn_top_api", "params:hacker_news_api"],
                outputs=["hn_top_db", "hn_top"],
                name="get_hacker_news_top_items"
            ),
            node(
                func=hacker_news_urls,
                inputs=["current_collection", "hn_urls_api", "hn_top"],
                outputs="hn_urls_db",
                name="get_hacker_news_urls_node"
            )

        ]
    )
I put the "hn_top" in the inputs=[] only to document the dependency, its not used and does not need to be there.
I think injecting the catalog from a hook 'works'
Copy code
    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        for _, dataset in catalog.items():
            if hasattr(dataset, "inject_catalog"):
                dataset.inject_catalog(catalog)
but the primary issue is the implicit dependencies this creates.