Thiago José Moser Poletto
02/03/2025, 6:05 PM
AttributeError: 'CustomDataCatalog' object has no attribute '_data_sets'
02/03/2025, 6:05 PMdatajoely
02/03/2025, 6:06 PM
settings.py
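(For context: the surviving fragment suggests datajoely is asking whether the project registers the custom catalog in settings.py. A minimal sketch of what that registration usually looks like; the CustomDataCatalog import path here is assumed:)

# src/<project_name>/settings.py -- path assumed
from my_project.io import CustomDataCatalog  # hypothetical import path

# Kedro instantiates this class instead of the default kedro.io.DataCatalog.
DATA_CATALOG_CLASS = CustomDataCatalog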
datajoely
02/03/2025, 6:06 PM
Thiago José Moser Poletto
02/03/2025, 6:09 PM
%load_ext kedro.ipython
%reload_kedro ../
That's where the error happened.
Thiago José Moser Poletto
02/03/2025, 6:16 PM
self.datasets = catalog._data_sets
datajoely
02/03/2025, 6:17 PM
It could be the CustomDataCatalog class too
datajoely
02/03/2025, 6:18 PM
or an after_context_created hook which is trying to access the attribute of this custom catalog class.
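(A minimal sketch of the pattern datajoely suspects, assuming a hook class name and registration not shown in the thread:)

from kedro.framework.hooks import hook_impl

class CatalogInspectionHooks:  # hypothetical hook class
    @hook_impl
    def after_context_created(self, context):
        # Raises AttributeError when the configured catalog class does not
        # define `_data_sets` (the attribute was renamed to `_datasets`
        # in newer Kedro releases).
        self.datasets = context.catalog._data_sets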
Thiago José Moser Poletto
02/03/2025, 7:58 PM
import datetime as dt
import logging
import time

import pandas as pd
from google.cloud import bigquery_datatransfer
from kedro.framework.hooks import hook_impl
from munch import DefaultMunch

# DataQualityReport, apply_data_quality_tests and is_dataset_from_bq are
# project-local helpers.


class ProjectHooks:
    def __init__(self):
        self.client = None
        self.gcp_project = None
        self.checked_datasets = []
        self.data_quality_report = DataQualityReport()

    def report_export(self, catalog):
        minimal_report = self.data_quality_report.create_technical_report(hide_not_tested=True)
        print("\n--- DATA QUALITY RESULTS ---\n")
        print(minimal_report)
        print("----------------------------")
        detailed_report = self.data_quality_report.create_technical_report()
        catalog.save("data_quality_report", detailed_report)
        job_name = catalog.load("params:job_name")
        bq_report = self.data_quality_report.create_dataframe_report(job_name)
        catalog.save("bq_data_quality_report", bq_report)

    @property
    def _logger(self):
        return logging.getLogger(self.__class__.__name__)

    @staticmethod
    def _create_log(msg, run_params, catalog):
        save_bq_logs = run_params["extra_params"].get("save_bq_logs", False)
        save_bq_logs = save_bq_logs in (True, "True")
        if save_bq_logs:
            if not isinstance(msg, list):
                msg = [msg]
            log = pd.DataFrame({"status": msg, "datetime": [dt.datetime.now()] * len(msg)})
            catalog.save("bq_logs", log)

    @staticmethod
    def _call_bq_sheets_tables_update(resource_id):
        now = dt.datetime.now()
        now = dt.datetime(now.year, now.month, now.day, tzinfo=dt.timezone.utc)
        # Trigger a manual run of the scheduled transfer
        transfer_client = bigquery_datatransfer.DataTransferServiceClient()
        response = transfer_client.schedule_transfer_runs(
            parent=resource_id,
            start_time=now,
            end_time=now,
        ).runs[0]
        print("Google Sheets Tables Update - RUNNING")
        # Poll until the run finishes
        while True:
            run_response = transfer_client.get_transfer_run(name=response.name)
            run_status = run_response.state.name
            if run_status in ["PENDING", "RUNNING"]:
                time.sleep(5)
            elif run_status == "SUCCEEDED":
                print("Google Sheets Tables Update - DONE")
                break
            else:
                raise RuntimeError(f"Google Sheets Tables Update failed with status {run_status}")

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog) -> None:
        if catalog.exists("params:save_parameters") and catalog.load("params:save_parameters"):
            catalog.save("all_parameters", dict(catalog.load("parameters")))
        if catalog.exists("params:scheduled_query_resource_id"):
            self._call_bq_sheets_tables_update(catalog.load("params:scheduled_query_resource_id"))
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Starting {pipeline_name} pipeline", run_params, catalog)

    @hook_impl
    def after_pipeline_run(self, run_params, run_result, pipeline, catalog) -> None:
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Finished {pipeline_name} pipeline", run_params, catalog)
        self.report_export(catalog)

    @hook_impl
    def on_pipeline_error(self, error, run_params, pipeline, catalog) -> None:
        pipeline_name = run_params["pipeline_name"]
        self._create_log(f"Error occurred in {pipeline_name} pipeline.\n{error}", run_params, catalog)
        self.report_export(catalog)

    @hook_impl
    def before_node_run(self, node, catalog, inputs, is_async, session_id):
        for k, v in inputs.items():
            if k == "params:snapshot_date" and isinstance(v, str):
                inputs[k] = pd.to_datetime(v).date()
            if isinstance(v, dict):
                inputs[k] = DefaultMunch.fromDict(v)
        self._save_bq_backup_tables(catalog, inputs, exclude=self.checked_datasets)
        apply_data_quality_tests(inputs.copy(), exclude=self.checked_datasets, report=self.data_quality_report)
        self.checked_datasets.extend(list(inputs.keys()))
        return inputs

    @hook_impl
    def after_node_run(self, node, catalog, outputs, is_async, session_id):
        self._save_bq_backup_tables(catalog, outputs, exclude=self.checked_datasets)
        apply_data_quality_tests(outputs.copy(), exclude=self.checked_datasets, report=self.data_quality_report)
        self.checked_datasets.extend(list(outputs.keys()))
        return None

    @staticmethod
    def _save_bq_backup_tables(catalog, inputs, exclude):
        if not (catalog.exists("params:save_bq_tables") and catalog.load("params:save_bq_tables")):
            return
        inputs = {name: inputs[name] for name in inputs if name not in exclude}
        for name in inputs:
            if is_dataset_from_bq(name, catalog):
                catalog.save(f"bq_backup_{name}", inputs[name])
but none of them calls that attribute...
Thiago José Moser Poletto
02/03/2025, 8:04 PM
Elena Khaustova
02/03/2025, 11:06 PM
Kedro-Viz accesses both catalog._datasets and catalog._data_sets (the latter for Kedro 0.18.x):
https://github.com/kedro-org/kedro-viz/blob/65f2c5ac6ee82a5c707d87ed4c277132418c5a2d/package/kedro_viz/integrations/kedro/hooks.py#L30
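(A possible workaround sketch, not from the thread: on a Kedro version whose catalog only defines _datasets, the custom catalog can alias the old attribute name so consumers that probe _data_sets, like the linked Kedro-Viz hook, keep working. The real class body is assumed:)

from kedro.io import DataCatalog

class CustomDataCatalog(DataCatalog):  # sketch; the real class body is assumed
    @property
    def _data_sets(self):
        # Expose the pre-0.19 attribute name as an alias of the current
        # `_datasets` so plugins accessing either name find the datasets.
        return self._datasets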
Thiago José Moser Poletto
02/04/2025, 12:44 PM
Thiago José Moser Poletto
02/04/2025, 8:21 PM