#questions

Júlio Resende

03/01/2024, 12:11 PM
Hello everyone! I would like to use the "upsert" save mode in pandas.DeltaTableDataset (as in databricks.ManagedTableDataset). Does anyone know of a workaround for using it with pandas.DeltaTableDataset?

Iñigo Hidalgo

03/01/2024, 1:04 PM
As far as I know, the DeltaTableDataset is a wrapper around `deltalake.write_deltalake`, which does not provide upsert functionality out of the box. You would have to write your own upsert functionality and add it to a custom dataset implementation. The upsert functionality would look something like this: https://github.com/inigohidalgo/prefect-polygon-etl/blob/main/delta-rs-etl/src/delta_rs_etl/upsert.py
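To make the semantics concrete: an upsert updates the target rows whose key already exists and inserts the rows whose key does not. Below is a minimal in-memory sketch in plain pandas, not Delta Lake; `upsert_frame` is a hypothetical helper, and it assumes both frames share the same columns and that the key columns are unique within each frame.

```python
import pandas as pd


def upsert_frame(target: pd.DataFrame, source: pd.DataFrame, keys: list[str]) -> pd.DataFrame:
    """Replace target rows that match `source` on `keys` and append the unmatched source rows.

    Hypothetical helper; assumes identical columns and unique keys within each frame.
    """
    # Stack target first and source second, so that when a key appears in both,
    # keep="last" retains the source (newer) row and drops the old target row.
    combined = pd.concat([target, source], ignore_index=True)
    return combined.drop_duplicates(subset=keys, keep="last").reset_index(drop=True)


target = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})
source = pd.DataFrame({"id": [2, 3], "value": ["B", "c"]})
print(upsert_frame(target, source, keys=["id"]))
```

A Delta merge with `when_matched_update_all()` and `when_not_matched_insert_all()`, as in the linked code, expresses the same two branches directly against the stored table and commits the result as a single Delta transaction.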
I do wonder whether this functionality could be considered within the scope of the existing dataset, @Juan Luis. It isn't a huge development and would probably be easier to handle at the source than in a custom dataset.
It seems there's ongoing discussion about this in https://github.com/kedro-org/kedro-plugins/issues/542

Juan Luis

03/01/2024, 1:56 PM
yeah, we haven't made any progress since #542, which @Iñigo Hidalgo shared. Unfortunately, the only workaround at the moment is to create your own custom dataset, @Júlio Resende. You can inherit from `pandas.DeltaTableDataset` and write a custom `_save` method that does what you want.

Júlio Resende

03/01/2024, 3:54 PM
Thanks guys! I will test the code sent by @Iñigo Hidalgo.
Hi! I'm coming back to this thread just to share my custom dataset, in case it's useful to anyone:
```python
from typing import Any, Dict, List, Union

import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
from kedro.io.core import DatasetError
from kedro_datasets.pandas import DeltaTableDataset


def upsert(new_data: pa.Table, target_table: DeltaTable, primary_key: Union[str, List[str]]) -> dict:
    """Merge `new_data` into `target_table`: update rows whose key matches, insert the rest."""
    keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    # e.g. "target.id = source.id AND target.date = source.date"
    predicate = " AND ".join(f"target.{col} = source.{col}" for col in keys)

    return (
        target_table
        .merge(
            source=new_data,
            predicate=predicate,
            source_alias="source",
            target_alias="target",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()  # returns a dict of merge metrics
    )


class CustomDeltaTableDataset(DeltaTableDataset):
    """
    A variation of pandas.DeltaTableDataset that also supports an "upsert" write mode.
    """

    def __init__(self, primary_key: Union[str, List[str], None] = None, **kwargs) -> None:
        self.primary_key = primary_key
        # Copy save_args so the caller's dict (e.g. from the catalog) is not mutated;
        # "or {}" also covers save_args=None.
        save_args = dict(kwargs.get("save_args") or {})

        if save_args.get("mode") == "upsert":
            if not self.primary_key:
                raise DatasetError(
                    "To use the upsert write mode, you need to set the primary_key argument!"
                )
            self.upsert_mode = True
            # The parent class only accepts "overwrite" and "append", so it is given
            # "overwrite"; in upsert mode this is only used when the table is first created.
            save_args["mode"] = "overwrite"
            kwargs["save_args"] = save_args
        else:
            self.upsert_mode = False

        super().__init__(**kwargs)

    def _save(self, data: pd.DataFrame) -> None:
        data = pa.Table.from_pandas(data, preserve_index=False)

        if self.is_empty_dir:
            # First-time creation of the Delta table
            write_deltalake(
                self._filepath,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )
            self.is_empty_dir = False
            self._delta_table = DeltaTable(
                table_uri=self._filepath,
                storage_options=self.fs_args,
                version=self._version,
            )
        elif self.upsert_mode:
            upsert(
                new_data=data,
                target_table=self._delta_table,
                primary_key=self.primary_key,
            )
        else:
            write_deltalake(
                self._delta_table,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )

    def _describe(self) -> Dict[str, Any]:
        desc = super()._describe()
        desc["primary_key"] = self.primary_key

        if self.upsert_mode:
            # Build a new dict instead of editing desc["save_args"] in place: it can be
            # the same object as self._save_args, and setting its mode to "upsert" would
            # then break the first-time write_deltalake call in _save.
            desc["save_args"] = {**desc["save_args"], "mode": "upsert"}

        return desc
```

Juan Luis

03/06/2024, 5:43 PM
thanks a lot @Júlio Resende!! do you mind sharing it as is in https://github.com/kedro-org/kedro-plugins/issues/542 ?

Júlio Resende

03/06/2024, 5:47 PM
No, I don't mind. I'll share it there. Thanks @Juan Luis and @Iñigo Hidalgo

Iñigo Hidalgo

03/07/2024, 9:22 AM
Nice one @Júlio Resende!! The upsert can definitely be extended to support predicates other than just `==`; it was a small PoC, but I'm glad it works for you 🙂 I might update it sometime in the next month, so I'll ping you if I do and you can incorporate that into your dataset if you want/need.
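One possible way to go beyond `==` is to build the merge predicate from (column, operator) pairs. A minimal sketch: `build_merge_predicate`, its parameters and the allowed-operator set are hypothetical, and the resulting string would be passed as the `predicate` argument of `DeltaTable.merge` in place of the one built inside the `upsert` function above.

```python
from typing import Iterable, Tuple, Union

# Comparison operators this sketch accepts; anything else is rejected
# instead of being spliced into the SQL predicate string.
ALLOWED_OPERATORS = {"=", "!=", "<", "<=", ">", ">="}


def build_merge_predicate(
    conditions: Iterable[Union[str, Tuple[str, str]]],
    source_alias: str = "source",
    target_alias: str = "target",
) -> str:
    """Join per-column comparisons between target and source with AND.

    A bare column name means equality; a (column, operator) tuple uses that operator.
    """
    clauses = []
    for condition in conditions:
        column, operator = (condition, "=") if isinstance(condition, str) else condition
        if operator not in ALLOWED_OPERATORS:
            raise ValueError(f"Unsupported operator: {operator!r}")
        clauses.append(f"{target_alias}.{column} {operator} {source_alias}.{column}")
    if not clauses:
        raise ValueError("At least one condition is required")
    return " AND ".join(clauses)


# Equality on a composite key, i.e. what the original upsert builds:
print(build_merge_predicate(["id", "date"]))
# prints: target.id = source.id AND target.date = source.date

# Equality on the key plus a range condition on another column:
print(build_merge_predicate(["id", ("valid_from", "<=")]))
# prints: target.id = source.id AND target.valid_from <= source.valid_from
```

Non-equality conditions can make a single target row match several source rows, so in this sketch they are meant to be combined with an equality key rather than used on their own.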