# questions

Galen Seilis

01/12/2024, 4:35 PM
How do I access the data catalog from within a pipeline? A use case for EDA is to run a function on all (or multiple) data sets. A rough sketch might look like this:
target_datasets = [...]
for dataset in catalog.list():
    if dataset in target_datasets:
        nodes.append(
            node(func=lambda df: df.describe(), inputs=dataset, outputs=f"{dataset}_describe")
        )

Merel

01/12/2024, 4:37 PM
You can use hooks for this: https://docs.kedro.org/en/stable/hooks/introduction.html; specifically after_dataset_loaded or after_catalog_created could help here.

Galen Seilis

01/12/2024, 4:46 PM
@Merel Thank you. I am unfamiliar with how to set up hooks but I will review the documentation.

Merel

01/12/2024, 4:49 PM
Hopefully the docs are clear, but feel free to ask more questions 🙂

Galen Seilis

01/12/2024, 4:58 PM
@Merel Thanks. I get the feeling I should have read other parts of the documentation before trying to understand what is going on in the hooks documentation. I see this:
@hook_spec
def after_catalog_created(
    self,
    catalog: DataCatalog,
    conf_catalog: Dict[str, Any],
    conf_creds: Dict[str, Any],
    save_version: str,
    load_versions: Dict[str, str],
) -> None:
    pass
I am not sure where @hook_spec is supposed to be imported from, or how I am meant to implement functionality in after_catalog_created. For example, if I wanted to call pandas.DataFrame.describe on each dataset in the catalog and then include those tabular results back into the catalog, I am unsure how I am meant to do that. Do I just monkey-patch catalog, or is there a method available on instances of DataCatalog? It just isn't concrete for me yet 😅

Merel

01/12/2024, 5:03 PM
Reading the docs is always a good idea! But maybe you can explain a bit more about what you're trying to achieve? Would you want to run describe on every dataset every time you run the pipeline, or just once? And would those results be saved as new catalog entries, or do you want to append them to the existing ones?
(FYI for more learning our youtube channel might be helpful https://www.youtube.com/@kedro-python/videos)

Galen Seilis

01/12/2024, 5:04 PM
@Merel I am watching this video. I'll see where I get 🙂 If it doesn't make sense then I'll answer your question.

https://www.youtube.com/watch?v=QVEgdJnUUsQ

@Merel I got a proof of concept hook working. Thank you for your help!
@Merel I do have another question. Here is a prototype hook
from typing import Any, Dict

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class PrintCatalog:

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, conf_catalog: Dict[str, Any]) -> None:
        for name in catalog.list():
            try:
                print(catalog.load(name).describe())
            except Exception as e:
                print(e)
        quit()  # stop the run here; exploration only
Naturally the catch-all Exception is not great, but it's not permanent; I'm just trying to take a look at what I am accessing. How would I add new entries to the data catalog, which would in turn get saved, while the hook is executing? Like if I wanted to write the results of describe to the datasets folder in my Kedro project.
Ah, the add method on DataCatalog.
Help on function add in module kedro.io.data_catalog:

add(self, dataset_name: 'str', dataset: 'AbstractDataset', replace: 'bool' = False) -> 'None'
    Adds a new ``AbstractDataset`` object to the ``DataCatalog``.
    
    Args:
        dataset_name: A unique data set name which has not been
            registered yet.
        dataset: A data set object to be associated with the given data
            set name.
        replace: Specifies whether to replace an existing dataset
            with the same name is allowed.
    
    Raises:
        DatasetAlreadyExistsError: When a data set with the same name
            has already been registered.
    
    Example:
    ::
    
        >>> from kedro_datasets.pandas import CSVDataset
        >>>
        >>> io = DataCatalog(datasets={
        >>>                   'cars': CSVDataset(filepath="cars.csv")
        >>>                  })
        >>>
        >>> io.add("boats", CSVDataset(filepath="boats.csv"))