Carlos Prieto - Tomtom
01/28/2025, 9:10 AM
```python
def export_results_to_delta(summary_df, output_path="/mnt/success5/Success5_results/metric_changes"):
    if DeltaTable.isDeltaTable(spark, output_path):
        DeltaTable.forPath(spark, output_path).alias("target").merge(
            summary_df.alias("source"),
            """target.reference_id = source.reference_id AND
            target.country = source.country AND
            target.provider_id = source.provider_id AND
            target.matching_run_id = source.matching_run_id""",
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    else:
        summary_df.write.format("delta").mode("overwrite").partitionBy(
            "country", "matching_run_id", "provider_id"
        ).save(output_path)
```
Is it possible to create a catalog entry in Kedro that allows me to store the dataset in this manner? If so, could you please provide an example of how to configure the catalog entry?
Thank you in advance for your help!

Hall
01/28/2025, 9:10 AM

Juan Luis
01/28/2025, 9:21 AM
`kedro_datasets.spark.DeltaTableDataset` is read-only for historical reasons. Still, have a look at the code:
https://github.com/kedro-org/kedro-plugins/blob/1a5e0f/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py
My advice would be to create your own, with more or less this structure (pseudocode):
```python
from delta.tables import DeltaTable
from kedro.io import AbstractDataset
from pyspark.sql import DataFrame, SparkSession


class SparkDeltaDataset(AbstractDataset):
    # NOTE: a real implementation also needs _load and _describe;
    # only the save path is sketched here.

    def __init__(self, filepath, merge_predicate, partition_cols=None, alias="target"):
        self._filepath = filepath
        self._merge_predicate = merge_predicate
        self._partition_cols = partition_cols
        self._alias = alias

    def _save(self, data: DataFrame) -> None:
        spark = SparkSession.getActiveSession()  # assumes a session is already running
        output_path = self._filepath
        if DeltaTable.isDeltaTable(spark, output_path):
            # Table already exists: upsert (update matching rows, insert the rest).
            # The merge predicate is expected to reference the "target"/"source" aliases.
            (
                DeltaTable.forPath(spark, output_path)
                .alias(self._alias)
                .merge(data.alias("source"), self._merge_predicate)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # First write: create the Delta table, optionally partitioned
            write_cmd = data.write.format("delta").mode("overwrite")
            if partition_cols := self._partition_cols:
                write_cmd = write_cmd.partitionBy(*partition_cols)
            write_cmd.save(output_path)
```
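The catalog entry could then look more or less like this. To be clear, this is a hypothetical sketch: `my_project.datasets` and the `merge_predicate`/`partition_cols` keys just mirror the constructor of the class above, they are not an existing kedro-datasets API:
```yaml
# conf/base/catalog.yml -- hypothetical entry wiring up the custom dataset
metric_changes:
  type: my_project.datasets.SparkDeltaDataset  # dotted path to wherever you put the class
  filepath: /mnt/success5/Success5_results/metric_changes
  merge_predicate: >
    target.reference_id = source.reference_id AND
    target.country = source.country AND
    target.provider_id = source.provider_id AND
    target.matching_run_id = source.matching_run_id
  partition_cols: [country, matching_run_id, provider_id]
```
Kedro imports the class from the dotted `type` path and passes the remaining keys to its constructor, so the YAML keys are whatever you name the `__init__` arguments.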