# questions
Hello, I want to create a custom incremental dataset, so I made my custom class inherit from `IncrementalDataset`. But when I call `super().__init__()` with kwargs, the `path` variable is no longer set (it is in the catalog) and `filepath` is set instead. I suspect that Kedro core behavior is modifying the arguments when loading datasets from the catalog. Does anyone know how to fix it?
Can you show your implementation, an example of how to use it in code, and the error message?
Kedro doesn't do anything special with dataset arguments (as you can see, all datasets have different args, so it's impossible to know them ahead of time), with one exception: https://github.com/kedro-org/kedro/issues/2942
Hi Nok, thanks for your reply. Here are snippets of our code:

catalog.yaml:

```yaml
"Dataset_name":
  type: folder.extras.datasets.customWds.CustomWds
  path: gs://bucket/folder/
```

CustomWds.py:

```python
class Fakedataset(AbstractDataset):
    def _load(self):
        pass

    def _save(self, data):
        pass

    def _describe(self):
        pass


class CustomWds(IncrementalDataset):
    def __init__(
        self,
        *,
        dataset=Fakedataset,
        **kwargs: Any,
    ):
        # for k, v in kwargs.items():
        #     print(k, v)
        # kwargs["dataset"] = dataset
        # kwargs["path"] = kwargs["filepath"]
        # del kwargs["filepath"]
        super().__init__(**kwargs)
```

When loading the kedro context we get this error:

```
TypeError: IncrementalDataset.__init__() got an unexpected keyword argument 'filepath'
```
In CustomWds the commented-out part is a working solution, but we don't understand why.
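To look at that commented-out workaround in isolation: all it does is rename the `filepath` key back to the `path` key that `IncrementalDataset.__init__` expects. Here is a stand-alone sketch of that remapping without the kedro dependency (the helper name and the sample kwargs are illustrative, not part of the original code):

```python
from typing import Any


def remap_catalog_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Rename the 'filepath' key (as injected into the kwargs) back to the
    'path' key that IncrementalDataset.__init__ expects.

    Returns a new dict so the caller's kwargs are left untouched.
    """
    remapped = dict(kwargs)
    if "filepath" in remapped:
        remapped["path"] = remapped.pop("filepath")
    return remapped


# Example: kwargs roughly as they arrive in CustomWds.__init__
incoming = {"filepath": "gs://bucket/folder/", "credentials": None}
fixed = remap_catalog_kwargs(incoming)
assert "filepath" not in fixed and fixed["path"] == "gs://bucket/folder/"
```

This explains *what* the workaround does, though not yet *why* `filepath` appears in the kwargs in the first place.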
I see. There are two types of datasets in Kedro. One is a wrapper dataset (`IncrementalDataset`, `PartitionedDataset`); the other is an implementation of a specific format (`CSVDataSet`, etc.).
```python
from kedro.io import IncrementalDataSet

# these credentials will be passed to:
# a) 'fsspec.filesystem()' call,
# b) the dataset initializer,
# c) the checkpoint initializer
credentials = {"key1": "secret1", "key2": "secret2"}

data_set = IncrementalDataSet(
    path="s3://bucket-name/path/to/folder",
    dataset="pandas.CSVDataSet",
    credentials=credentials,
)
loaded = data_set.load()  # loads all available partitions
# assert isinstance(loaded, dict)

data_set.confirm()  # update checkpoint value to the last processed partition ID
reloaded = data_set.load()  # still loads all available partitions
```
If you look at the implementation, it takes `path` as an argument, but not `filepath`.
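A quick way to check which keyword arguments any dataset class accepts is to inspect its `__init__` signature. A generic stdlib sketch (the class below is a stand-in, not a real Kedro dataset):

```python
import inspect


class DemoWrapperDataset:
    """Stand-in for a wrapper dataset; only the signature matters here."""

    def __init__(self, *, path: str, dataset, credentials=None):
        self.path = path


# Parameter names __init__ accepts, skipping 'self'
params = list(inspect.signature(DemoWrapperDataset.__init__).parameters)[1:]
assert "path" in params and "filepath" not in params
```

Running the same `inspect.signature` call against the real class you are subclassing shows immediately whether `path` or `filepath` is the expected key.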
Thanks Nok, I see. The custom dataset I'm working on should receive a folder (`path`) instead of a file (`filepath`) from the catalog. That's why I extended `IncrementalDataset`. But I don't want to associate an underlying dataset with every file returned by the `_list_partitions` method. Ideally the custom dataset would access the parent methods (`_list_partitions` and all the checkpoint logic) to implement a custom split of the data. This is needed in order to deal with sharding and shuffling.
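For the custom split itself, the shuffling/sharding step can be kept independent of kedro: given the list of partition IDs that `_list_partitions` returns, deterministically shuffle and slice them. A minimal sketch (the function name, seed handling, and round-robin scheme are assumptions, not part of the thread's code):

```python
import random
from typing import List


def split_into_shards(
    partition_ids: List[str], num_shards: int, seed: int = 0
) -> List[List[str]]:
    """Deterministically shuffle partition IDs, then deal them out
    round-robin into `num_shards` shards."""
    ids = sorted(partition_ids)       # stable starting order
    random.Random(seed).shuffle(ids)  # reproducible shuffle
    return [ids[i::num_shards] for i in range(num_shards)]


shards = split_into_shards([f"part-{i:03d}" for i in range(10)], num_shards=3)
# every partition lands in exactly one shard
assert sum(len(s) for s in shards) == 10
```

Because the shuffle is seeded, the same checkpoint state always yields the same shards, which keeps incremental runs reproducible.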