# questions
d
hi everyone, I am trying to create some catalog entries dynamically within a node while running a pipeline and wanted to know if I can access the catalog object directly. I saw this but was not sure if it answered the question: https://github.com/kedro-org/kedro/discussions/861. I already tried passing the kwargs but that did not work for me. I am using the latest version of Kedro
m
Hi @Dotun O, modifying the catalog on the fly isn't really something we'd recommend for reproducibility's sake. The catalog is designed to be immutable. What are you trying to achieve? You might want to look at hooks to add extra behaviour to the main execution flow: https://docs.kedro.org/en/latest/hooks/index.html
d
hey @Merel thanks for the response! I wouldn't say I am modifying the catalog on the fly; rather, I want to add a fixed number of catalog entries based on min and max value arrays. So for min = [0,30,60] and max = [30,60,90], I want to create CSV locations for 0-30, 30-60 and 60-90 to save to in the future. The idea is that if we add a new threshold, let's say min 90 and max 120, the CSV files can be created based on the arrays. I created a hooks.py to get the self.catalog, but did not know how to reference it within the pipeline. Do you have any recommendations? Here is the node that I have created; all that is missing is the catalog.
Copy code
import os

import pandas as pd
from kedro.io import DataCatalog, DatasetError
from kedro_datasets.pandas import CSVDataset


def process_multi_history(df: pd.DataFrame, params: dict, catalog: DataCatalog):
    # plot_and_save_disconnect_histogram is defined elsewhere in this module
    min_values = params["min_array"]
    max_values = params["max_array"]
    base_path = params["base_path"]
    for min_val, max_val in zip(min_values, max_values):
        processed_data = plot_and_save_disconnect_histogram(df, params, min_val, max_val)
        output_name = f"plot_multiple_discos_dataframe_{min_val}_{max_val}"
        file_path = os.path.join(base_path, f"{output_name}.csv")

        # Dynamically register the dataset
        try:
            catalog.add(output_name, CSVDataset(filepath=file_path))
        except DatasetError:
            # Dataset already exists; point it at the new filepath
            dataset = catalog._get_dataset(output_name)
            dataset._filepath = file_path

        catalog.save(output_name, processed_data)
m
You will need to implement one of the existing hook specs: https://docs.kedro.org/en/latest/hooks/introduction.html#hook-specifications. Once you have registered your custom hook class in settings.py (https://docs.kedro.org/en/latest/hooks/introduction.html#registering-the-hook-implementation-with-kedro), Kedro will automatically detect the hook and execute it at the right moment in the execution flow, depending on which spec you're overriding.
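For a concrete (untested) sketch of what I mean: a before_pipeline_run implementation could register one CSVDataset per range before the run starts. I'm assuming the min_array/max_array/base_path parameter names from your node, that they're top-level parameters, and that you're using CSVDataset from kedro-datasets:
Copy code
# src/<package_name>/hooks.py -- rough sketch, not tested against your project
import os

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro_datasets.pandas import CSVDataset


class DynamicCSVHooks:
    @hook_impl
    def before_pipeline_run(self, catalog: DataCatalog) -> None:
        # Parameters are available in the catalog under the "parameters" entry.
        params = catalog.load("parameters")
        for min_val, max_val in zip(params["min_array"], params["max_array"]):
            name = f"plot_multiple_discos_dataframe_{min_val}_{max_val}"
            filepath = os.path.join(params["base_path"], f"{name}.csv")
            # replace=True so re-runs don't fail if the entry already exists
            catalog.add(name, CSVDataset(filepath=filepath), replace=True)
Note you'd still need your pipeline definition to declare those dataset names as node outputs, which you could generate from the same parameters inside create_pipeline().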
d
hmm I see what you mean, but how would Kedro know which of the files to save during runtime? My plan is to have one Kedro node that saves accordingly. When I create a before_pipeline_run hook, what I end up with is the data loaded in the catalog but no way to access it during runtime. Thanks for your help so far!
I tried the following approach and it seems to do what I want, but I want to make sure I am not breaking any Kedro principles. The point here is that I want context.catalog to help with saving the files dynamically.
Copy code
import os
from pathlib import Path

import pandas as pd
from kedro.config import OmegaConfigLoader
from kedro.framework.context import KedroContext
from kedro.framework.hooks.manager import _create_hook_manager
from kedro.io import DatasetError
from kedro_datasets.pandas import CSVDataset


def process_multi_history(df: pd.DataFrame, params: dict):
    # Build the core Kedro components by hand so the node can reach the catalog
    package_name = "phoenix"  # Replace with your project's package name
    project_path = Path.cwd().parent
    conf_path = f"{project_path}/conf"
    config_loader = OmegaConfigLoader(conf_source=conf_path)
    env = "base"  # You can specify the environment you want to use
    hook_manager = _create_hook_manager()

    context = KedroContext(
        package_name=package_name,
        project_path=project_path,
        config_loader=config_loader,
        env=env,
        hook_manager=hook_manager,
    )

    catalog = context.catalog

    print(f"here is the project path: {project_path}")
    min_values = params["min_array"]
    max_values = params["max_array"]
    base_path = params["base_path"]
    for min_val, max_val in zip(min_values, max_values):
        processed_data = plot_and_save_disconnect_histogram(df, params, min_val, max_val)
        output_name = f"plot_multiple_discos_dataframe_{min_val}_{max_val}"
        file_path = os.path.join(f"{project_path}/{base_path}", f"{output_name}.csv")

        # Dynamically register the dataset
        try:
            catalog.add(output_name, CSVDataset(filepath=file_path))
        except DatasetError:
            # Dataset already exists; point it at the new filepath
            dataset = catalog._get_dataset(output_name)
            dataset._filepath = file_path

        catalog.save(output_name, processed_data)
m
The above is conceptually right, but you're creating core components such as the config loader and context yourself. This is fine, since Kedro is also designed to be used as a library, but it diverges from the regular Kedro flow and it's your own responsibility to make sure you understand what's going on here and what side effects it might introduce. My main concern again is reproducibility and debugging in case something goes wrong. Perhaps a safer way to do the same is to have a separate pipeline to populate your catalog and then another to use the dynamically generated data?
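Roughly, that split could look like this in pipeline_registry.py. Just a sketch; the preprocessing/reporting module names are made up and "phoenix" is taken from your snippet above:
Copy code
# src/phoenix/pipeline_registry.py -- sketch of the two-step design
from kedro.pipeline import Pipeline

# Hypothetical pipeline packages: one writes the per-range CSVs,
# the other consumes them afterwards.
from phoenix.pipelines import preprocessing, reporting


def register_pipelines() -> dict[str, Pipeline]:
    preprocess = preprocessing.create_pipeline()
    report = reporting.create_pipeline()
    return {
        "preprocess": preprocess,
        "report": report,
        "__default__": preprocess + report,
    }
Then kedro run --pipeline preprocess writes the data first, and kedro run --pipeline report works off what was written.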
d
Makes sense Merel! Is it possible for me to get the context without recreating it myself during the pipeline run? In that case, I can just use the existing context instead of re-creating it! Thanks again for the help on this
m
We have an after_context_created hook. You're welcome! Use cases like this are tricky, because the solution is sometimes more in the pipeline design, to make sure you don't end up with a hacky solution with weird side effects. I've also asked the rest of the team to see if anyone else has some ideas.
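For illustration (untested sketch): that spec hands you the context object, which is where context.catalog lives, so a hook class could hold on to it:
Copy code
# src/<package_name>/hooks.py -- sketch only; stashing the context like this
# is exactly the kind of global state that can make runs harder to reason about
from kedro.framework.hooks import hook_impl


class ContextHooks:
    context = None  # set once the context is created

    @hook_impl
    def after_context_created(self, context) -> None:
        # Keep a reference so other code could reach the context (and catalog) later.
        ContextHooks.context = context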
d
ok that makes sense! I think what I am missing is how to reference a hook's context in the pipeline. I tried it earlier for the catalog but it wasn't so straightforward! Appreciate the help on this
m
So you'll have to write a hook class and override whichever spec you want, e.g.
Copy code
# src/<package_name>/hooks.py
import logging

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class DataCatalogHooks:
    @property
    def _logger(self):
        return logging.getLogger(__name__)

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        self._logger.info(catalog.list())
Then inside your settings.py you register your new custom hooks class:
Copy code
# src/<package_name>/settings.py
from <package_name>.hooks import ProjectHooks, DataCatalogHooks

HOOKS = (ProjectHooks(), DataCatalogHooks())
Then when you do kedro run, the hook should be triggered automatically.
d
awesome got it!
hey, the hooks approach did not seem to work. I could not reference the context within the node or the catalog file. Please let me know what you hear back from the team! Thanks
m
Ah yeah, it seems that indeed the catalog updates don't trickle through (https://github.com/kedro-org/kedro/issues/2728). This kind of confirms my initial thought that modifying the catalog during run execution isn't a good idea 😅 You might need to design your pipeline differently, e.g. have a preprocess pipeline that does the data setup and then use that output in another pipeline. Or perhaps in this case it's easier to just have a script that handles the CSV processing first.
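If you go the redesign route, one option worth looking at (not something we discussed above, just a sketch assuming kedro-datasets is installed) is a PartitionedDataset: the catalog keeps a single static entry (the entry name and path below are made up), the node returns a dict of dataframes keyed by filename, and Kedro writes one CSV per key.
Copy code
# Sketch: the catalog.yml entry stays static, e.g.
#
#   plot_multiple_discos_dataframes:
#     type: partitions.PartitionedDataset
#     path: data/08_reporting/histograms   # made-up path
#     dataset: pandas.CSVDataset
#     filename_suffix: ".csv"
#
import pandas as pd


def process_multi_history(df: pd.DataFrame, params: dict) -> dict[str, pd.DataFrame]:
    # plot_and_save_disconnect_histogram is the helper from your snippets above
    partitions = {}
    for min_val, max_val in zip(params["min_array"], params["max_array"]):
        processed = plot_and_save_disconnect_histogram(df, params, min_val, max_val)
        partitions[f"plot_multiple_discos_dataframe_{min_val}_{max_val}"] = processed
    return partitions
The node's output in the pipeline definition would then just be that single entry, so adding a new threshold only changes the parameters, not the catalog.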