Swift
09/18/2025, 11:27 PM

Swift
09/18/2025, 11:36 PM
def hacker_news_items(api: HackerNewsItemsAPIDataset, item_ids: list[int]) -> list[dict]:
    return api.get_items(item_ids)

Elena Khaustova
09/19/2025, 9:25 AM
_load(). That’s why it feels awkward.
The recommended approach in Kedro is:
• Keep API I/O inside datasets, not in node functions.
• If you need runtime parameters (like the list of HackerNews IDs), capture them as data flowing through the pipeline, not as dataset arguments.
So in your example:
1. Create one dataset (HackerNewsTopAPIDataset) that loads the top N IDs.
2. Write a node function that takes those IDs and transforms them into a format that your next dataset can understand (for example, a list of IDs or a table).
3. Create another dataset (HackerNewsItemsAPIDataset) that is parametrized to fetch items for a given list of IDs - but instead of trying to pass the IDs directly into the dataset config, you make the dataset read from an intermediate file or memory dataset produced by the previous node.
In other words, rather than trying to call .load() on a dataset inside your node, you let Kedro orchestrate:
• Node 1 produces the IDs written to a dataset (MemoryDataset or JSONDataSet)
• Node 2 consumes that dataset (the IDs), and calls your items API
• Node 3 consumes the items and calls the summarization API
• Node 4 saves the outputs
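The four-node flow described above can be sketched in plain Python, with the HackerNews APIs stubbed out so it runs without Kedro or network access. All class and function names here (FakeTopAPI, FakeItemsAPI, get_top_ids, etc.) are hypothetical stand-ins, not Kedro or HackerNews APIs:

```python
# Stub APIs so the data flow can be demonstrated offline.
class FakeTopAPI:
    def top_ids(self) -> list[int]:
        return [101, 102]

class FakeItemsAPI:
    def item(self, item_id: int) -> dict:
        return {"id": item_id, "title": f"story {item_id}"}

def get_top_ids(top_api) -> list[int]:
    # Node 1: produce the IDs (Kedro would persist these via a MemoryDataset/JSONDataSet)
    return top_api.top_ids()

def fetch_items(ids: list[int], items_api) -> list[dict]:
    # Node 2: consume the IDs and call the items API
    return [items_api.item(i) for i in ids]

def summarize(items: list[dict]) -> list[str]:
    # Node 3: consume the items and produce summaries (stubbed here)
    return [f"summary of {it['title']}" for it in items]

# Node 4 would save `summaries` through a catalog entry; here we just run the chain.
ids = get_top_ids(FakeTopAPI())
items = fetch_items(ids, FakeItemsAPI())
summaries = summarize(items)
```

The point is that each node only sees data produced by the previous node; no node ever calls `.load()` on a dataset itself.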
Alternatively if you really need runtime kwargs:
Kedro does have a hook (before_dataset_loaded) where you can inject arguments dynamically at runtime (like changing the url or params of an API dataset).

Swift
09/19/2025, 1:46 PM

Swift
09/20/2025, 6:06 PM
get_current_session and other solutions which no longer exist. I am not able to figure out how to obtain access to the current catalog instance within the dataset.
I am assuming that I need to get access to the catalog within HackerNewsItemsDataset:
from kedro.io import AbstractDataset

class HackerNewsItemsDataset(AbstractDataset):
    def __init__(self, url: str):
        self._url = url

    def _describe(self):
        return {"url": self._url}

    def _load(self):
        # how do I get the catalog in the dataset?
        top = catalog.load("hn_top")
        # do other things

I am going to look into hooks to see if I can do something like set_catalog, but again this feels off and janky.
I am starting to wonder if I am trying to force a square peg into a round hole with Kedro.

Elena Khaustova
09/22/2025, 8:30 AM
catalog object in datasets.

Elena Khaustova
09/22/2025, 8:52 AM
HackerNewsItemsDataset loads those IDs and fetches articles in its _load()

Swift
09/22/2025, 1:30 PM

Swift
09/22/2025, 1:36 PM
HN_TOP_IDS_JSON="path/to/top_ids.json"
As far as I can see there is no config object available in the dataset either.
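For what it's worth, the env-var idea above can be implemented with plain os.environ inside the dataset's __init__, with no config object needed. This is only a sketch: EnvConfiguredItemsDataset is a hypothetical stand-in (not a Kedro API), and HN_TOP_IDS_JSON is the variable named above:

```python
import json
import os
import tempfile

class EnvConfiguredItemsDataset:
    def __init__(self, ids_filepath=None):
        # Fall back to the HN_TOP_IDS_JSON environment variable when no
        # filepath is configured in the catalog.
        self._ids_filepath = ids_filepath or os.environ["HN_TOP_IDS_JSON"]

    def _load(self) -> list[int]:
        with open(self._ids_filepath) as f:
            return json.load(f)

# Usage: write an IDs file, point the env var at it, then load.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump([42, 43], f)
    os.environ["HN_TOP_IDS_JSON"] = f.name

ids = EnvConfiguredItemsDataset()._load()
```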
Elena Khaustova
09/22/2025, 1:53 PM
# catalog.yml
hn_ids_file:
  type: kedro.extras.datasets.json.JSONDataSet
  filepath: data/01_raw/hn_ids.json

hn_items:
  type: path.to.HackerNewsItemsDataset
  ids_filepath: data/01_raw/hn_ids.json
  ...

# nodes.py
def identity_save_ids(ids):
    # simply return the ids; Kedro will save them to hn_ids_file
    # if the node outputs are configured accordingly
    return ids

# pipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import identity_save_ids

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            # 1) load top ids, 2) save them to JSON dataset (hn_ids_file),
            node(identity_save_ids, inputs="hn_top", outputs="hn_ids_file", name="save_ids"),
            # 3) when some node needs hn_items, Kedro will call hn_items.load(),
            #    which reads ids_filepath and fetches articles
        ]
    )

Swift
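The hn_items dataset referenced in that catalog entry might read its ids_filepath in _load() along these lines. This is a sketch only: the item fetch is stubbed (_fetch_item is hypothetical; real code would call the HackerNews API), and subclassing AbstractDataset is omitted so the sketch runs standalone:

```python
import json
import tempfile

class HackerNewsItemsDataset:
    def __init__(self, ids_filepath: str):
        self._ids_filepath = ids_filepath

    def _fetch_item(self, item_id: int) -> dict:
        # Hypothetical stub; a real implementation would issue an HTTP request.
        return {"id": item_id}

    def _load(self) -> list[dict]:
        # Read the IDs file produced by the upstream node, then fetch each item.
        with open(self._ids_filepath) as f:
            ids = json.load(f)
        return [self._fetch_item(i) for i in ids]

# Usage: write an IDs file the way the save_ids node would, then load.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump([1, 2, 3], f)
    path = f.name

items = HackerNewsItemsDataset(path)._load()
```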
09/22/2025, 2:28 PM
from typing import Optional

import aiohttp
import asyncio
import pandas as pd

from kedro.io import AbstractDataset
from kedro.io import DataCatalog

# Uses api defined at https://github.com/HackerNews/API
class HackerNewsUrlsDataset(AbstractDataset):
    # dependencies is a list of other datasets defined in the catalog
    def __init__(self, url: str, dependencies: list[str]):
        self._url = url
        self._catalog: Optional[DataCatalog] = None
        self._dependencies = dependencies
        self._datasets = {}

    def _describe(self):
        return {"url": self._url}

    def inject_catalog(self, catalog):
        self._catalog = catalog

    def load(self):
        if not self._catalog:
            raise RuntimeError("catalog has not been injected into HackerNewsUrlsDataset")
        datasets = self._load_datasets()
        return self.get_items(datasets["hn_top"])

    def save(self, data: pd.DataFrame):
        raise NotImplementedError("HackerNewsUrls is a readonly api")

    def _load_datasets(self):
        datasets = {}
        for dependency in self._dependencies:
            datasets[dependency] = self._catalog.load(dependency)
        return datasets

    def get_items(self, item_ids: pd.DataFrame) -> pd.DataFrame:
        # https://hacker-news.firebaseio.com/v0/item/{id}.json ->
        # https://hacker-news.firebaseio.com/v0/item/1234567.json
        urls = [self._url.format(id=row["item_id"]) for _, row in item_ids.iterrows()]
        items_df = asyncio.run(self._fetch_all(urls))
        return items_df

    async def _fetch_all(self, urls: list[str]) -> pd.DataFrame:
        async with aiohttp.ClientSession() as session:
            tasks = [self._fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            return pd.DataFrame(results)

    async def _fetch_url(self, session: aiohttp.ClientSession, url: str) -> dict:
        async with session.get(url) as response:
            return await response.json()

Swift
09/22/2025, 2:33 PM
hn_top is not seen in the pipeline 😞
Here is the pipeline:
def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=current_collection,
                inputs=None,
                outputs="current_collection",
                name="get_current_collection_node",
            ),
            node(
                func=hacker_news_get_top,
                inputs=["current_collection", "hn_top_api", "params:hacker_news_api"],
                outputs=["hn_top_db", "hn_top"],
                name="get_hacker_news_top_items",
            ),
            node(
                func=hacker_news_urls,
                inputs=["current_collection", "hn_urls_api", "hn_top"],
                outputs="hn_urls_db",
                name="get_hacker_news_urls_node",
            ),
        ]
    )
I put "hn_top" in the inputs=[] only to document the dependency; it's not used and does not need to be there.

Swift
09/22/2025, 2:34 PM
@hook_impl
def after_catalog_created(self, catalog: DataCatalog) -> None:
    for _, dataset in catalog.items():
        if hasattr(dataset, "inject_catalog"):
            dataset.inject_catalog(catalog)
but the primary issue is the implicit dependencies this creates.
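One way to narrow those implicit dependencies: instead of injecting the whole catalog (which couples the dataset to whatever names happen to exist there), inject only a loader callable for the one upstream dataset it needs. A sketch with hypothetical names, not a Kedro API:

```python
class HackerNewsUrlsDataset:
    def __init__(self, load_top_ids):
        # load_top_ids: zero-arg callable returning the list of top story IDs.
        # The hook (or a factory) would wire in e.g. lambda: catalog.load("hn_top").
        self._load_top_ids = load_top_ids

    def _load(self) -> list[str]:
        ids = self._load_top_ids()
        return [f"https://hacker-news.firebaseio.com/v0/item/{i}.json" for i in ids]

# Usage with a stub loader in place of catalog.load("hn_top"):
urls = HackerNewsUrlsDataset(lambda: [7, 8])._load()
```

The dependency is still wired at runtime, but it is now a single, explicit callable rather than access to the entire catalog.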