Júlio Resende
03/01/2024, 12:11 PMIñigo Hidalgo
03/01/2024, 1:04 PM
`deltalake.write_deltalake`, which does not provide upsert functionality out of the box. You would have to write your own upsert functionality and add it to a custom dataset implementation.
The upsert functionality would look something like this: https://github.com/inigohidalgo/prefect-polygon-etl/blob/main/delta-rs-etl/src/delta_rs_etl/upsert.py
Iñigo Hidalgo
03/01/2024, 1:04 PMIñigo Hidalgo
03/01/2024, 1:28 PMJuan Luis
03/01/2024, 1:56 PM
`pandas.DeltaTableDataset` and have a custom `_save` method that does what you want
Júlio Resende
03/01/2024, 3:54 PMJúlio Resende
03/06/2024, 5:41 PM
from deltalake.writer import write_deltalake
from deltalake import DeltaTable
from kedro_datasets.pandas import DeltaTableDataset
from kedro.io.core import DatasetError
from typing import List, Union, Any
import pandas as pd
import pyarrow as pa
def upsert(new_data: pa.Table, target_table: DeltaTable, primary_key: Union[str, List[str]]) -> dict:
    """Merge ``new_data`` into ``target_table``, updating matches and inserting the rest.

    A merge predicate is built that requires equality on every primary-key
    column between the ``target`` and ``source`` aliases. Matched rows are
    updated with all source columns; unmatched source rows are inserted.

    Args:
        new_data: Arrow table holding the rows to upsert (aliased ``source``).
        target_table: Delta table to merge into (aliased ``target``).
        primary_key: A single column name, or a list of column names, that
            identifies a row.

    Returns:
        The value returned by the merge builder's ``execute()`` call.

    Raises:
        ValueError: If ``primary_key`` is empty or ``None``; without at least
            one key column no meaningful merge predicate can be built.
    """
    if not primary_key:
        raise ValueError("primary_key must contain at least one column name")

    # isinstance (rather than an exact type comparison) also accepts str
    # subclasses; a lone column name is normalised to a one-element list so a
    # single code path builds the predicate.
    key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    predicate = " AND ".join(f"target.{col} = source.{col}" for col in key_columns)

    return (
        target_table
        .merge(
            source=new_data,
            predicate=predicate,
            source_alias="source",
            target_alias="target",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )
class CustomDeltaTableDataset(DeltaTableDataset):
    """Variation of ``pandas.DeltaTableDataset`` that supports an ``upsert`` write mode.

    When ``save_args["mode"]`` is ``"upsert"``, the parent class is initialised
    with mode ``"overwrite"`` instead, and saves into an already existing table
    are routed through ``upsert`` using ``primary_key`` as the match columns.
    Any other mode is passed to the parent class unchanged.
    """

    def __init__(self, primary_key: Union[str, List[str], None] = None, **kwargs) -> None:
        """Configure the dataset and detect whether upsert mode was requested.

        Args:
            primary_key: Column name, or list of column names, used to match
                rows when upserting. Required when the save mode is ``"upsert"``.
            **kwargs: Forwarded to ``DeltaTableDataset.__init__``.

        Raises:
            DatasetError: If upsert mode is requested without a ``primary_key``.
        """
        self.primary_key = primary_key

        # ``save_args`` may be absent or passed explicitly as None.
        save_args = kwargs.get("save_args") or {}
        self.upsert_mode = save_args.get("mode", "") == "upsert"

        if self.upsert_mode:
            if not self.primary_key:
                raise DatasetError(
                    "To use upsert write mode, you need to set the primary_key argument!"
                )
            # Hand the parent a copy carrying 'overwrite' in place of 'upsert',
            # so the caller's own save_args dict is never modified.
            kwargs["save_args"] = {**save_args, "mode": "overwrite"}

        super().__init__(**kwargs)

    def _save(self, data: pd.DataFrame) -> None:
        """Write ``data`` to the Delta table, upserting when configured to.

        Args:
            data: Frame to persist; its index is dropped during conversion to
                an Arrow table.
        """
        table = pa.Table.from_pandas(data, preserve_index=False)

        # NOTE(review): is_empty_dir, _filepath, fs_args, _save_args, _version
        # and _delta_table are presumably set up by DeltaTableDataset - confirm.
        if self.is_empty_dir:
            # First-time creation of the delta table.
            write_deltalake(
                self._filepath,
                table,
                storage_options=self.fs_args,
                **self._save_args,
            )
            self.is_empty_dir = False
            self._delta_table = DeltaTable(
                table_uri=self._filepath,
                storage_options=self.fs_args,
                version=self._version,
            )
        elif self.upsert_mode:
            upsert(
                new_data=table,
                target_table=self._delta_table,
                primary_key=self.primary_key,
            )
        else:
            write_deltalake(
                self._delta_table,
                table,
                storage_options=self.fs_args,
                **self._save_args,
            )

    def _describe(self) -> dict[str, Any]:
        """Return the parent's description plus ``primary_key``.

        In upsert mode the reported save mode is ``"upsert"`` rather than the
        ``"overwrite"`` value that was handed to the parent class.
        """
        desc = super()._describe()
        desc["primary_key"] = self.primary_key
        if self.upsert_mode:
            # Report the mode on a copy: the parent may hand back the same
            # mapping it later unpacks into write_deltalake, and 'upsert' must
            # not leak into those arguments.
            desc["save_args"] = {**(desc.get("save_args") or {}), "mode": "upsert"}
        return desc
Juan Luis
03/06/2024, 5:43 PMJúlio Resende
03/06/2024, 5:47 PMIñigo Hidalgo
03/07/2024, 9:22 AM
`==`, it was a small PoC, but I'm glad it works for you 🙂 I might update it some time in the next month, so I'll ping you if I do so you can incorporate that into your ds if you want/need