Dotun O
07/08/2024, 2:20 AM
Merel
07/08/2024, 10:37 AM
Dotun O
07/08/2024, 11:57 AM
self.catalog but did not know how to reference it within the pipeline. Do you have any recommendations? Here is the node that I have created; all that is missing is the catalog.
import os

import pandas as pd
from kedro.io import DataCatalog, DatasetError
from kedro_datasets.pandas import CSVDataset


def process_multi_history(df: pd.DataFrame, params: dict, catalog: DataCatalog):
    min_values = params["min_array"]
    max_values = params["max_array"]
    base_path = params["base_path"]
    for i, (min_val, max_val) in enumerate(zip(min_values, max_values)):
        processed_data = plot_and_save_disconnect_histogram(df, params, min_val, max_val)
        output_name = f"plot_multiple_discos_dataframe_{min_val}_{max_val}"
        file_path = os.path.join(base_path, f"{output_name}.csv")
        # Dynamically register the dataset
        try:
            catalog.add(output_name, CSVDataset(filepath=file_path))
        except DatasetError:
            # Dataset already exists, so reuse it and point it at the new filepath
            dataset = catalog._get_dataset(output_name)
            dataset._filepath = file_path
        catalog.save(output_name, processed_data)
Merel
07/08/2024, 12:17 PM
Merel
07/08/2024, 12:18 PM
before_pipeline_run
https://docs.kedro.org/en/latest/api/kedro.framework.hooks.specs.PipelineSpecs.html#kedro.framework.hooks.specs.PipelineSpecs.before_pipeline_run
Merel
07/08/2024, 12:19 PM
before_node_run
if you need access to a specific node input https://docs.kedro.org/en/latest/api/kedro.framework.hooks.specs.NodeSpecs.html#kedro.framework.hooks.specs.NodeSpecs.before_node_run
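For context, a minimal sketch of what a before_pipeline_run hook with catalog access could look like (assuming Kedro 0.19-style APIs; the hook class name, dataset name, and filepath below are illustrative placeholders, not from this project):

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro_datasets.pandas import CSVDataset


class DynamicDatasetHooks:
    @hook_impl
    def before_pipeline_run(self, run_params: dict, pipeline, catalog: DataCatalog) -> None:
        # Register extra datasets before any node runs, so nodes can
        # reference them like regular catalog entries.
        catalog.add("example_dynamic_output", CSVDataset(filepath="data/08_reporting/example.csv"))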
Dotun O
07/08/2024, 12:52 PM
Dotun O
07/08/2024, 12:54 PM
import os
from pathlib import Path

import pandas as pd
from kedro.config import OmegaConfigLoader
from kedro.framework.context import KedroContext
from kedro.framework.hooks.manager import _create_hook_manager
from kedro.io import DatasetError
from kedro_datasets.pandas import CSVDataset


def process_multi_history(df: pd.DataFrame, params: dict):
    package_name = "phoenix"  # Replace with your project's package name
    project_path = Path.cwd().parent
    conf_path = f"{project_path}/conf"
    config_loader = OmegaConfigLoader(conf_source=conf_path)
    env = "base"  # You can specify the environment you want to use
    hook_manager = _create_hook_manager()
    context = KedroContext(
        package_name=package_name,
        project_path=project_path,
        config_loader=config_loader,
        env=env,
        hook_manager=hook_manager,
    )
    catalog = context.catalog
    print("here is the project path")
    print(project_path)
    min_values = params["min_array"]
    max_values = params["max_array"]
    base_path = params["base_path"]
    for i, (min_val, max_val) in enumerate(zip(min_values, max_values)):
        processed_data = plot_and_save_disconnect_histogram(df, params, min_val, max_val)
        output_name = f"plot_multiple_discos_dataframe_{min_val}_{max_val}"
        file_path = os.path.join(f"{project_path}/{base_path}", f"{output_name}.csv")
        # Dynamically register the dataset
        try:
            catalog.add(output_name, CSVDataset(filepath=file_path))
        except DatasetError:
            # Dataset already exists, so reuse it and point it at the new filepath
            dataset = catalog._get_dataset(output_name)
            dataset._filepath = file_path
        catalog.save(output_name, processed_data)
Merel
07/08/2024, 2:16 PM
Dotun O
07/08/2024, 2:21 PM
Merel
07/08/2024, 2:24 PM
after_context_created hook.
You're welcome! Use cases like this are tricky, because the solution is sometimes more in the pipeline design to make sure you don't end up with a hacky solution with weird side effects. I've also asked the rest of the team to see if anyone else has some ideas.
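For reference, a minimal sketch of an after_context_created hook (assuming Kedro 0.19-style APIs; stashing the context on the hook instance is just one possible pattern, not this project's actual implementation):

from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl


class ContextHooks:
    @hook_impl
    def after_context_created(self, context: KedroContext) -> None:
        # Keep a reference to the context so later hooks or helpers can reach
        # context.catalog without rebuilding a KedroContext inside a node.
        self.context = context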
Dotun O
07/08/2024, 2:25 PM
Merel
07/08/2024, 2:27 PM
# src/<package_name>/hooks.py
import logging

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class DataCatalogHooks:
    @property
    def _logger(self):
        return logging.getLogger(__name__)

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        self._logger.info(catalog.list())
Then inside your settings.py you register your new custom hooks class:
# src/<package_name>/settings.py
from <package_name>.hooks import ProjectHooks, DataCatalogHooks

HOOKS = (ProjectHooks(), DataCatalogHooks())
Then when you do kedro run the hook should be triggered automatically.
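One way this could be combined with the original goal, purely as a sketch (the ranges, base path, and hook class name below are illustrative placeholders, not the project's config), is to register the per-range CSV datasets in the hook so the node only saves to catalog entries that already exist:

# src/<package_name>/hooks.py (sketch only)
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro_datasets.pandas import CSVDataset


class DynamicCSVHooks:
    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        # Placeholder ranges; in practice these would come from parameters or config.
        for min_val, max_val in [(0, 10), (10, 20)]:
            name = f"plot_multiple_discos_dataframe_{min_val}_{max_val}"
            catalog.add(name, CSVDataset(filepath=f"data/08_reporting/{name}.csv"))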
Merel
07/08/2024, 2:28 PM
Dotun O
07/08/2024, 2:30 PM
Dotun O
07/08/2024, 3:38 PM
Merel
07/08/2024, 4:19 PM