Tomás Rojas
12/04/2024, 1:06 AM
# Imports assumed by the snippet (module paths may differ between kedro-datasets versions)
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro_datasets.pandas import CSVDataset
from kedro_datasets.partitions import PartitionedDataset

from nanolab_processing_base.hooks_utils import separate_nanolab_dataset


class DynamicDatasetHook:
    def __init__(self):
        self.projects = None

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, **kwargs) -> None:
        """
        Register dynamic datasets without triggering infinite recursion.
        """
        self.projects = [key for key in catalog.list() if key.startswith("project_")]
        for project in self.projects:
            # Define paths for the datasets
            properties_path = f"data/03_primary/properties_{project}.csv"
            data_path = f"data/03_primary/data_{project}"
            # Register dynamic datasets without triggering additional processing
            catalog.add(
                f"properties_{project}",
                CSVDataset(filepath=properties_path),
            )
            catalog.add(
                f"data_{project}",
                PartitionedDataset(
                    path=data_path,
                    dataset=CSVDataset,
                    filename_suffix=".csv",
                ),
            )

    @hook_impl
    def after_catalog_loaded(self, catalog: DataCatalog, **kwargs) -> None:
        """
        Process datasets and save them only if necessary.
        """
        for project in self.projects:
            # Define paths for the datasets
            properties_path = f"data/03_primary/properties_{project}.csv"
            data_path = f"data/03_primary/data_{project}"
            dataset = catalog.load(project)
            consolidated_props, indexed_data = separate_nanolab_dataset(dataset)
            # Save properties
            catalog.save(f"properties_{project}", consolidated_props)
            # Save partitioned dataset
            catalog.save(f"data_{project}", indexed_data)
My problem is that currently the hook is not saving the data. The idea is for this to be available as soon as a project starts, for example in a Jupyter notebook. Any ideas on why it is not saving the datasets to disk?
Thanks in advance! 🙂
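For the hook to fire at all when a project starts (including a notebook started with %load_ext kedro.ipython, which builds the catalog and so triggers after_catalog_created), it has to be listed in the project's settings.py. A minimal sketch; my_project is a placeholder package name:

# settings.py -- sketch; assumes DynamicDatasetHook is importable from the project's hooks module
from my_project.hooks import DynamicDatasetHook  # "my_project" is a placeholder

HOOKS = (DynamicDatasetHook(),)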
Hall
12/04/2024, 1:06 AM

Tomás Rojas
12/04/2024, 1:09 AM

marrrcin
12/04/2024, 10:51 AM
> I am trying to process data before any pipeline
Does not seem like a good idea to me 🤔
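One way to read that objection is to keep the hook for registration only and move the processing into an ordinary pipeline, letting Kedro persist the outputs itself. A rough sketch under that reading; project_x is a hypothetical project name and the output names match the entries registered above:

# pipeline.py -- sketch of doing the separation as a normal Kedro node
from kedro.pipeline import node, pipeline

from nanolab_processing_base.hooks_utils import separate_nanolab_dataset


def create_pipeline(**kwargs):
    project = "project_x"  # hypothetical project name
    return pipeline(
        [
            node(
                func=separate_nanolab_dataset,
                inputs=project,
                outputs=[f"properties_{project}", f"data_{project}"],
                name=f"separate_{project}",
            )
        ]
    )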
Sajid Alam
12/04/2024, 10:52 AM
after_catalog_loaded hook and set self.projects there.
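A minimal sketch of that suggestion, replacing the second method of DynamicDatasetHook: compute self.projects inside the hook that uses it, so it is never None when the loop runs. The after_catalog_loaded name is kept from the snippet above; whether a hook spec with that name exists in the installed Kedro version is an assumption worth checking:

    @hook_impl
    def after_catalog_loaded(self, catalog: DataCatalog, **kwargs) -> None:
        # Recompute the project list here instead of relying on after_catalog_created
        self.projects = [key for key in catalog.list() if key.startswith("project_")]
        for project in self.projects:
            consolidated_props, indexed_data = separate_nanolab_dataset(catalog.load(project))
            catalog.save(f"properties_{project}", consolidated_props)
            catalog.save(f"data_{project}", indexed_data)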
Tomás Rojas
12/13/2024, 7:28 PM
# Same imports as the first snippet, plus a module-level logger
import logging

logger = logging.getLogger(__name__)


class DynamicDatasetHook:
    def __init__(self):
        self.projects = None
        self.is_processing = False  # Flag to prevent recursion

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, **kwargs) -> None:
        """
        Register dynamic datasets without triggering infinite recursion.
        """
        self.projects = [key for key in catalog.list() if key.startswith("project_")]
        for project in self.projects:
            # Define paths for the datasets
            properties_path = f"data/03_primary/properties_{project}.csv"
            data_path = f"data/03_primary/data_{project}"
            # Register dynamic datasets
            catalog.add(
                f"properties_{project}",
                CSVDataset(filepath=properties_path),
            )
            catalog.add(
                f"data_{project}",
                PartitionedDataset(
                    path=data_path,
                    dataset=CSVDataset,
                    filename_suffix=".csv",
                ),
            )

        if self.is_processing:  # Prevent recursive calls
            logger.info("Already processing datasets. Skipping this execution.")
            return
        self.is_processing = True  # Set flag to avoid recursion
        try:
            for project in self.projects:
                # Define paths for the datasets
                properties_path = f"data/03_primary/properties_{project}.csv"
                data_path = f"data/03_primary/data_{project}"
                logger.info(f"Processing dataset: {project}")
                dataset = catalog.load(project)  # This might trigger catalog hooks
                consolidated_props, indexed_data = separate_nanolab_dataset(dataset)
                # Save properties
                catalog.save(f"properties_{project}", consolidated_props)
                logger.info(f"Saved properties for {project} to {properties_path}")
                # Save partitioned dataset
                catalog.save(f"data_{project}", indexed_data)
                logger.info(f"Saved data partitions for {project} to {data_path}")
        except Exception as e:
            logger.error(f"Error processing datasets: {e}")
        finally:
            self.is_processing = False
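A quick way to see whether the saves themselves work is to run the hook body by hand from a notebook started with %load_ext kedro.ipython, which exposes a ready-made catalog, bypassing hook registration entirely. A sketch; project_x is a hypothetical entry name:

# Run inside a Kedro-aware notebook where `catalog` is already defined
hook = DynamicDatasetHook()
hook.after_catalog_created(catalog)  # registers the dynamic entries and runs the processing

# If the saves happened, the dynamic entries should now load from disk
props = catalog.load("properties_project_x")  # hypothetical project name
props.head()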