# questions
t
Hi everyone. I have a problem with hooks, maybe I do not understand them well enough. I am trying to process data before any pipeline (this project does not have any pipelines yet). The idea is that all the datasets in the catalog that follow a certain convention should be processed. Here is the hook:
# Imports assumed at the top of the hooks module
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro_datasets.pandas import CSVDataset
from kedro_datasets.partitions import PartitionedDataset


class DynamicDatasetHook:
    def __init__(self):
        self.projects = None

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, **kwargs) -> None:
        """
        Register dynamic datasets without triggering infinite recursion.
        """

        self.projects = [key for key in catalog.list() if key.startswith("project_")]

        for project in self.projects:
            # Define paths for the datasets
            properties_path = f"data/03_primary/properties_{project}.csv"
            data_path = f"data/03_primary/data_{project}"

            # Register dynamic datasets without triggering additional processing
            catalog.add(
                f"properties_{project}",
                CSVDataset(filepath=properties_path),
            )

            catalog.add(
                f"data_{project}",
                PartitionedDataset(
                    path=data_path,
                    dataset=CSVDataset,
                    filename_suffix=".csv",
                ),
            )

    @hook_impl
    def after_catalog_loaded(self, catalog: DataCatalog, **kwargs) -> None:
        """
        Process datasets and save them only if necessary.
        """

        for project in self.projects:
            # Define paths for the datasets
            properties_path = f"data/03_primary/properties_{project}.csv"
            data_path = f"data/03_primary/data_{project}"

            from nanolab_processing_base.hooks_utils import separate_nanolab_dataset
            dataset = catalog.load(project)
            consolidated_props, indexed_data = separate_nanolab_dataset(dataset)

            # Save properties
            catalog.save(f"properties_{project}", consolidated_props)

            # Save partitioned dataset
            catalog.save(f"data_{project}", indexed_data)
My problem is that currently the hook is not saving the data. The idea is for this to be available as soon as a project starts, for example in a Jupyter notebook. Any ideas on why it is not saving the datasets to disk? Thanks in advance! 🙂
t
Btw, the datasets do appear in the catalog, but I think they are empty, since I suspect the data is never processed
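One quick way to verify that from the Kedro Jupyter session (where catalog is already injected) is to ask the catalog whether the underlying data actually exists on disk, roughly like this:
# Run inside the Kedro Jupyter session, where `catalog` is already available.
# The names follow the properties_/data_ convention registered by the hook above.
for name in catalog.list():
    if name.startswith(("properties_project_", "data_project_")):
        # DataCatalog.exists() reports whether the dataset's data is actually on disk
        print(name, "->", "exists on disk" if catalog.exists(name) else "nothing on disk yet")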
m
"I am trying to process data before any pipeline"
Does not seem like a good idea to me 🤔
s
Maybe you could try moving the dataset registration and processing logic into the after_catalog_loaded hook and set self.projects there.
t
I managed to fix it. Sorry for the delay.
I had to add a flag since it was entering a recursion.
@marrrcin I am trying to have a Kedro project that does one basic operation before anything else, because it will be used by many people to analyze data, and all the data has the same structure and needs to be processed in this way.
Do you have a better solution for it? Would you put it in a default pipeline? The problem with that is that I do not want the user to have to write the default datasets that are supposed to be generated into the catalog; they should not have to worry about this first processing.
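For reference, the default-pipeline alternative mentioned above would look roughly like this in pipeline_registry.py (a minimal sketch: project_example and the generated dataset names are hypothetical placeholders, and each would still need its own catalog entry, which is exactly the downside raised here):
from kedro.pipeline import Pipeline, node, pipeline

from nanolab_processing_base.hooks_utils import separate_nanolab_dataset  # same helper as in the hook


def register_pipelines() -> dict[str, Pipeline]:
    preprocessing = pipeline(
        [
            node(
                func=separate_nanolab_dataset,
                inputs="project_example",                                        # would need a catalog entry
                outputs=["properties_project_example", "data_project_example"],  # and so would these
                name="separate_project_example",
            )
        ]
    )
    # `kedro run` without --pipeline executes "__default__"
    return {"__default__": preprocessing, "preprocessing": preprocessing}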
@Sajid Alam Sorry, I did not understand your comment. In the end I did this:
# Imports assumed at the top of the hooks module
import logging

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro_datasets.pandas import CSVDataset
from kedro_datasets.partitions import PartitionedDataset

from nanolab_processing_base.hooks_utils import separate_nanolab_dataset

logger = logging.getLogger(__name__)


class DynamicDatasetHook:
    def __init__(self):
        self.projects = None
        self.is_processing = False  # Flag to prevent recursion

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, **kwargs) -> None:
        """
        Register dynamic datasets without triggering infinite recursion.
        """
        self.projects = [key for key in catalog.list() if key.startswith("project_")]

        for project in self.projects:
            # Define paths for the datasets
            properties_path = f"data/03_primary/properties_{project}.csv"
            data_path = f"data/03_primary/data_{project}"

            # Register dynamic datasets
            catalog.add(
                f"properties_{project}",
                CSVDataset(filepath=properties_path),
            )

            catalog.add(
                f"data_{project}",
                PartitionedDataset(
                    path=data_path,
                    dataset=CSVDataset,
                    filename_suffix=".csv",
                ),
            )

        if self.is_processing:  # Prevent recursive calls
            <http://logger.info|logger.info>("Already processing datasets. Skipping this execution.")
            return

        self.is_processing = True  # Set flag to avoid recursion

        try:
            for project in self.projects:
                # Define paths for the datasets
                properties_path = f"data/03_primary/properties_{project}.csv"
                data_path = f"data/03_primary/data_{project}"

                <http://logger.info|logger.info>(f"Processing dataset: {project}")
                dataset = catalog.load(project)  # This might trigger catalog hooks
                consolidated_props, indexed_data = separate_nanolab_dataset(dataset)

                # Save properties
                catalog.save(f"properties_{project}", consolidated_props)
                <http://logger.info|logger.info>(f"Saved properties for {project} to {properties_path}")

                # Save partitioned dataset
                catalog.save(f"data_{project}", indexed_data)
                <http://logger.info|logger.info>(f"Saved data partitions for {project} to {data_path}")
        except Exception as e:
            logger.error(f"Error processing datasets: {e}")
        finally:
            self.is_processing = False
The only problem now is that when I run JupyterLab, the idea is that the hook creates the automatic datasets in the data folder as specified, but instead it creates them in the notebooks folder hehe. Any idea how to fix that?
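If the datasets end up in the notebooks folder, the relative data/... paths are most likely being resolved against the kernel's current working directory rather than the project root. One possible fix (a sketch, assuming the same hook class as above) is to capture the project path in after_context_created and build absolute paths from it:
# Sketch of anchoring the dataset paths to the project root instead of the
# current working directory. Only the relevant parts of the hook are shown.
from kedro.framework.hooks import hook_impl


class DynamicDatasetHook:
    def __init__(self):
        self.projects = None
        self.project_path = None

    @hook_impl
    def after_context_created(self, context) -> None:
        # KedroContext.project_path is the project root, no matter where
        # JupyterLab (and hence the kernel's working directory) was started.
        self.project_path = context.project_path

    @hook_impl
    def after_catalog_created(self, catalog, **kwargs) -> None:
        self.projects = [key for key in catalog.list() if key.startswith("project_")]
        for project in self.projects:
            properties_path = str(self.project_path / "data" / "03_primary" / f"properties_{project}.csv")
            data_path = str(self.project_path / "data" / "03_primary" / f"data_{project}")
            # ...register and process as before, using these absolute paths...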
In fact, the best approach would be to have a parameter in the global parameters that says whether to run this initial processing or not. That way it does not run for every new notebook.
But I have no idea how to do that 😞
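One possible shape for that (again a sketch: the run_initial_processing key is a made-up name you would add to the parameters yourself) is to read the flag in after_context_created and skip the work when it is off:
# Sketch of gating the processing behind a parameter. The key name
# "run_initial_processing" is hypothetical and would live in conf/base/parameters.yml, e.g.:
#
#   run_initial_processing: false
#
from kedro.framework.hooks import hook_impl


class DynamicDatasetHook:
    def __init__(self):
        self.enabled = False

    @hook_impl
    def after_context_created(self, context) -> None:
        # context.params is the merged parameters dictionary for this session
        self.enabled = bool(context.params.get("run_initial_processing", False))

    @hook_impl
    def after_catalog_created(self, catalog, **kwargs) -> None:
        if not self.enabled:
            return
        # ...dataset registration and processing as before...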