# questions
e
hi kedronaughts, I'm trying to get my first pipeline working and I'm confused on a few pieces I'm hoping you can correct my thinking on. I have one custom DataSet that connects to an RSS feed, and another custom DataSet that stores the processed feed items and saves them to a MongoDB. I'm confused about how to set up the catalog entries and node functions with regard to how the catalog values get passed into the DataSets. how do I create a catalog entry that combines with values from credentials.yml? 'mongo_url' contains my username and password, which I stored in credentials.yml. catalog entries:
Copy code
rss_feed_extract:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedExtract
  url: https://api.msrc.microsoft.com/update-guide/rss

rss_feed_load:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedLoad
  mongo_url: "mongodb+srv://<username>:<password>@bighatcluster.wamzrdr.mongodb.net/"
  mongo_db: "TBD"
  mongo_collection: "TBD"
  mongo_table: "TBD"
  credentials: mongo_atlas
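and the matching entry in credentials.yml has this shape (actual values redacted, matching the 'username'/'password' keys my dataset reads):
Copy code
mongo_atlas:
  username: <username>
  password: <password>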
nodes.py
Copy code
def extract_rss_feed() -> Dict[str, Any]:
    raw_rss_feed = RSSFeedExtract() # Q. how does the catalog 'url' value get passed to the __init__ method?
    raw_rss_feed.load()
    
    return {'key_1':'value_1', 'key_2': 'value_2'}
    
    
def transform_rss_feed(raw_rss_feed: Dict[str, Any]) -> List[Dict[str, Any]]:
    
    return [{'key_1_T':'value_1_T', 'key_2_T': 'value_2_T'}]
    
    
def load_rss_feed(prepped_rss_items: List[Dict[str, Any]]) -> None:
    rss_feed_load = RSSFeedLoad(prepped_rss_items) # not clear how to create the custom dataset that takes data from catalog and credentials and the previous node
    rss_feed_load.save()
pipeline.py
Copy code
pipeline([
    node(
        func=extract_rss_feed,
        inputs=None,
        outputs='rss_feed_for_transforming',
        name="extract_rss_feed",
    ),
    node(
        func=transform_rss_feed,
        inputs="rss_feed_for_transforming",
        outputs='rss_for_loading',
        name="transform_rss_items",
    ),
    node(
        func=load_rss_feed,
        inputs="rss_for_loading",
        outputs="rss_feed_load",
        name="load_rss_items",
    ),
])
custom datasets
Copy code
class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url: str):
        self._url = url

class RSSFeedLoad(AbstractDataSet):
    def __init__(self, mongo_url: str, mongo_db: str, mongo_collection: str, mongo_table: str, credentials: Dict[str, Any], data: Any = None):
        self._data = data # comes from the previous node
        self._mongo_url = mongo_url
        self._mongo_db = mongo_db
        self._mongo_collection = mongo_collection
        self._mongo_table = mongo_table
        self._username = credentials['username']
        self._password = credentials['password']
d
All of the arguments (except type) should be parameters to the __init__ method of your dataset. E.g.
Copy code
class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url, load_args=None, save_args=None):
        ...
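A fuller skeleton of that shape, just as a sketch (the feedparser import is an assumption standing in for whatever fetch logic you use):
Copy code
import feedparser  # assumption: any RSS client works here

from kedro.io import AbstractDataSet


class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url, load_args=None, save_args=None):
        self._url = url
        self._load_args = load_args or {}
        self._save_args = save_args or {}

    def _load(self):
        # Kedro calls this when the dataset is a node input
        return feedparser.parse(self._url)

    def _save(self, data):
        ...  # read-only datasets raise here; see further down the thread

    def _describe(self):
        return dict(url=self._url)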
Just to confirm, is RSSDataSet a module name (bit weird convention; probably should call it rss_dataset if so), and RSSFeedExtract and RSSFeedLoad are two dataset implementations?
e
thanks Deepyaman, how do catalog parameters get passed to the DataSet in the node function? I understand that I should put the catalog parameter in the __init__ method, but I'm not clear how the catalog parameters get passed in. I created a basic API DataSet that specified a url parameter and I didn't need to pass the URL into the node function. in my second DataSet, I'm not sure how all the data gets combined: _data is a list of dictionaries returned by the previous node, all the _mongo values are stored in the catalog, and the credentials are stored in credentials.yml.
the file "RSSDataSet.py" is stored at /extras/datasets/ and contains the two custom DataSets.
d
I created a basic API Dataset that specified a url parameter and I didn't need to pass the URL into the node function.
I think this may be the confusion. The url parameter won't get passed to the node; it's just available to the dataset. The result of RSSFeedExtract._load would be passed to the node.
Have you looked at https://docs.kedro.org/en/stable/extend_kedro/custom_datasets.html? It goes through the implementation of a custom dataset, including defining _load, _save, etc.
e
yes, I did read that
d
Does your dataset have a _load method?
e
yes, I just didn't paste them here, because I'm just trying to understand how to set up the __init__ method first
questions:
1. how do I set up the node function/pipeline config for a dataset that is read-only, i.e. fetching an RSS feed
2. how do I set up a custom dataset that combines data from the catalog, credentials, and the previous node
3. does including a dataset name in the pipeline definition automatically execute the _load or _save methods of the class? or do I need to do that in the node function?
d
1. how do I set up the node function/pipeline config for a dataset that is read-only, i.e. fetching an RSS feed
You make the _save method throw an error. See something like APIDataSet.
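Something along these lines, as a sketch (APIDataSet's _save does essentially the same):
Copy code
from kedro.io import AbstractDataSet, DataSetError


class RSSFeedExtract(AbstractDataSet):
    ...

    def _save(self, data) -> None:
        # fetching an RSS feed is read-only, so saving is an error
        raise DataSetError(f"{self.__class__.__name__} is a read-only dataset")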
2. how do I set up a custom dataset that combines data from the catalog, credentials, and the previous node
Can you explain more about what you're trying to do?
3. does including a dataset name in the pipeline definition automatically execute the _load or _save methods of the class? or do I need to do that in the node function?
I don't fully understand this question, and what "including a dataset name in the pipeline definition" means. But, basically, _load is called when a dataset is an input to a node, and _save is called when it is an output of a node.
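Concretely, with the catalog entries from the top of this thread, a node like this sketch would trigger both:
Copy code
node(
    func=transform_rss_feed,
    inputs="rss_feed_extract",   # Kedro calls RSSFeedExtract._load and passes the result in
    outputs="rss_feed_load",     # Kedro calls RSSFeedLoad._save on whatever the function returns
    name="transform_rss_items",
)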
e
thanks for following up! 1. I have this function for my 'extract' node.
Copy code
from typing import Any, Dict

from kedro_workbench.extras.datasets.RSSDataSet import RSSFeedExtract, RSSFeedLoad

def extract_rss_feed() -> Dict[str, Any]:
    raw_rss_feed = RSSFeedExtract()
    raw_rss_feed.load()

    return {'key_1':'value_1', 'key_2': 'value_2'}
the pipeline def:
Copy code
node(
    func=extract_rss_feed,
    inputs=None,
    outputs='rss_feed_for_transforming',
    name="extract_rss_feed",
),
and when I run the pipeline, I get the error: TypeError: RSSFeedExtract.__init__() missing 1 required positional argument: 'url'. So, the way I'm doing this is incorrect, but I'm unclear why. If the catalog parameter 'url' is made available to the DataSet, why is it asking for the url value?
q2. my second custom dataset is defined as:
Copy code
rss_feed_load:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedLoad
  mongo_url: "mongodb+srv://<username>:<password>@bighatcluster.wamzrdr.mongodb.net/"
  mongo_db: "TBD"
  mongo_collection: "TBD"
  mongo_table: "TBD"
  credentials: mongo_atlas
what is the correct kedro way to build the 'mongo_url' property? <username> and <password> are both contained in credentials.yml. do I modify catalog.yml to insert the credentials, or do I build the 'mongo_url' string in the class __init__ method?
n
Short answer, your __init__ looks right, except data shouldn’t be there. data only gets passed to either load or save.
When you create an instance of a dataset, it doesn’t load anything; it’s almost like a dataclass that contains all the information necessary to load a dataset when a node requests it. If you haven’t, I’d suggest reading https://docs.kedro.org/en/stable/data/kedro_io.html to understand how Kedro I/O works
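On the mongo_url part of q2, the usual Kedro pattern is to keep secrets out of catalog.yml entirely: Kedro resolves credentials: mongo_atlas to the matching dict in credentials.yml and passes it to __init__ as the credentials argument, where the full URL can be assembled. A sketch (host copied from the catalog entry above; the rest is illustrative):
Copy code
class RSSFeedLoad(AbstractDataSet):
    def __init__(self, mongo_db, mongo_collection, mongo_table, credentials):
        # `credentials` is the dict stored under the mongo_atlas key in credentials.yml
        self._mongo_url = (
            f"mongodb+srv://{credentials['username']}:{credentials['password']}"
            "@bighatcluster.wamzrdr.mongodb.net/"
        )
        self._mongo_db = mongo_db
        self._mongo_collection = mongo_collection
        self._mongo_table = mongo_table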
e
Thanks Nok, I did look at those files, I'm asking questions here obviously because I'm not understanding aspects. I'm not sure why you posted a link to my question? I asked a question a second time because it wasn't answered. not all of us are lifetime programmers, we're just getting started. if my questions are too basic for you, you can ignore them. cheers,
n
Sorry, I linked the SO post only for cross-referencing. Since we monitor both #questions and SO, it’s purely so we can support better and make sure we don’t end up answering it twice. 😅
your current extract_rss_feed is calling load directly, which is not how Kedro works. Let’s take a step back to a simpler example to understand how datasets/nodes work together. Let’s say you need to read a csv, do some processing, and return it. You will have a function
Copy code
def process_my_data(df):
   ...
   return processed_df
In a Python script without Kedro, you will do this
Copy code
df = pd.read_csv(some_path, some_extra_argument)
processed_df = process_my_data(df)
In the Kedro world, the Data Catalog takes care of I/O, and you don’t need to call load or save directly. You will define a node, which is a very thin wrapper on top of your function.
Copy code
node(process_my_data, inputs="your_dataset", outputs="processed_df")
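where the matching catalog entry might look like this (sketched with pandas.CSVDataSet as an example):
Copy code
your_dataset:
  type: pandas.CSVDataSet
  filepath: data/01_raw/my_data.csv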
How does the node know where the data is and how to read it?
• It will refer to the type of the dataset and call its load method to load the data (with the extra arguments you provided, if any)
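Applied back to the RSS case, the node function itself stays free of I/O. A sketch (prep_item is a hypothetical helper, not something from this thread):
Copy code
def transform_rss_feed(raw_rss_feed):
    # no RSSFeedExtract() and no .load() here; Kedro already loaded the data
    return [prep_item(item) for item in raw_rss_feed]  # return value goes to RSSFeedLoad._save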
e
@Nok Lam Chan thanks for taking the time to answer this more clearly. I'm starting to understand. sorry I'm a slow learner.
n
Take your time, keep asking!