Emilio Gagliardi
07/24/2023, 5:40 PM
catalog.yml:

rss_feed_extract:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedExtract
  url: https://api.msrc.microsoft.com/update-guide/rss

rss_feed_load:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedLoad
  mongo_url: "mongodb+srv://<username>:<password>@bighatcluster.wamzrdr.mongodb.net/"
  mongo_db: "TBD"
  mongo_collection: "TBD"
  mongo_table: "TBD"
  credentials: mongo_atlas
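For reference, credentials: mongo_atlas points at an entry in Kedro's credentials file. A minimal conf/local/credentials.yml sketch with placeholder values (the username/password keys are assumed from the dataset code below):

mongo_atlas:
  username: <username>
  password: <password>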
nodes.py:

def extract_rss_feed() -> Dict[str, Any]:
    raw_rss_feed = RSSFeedExtract()  # Q. how does the catalog 'url' value get passed to the __init__ method?
    raw_rss_feed.load()
    return {'key_1': 'value_1', 'key_2': 'value_2'}

def transform_rss_feed(raw_rss_feed: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [{'key_1_T': 'value_1_T', 'key_2_T': 'value_2_T'}]

def load_rss_feed(prepped_rss_items: List[Dict[str, Any]]) -> None:
    rss_feed_load = RSSFeedLoad(prepped_rss_items)  # not clear how to create the custom dataset that takes data from the catalog, credentials, and the previous node
    rss_feed_load.save()
pipeline.py:

pipeline([
    node(
        func=extract_rss_feed,
        inputs=None,
        outputs='rss_feed_for_transforming',
        name="extract_rss_feed",
    ),
    node(
        func=transform_rss_feed,
        inputs="rss_feed_for_transforming",
        outputs='rss_for_loading',
        name="transform_rss_items",
    ),
    node(
        func=load_rss_feed,
        inputs="rss_for_loading",
        outputs="rss_feed_load",
        name="load_rss_items",
    ),
])
custom datasets:

class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url: str):
        self._url = url

class RSSFeedLoad(AbstractDataSet):
    def __init__(self, mongo_url: str, mongo_db: str, mongo_collection: str, mongo_table: str, credentials: Dict[str, Any], data: Any = None):
        self._data = data  # comes from the previous node
        self._mongo_url = mongo_url
        self._mongo_db = mongo_db
        self._mongo_collection = mongo_collection
        self._mongo_table = mongo_table
        self._username = credentials['username']
        self._password = credentials['password']
Deepyaman Datta
07/24/2023, 5:43 PM
Everything in the catalog entry (other than type) should be parameters to the __init__ method of your dataset. E.g.

def __init__(self, url, load_args=None, save_args=None):
    ...

Also, to confirm: RSSDataSet is a module name (bit of a weird convention; it should probably be called rss_dataset if so), and RSSFeedExtract and RSSFeedLoad are two dataset implementations?
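Concretely, a sketch of that mapping for the rss_feed_extract entry above: each catalog key other than type arrives as a keyword argument when Kedro constructs the dataset.

class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url: str):
        # populated from the catalog entry's url: key
        self._url = url

# what the catalog effectively does at instantiation time:
# RSSFeedExtract(url="https://api.msrc.microsoft.com/update-guide/rss")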
Emilio Gagliardi
07/24/2023, 5:53 PM
Right, they become parameters to the __init__ method, but I'm not clear how the catalog parameters get passed in. I created a basic API Dataset that specified a url parameter and I didn't need to pass the URL into the node function.
In my second DataSet, I'm not sure how all the data gets combined. _data is a list of dictionaries being returned by the previous node, all the _mongo values are stored in the catalog, and credentials are stored in credentials.
Deepyaman Datta
07/24/2023, 5:57 PM
> I created a basic API Dataset that specified a url parameter and I didn't need to pass the URL into the node function.
I think this may be the confusion. The url parameter won't get passed to the node; it's just available to the dataset. The result of RSSFeedExtract._load would be passed to the node; Kedro calls _load, _save, etc. for you.
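In other words, a sketch of what the runner does when a node takes the rss_feed_extract catalog entry as input (names taken from the thread):

dataset = RSSFeedExtract(url="https://api.msrc.microsoft.com/update-guide/rss")
feed = dataset.load()              # the runner calls load() for you ...
result = transform_rss_feed(feed)  # ... and passes the result to the node function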
Emilio Gagliardi
07/24/2023, 5:58 PM

Deepyaman Datta
07/24/2023, 5:59 PM
Have you implemented the _load method?

Emilio Gagliardi
07/24/2023, 6:02 PM
Not yet; I'm trying to get the __init__ method working first.

Deepyaman Datta
07/24/2023, 7:27 PM
> 1. how do I set up the node function/pipeline configuration for a dataset that is read-only, i.e. fetching an RSS feed
You mark the _save method to throw an error. See something like APIDataSet.
> 2. how do I set up a custom dataset that combines data from the catalog, credentials and previous node
Can you explain more what you're trying to do?
> 3. does including a dataset name in the pipeline definition automatically execute the _load or _save methods of the class? or do I need to do that in the node function?
I don't fully understand this question, and what "including a dataset name in the pipeline definition" means. But, basically, _load is called when a dataset is an input to a node and _save when it is an output from a node.
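A minimal read-only dataset in that style (a sketch: the feedparser usage and method bodies are assumptions, but the _save-raises pattern mirrors APIDataSet):

from typing import Any, Dict, List
from kedro.io import AbstractDataSet, DataSetError
import feedparser  # assumed dependency for parsing the feed

class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url: str):
        self._url = url

    def _load(self) -> List[Dict[str, Any]]:
        # whatever this returns is what Kedro hands to downstream nodes
        feed = feedparser.parse(self._url)
        return [dict(entry) for entry in feed.entries]

    def _save(self, data: Any) -> None:
        # read-only: saving is explicitly unsupported
        raise DataSetError(f"{self.__class__.__name__} is a read-only dataset")

    def _describe(self) -> Dict[str, Any]:
        return {"url": self._url}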
07/24/2023, 7:43 PMfrom kedro_workbench.extras.datasets.RSSDataSet import RSSFeedExtract, RSSFeedLoad
def extract_rss_feed() -> Dict[str, Any]:
raw_rss_feed = RSSFeedExtract()
raw_rss_feed.load()
return {'key_1':'value_1', 'key_2': 'value_2'}
the pipeline def:
node(
func=extract_rss_feed,
inputs=None,
outputs='rss_feed_for_transforming',
name="extract_rss_feed",
),
and when I run the pipeline, I get the error:
TypeError: RSSFeedExtract.__init__() missing 1 required positional argument: 'url'
So, the way I'm doing this is incorrect, but I'm unclear why. If the catalog parameter 'url' is made available to the DataSet, why is it asking for the url value?__init__
method?Nok Lam Chan
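The TypeError itself is plain Python rather than Kedro: RSSFeedExtract() is constructed by hand inside the node with no arguments, so the catalog's url never comes into play. A sketch of the difference:

# manual construction inside the node bypasses the catalog:
raw_rss_feed = RSSFeedExtract()  # TypeError: missing 'url'

# when the dataset is declared in catalog.yml, Kedro constructs it as:
raw_rss_feed = RSSFeedExtract(url="https://api.msrc.microsoft.com/update-guide/rss")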
Nok Lam Chan
07/26/2023, 9:19 AM
Your __init__ looks right, except data shouldn't be there. data only gets passed to either load or save.
When you create an instance of a dataset, it doesn't load anything; it's almost like a dataclass that contains all the necessary information to load the dataset when a node requests it.
If you haven't, I suggest reading https://docs.kedro.org/en/stable/data/kedro_io.html to understand how Kedro I/O works.
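Applied to RSSFeedLoad from the top of the thread, that advice looks roughly like this (a sketch; the pymongo calls and the placeholder substitution in the URL are assumptions about the intended save logic):

from typing import Any, Dict, List
from kedro.io import AbstractDataSet, DataSetError
from pymongo import MongoClient  # assumed dependency

class RSSFeedLoad(AbstractDataSet):
    def __init__(self, mongo_url: str, mongo_db: str, mongo_collection: str,
                 mongo_table: str, credentials: Dict[str, Any]):
        # connection details only; no `data` parameter here
        self._mongo_url = (mongo_url
                           .replace("<username>", credentials["username"])
                           .replace("<password>", credentials["password"]))
        self._mongo_db = mongo_db
        self._mongo_collection = mongo_collection
        self._mongo_table = mongo_table

    def _save(self, data: List[Dict[str, Any]]) -> None:
        # `data` arrives here at save time: it is whatever the upstream node returned
        client = MongoClient(self._mongo_url)
        client[self._mongo_db][self._mongo_collection].insert_many(data)

    def _load(self) -> Any:
        raise DataSetError(f"{self.__class__.__name__} is write-only in this sketch")

    def _describe(self) -> Dict[str, Any]:
        return {"db": self._mongo_db, "collection": self._mongo_collection}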
Emilio Gagliardi
07/26/2023, 8:40 PM

Nok Lam Chan
07/26/2023, 8:48 PM
Your extract_rss_feed is calling load directly, which is not how Kedro works.
Let's take a step back to a simpler example to understand how datasets and nodes work together. Say you need to read a CSV, do some processing, and return the result. You will have a function

def process_my_data(df):
    ...
    return processed_df

In a Python script without Kedro, you would do this:

df = pd.read_csv(some_path, some_extra_argument)
processed_df = process_my_data(df)

In the Kedro world, the Data Catalog takes care of I/O, and you don't need to call load or save directly. You define a node, which is a very thin wrapper on top of your function:

node(process_my_data, inputs="your_dataset", outputs="processed_df")

How does the node know where the data is and how to read it?
• It refers to the type of the dataset and calls its load method to load the data (with the extra arguments you provided, if any).
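Putting that together for the RSS pipeline (a sketch using the catalog entries from the start of the thread): the node function no longer touches the datasets at all; the runner calls RSSFeedExtract._load to produce the input and RSSFeedLoad._save to persist the output.

def transform_rss_feed(raw_rss_feed: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # pure transformation; no load()/save() calls inside the node
    return [{'key_1_T': 'value_1_T', 'key_2_T': 'value_2_T'}]

pipeline([
    node(
        func=transform_rss_feed,
        inputs="rss_feed_extract",  # catalog entry -> Kedro calls _load
        outputs="rss_feed_load",    # catalog entry -> Kedro calls _save
        name="transform_rss_items",
    ),
])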
Emilio Gagliardi
07/28/2023, 4:36 AM

Nok Lam Chan
07/28/2023, 1:00 PM