#questions
Pedro:
Hey team, I have a kedro pipeline with a node that outputs a dataframe, overwriting an existing delta table (in Databricks), defined as:
```yaml
output_df:
  type: mypackage.datasets.databricks.ManagedTableDataset
  catalog: ${globals:catalog}
  write_mode: overwrite
  database: database_name
  table: table_name
```
The pipeline is something like:
```python
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=node1,
                inputs={'df': 'input_df'},
                outputs='output_df',
            ),
        ]
    )
```
However, I want it to run and overwrite the existing table only when a condition holds, e.g.:
```python
from pyspark.sql import DataFrame as SparkDataFrame


def node1(df: SparkDataFrame) -> SparkDataFrame:

    df_out = some_specific_transformations(df)

    if is_today_saturday():
        return df_out
    else:
        return None
```
So my goal is to bypass the overwrite when the condition in node1 is not met (i.e. when it returns None). Since my `mypackage.datasets.databricks.ManagedTableDataset` is a custom dataset based on Databricks' `ManagedTableDataset`, I was initially thinking about rewriting the save functions to bypass the writing altogether when they receive a None, but Kedro itself blocks that with `DatasetError: Saving 'None' to a 'Dataset' is not allowed`.
So my question is: is there any best practice or insight on how to bypass an overwrite based on a condition?
Elena:
Hi Pedro, do you need to bypass the overwrite because it's an expensive operation, or do you just want to keep the original table? If it's the latter, then you can just input the original table and return it instead of `None` when the condition is not met.
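Something like this, as a rough sketch (`existing_output_df` is a hypothetical second catalog entry reading the same table, since a Kedro node can't have the same dataset as both an input and an output):
```python
from pyspark.sql import DataFrame as SparkDataFrame


def node1(df: SparkDataFrame, existing: SparkDataFrame) -> SparkDataFrame:
    if is_today_saturday():
        return some_specific_transformations(df)
    # Condition not met: write the table's current contents back unchanged
    return existing
```
with `inputs={'df': 'input_df', 'existing': 'existing_output_df'}` in the node definition.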
Pedro:
Hey Elena. The latter. That was what I had in mind first, but the original table is not in memory, so I would have to do an unnecessary `spark.table('table_name')` read just to overwrite the table with its own existing data.
In any case, I was able to tweak `ManagedTableDataset` with an additional param, `overwrite_if_empty`. If it helps someone with the same issue, here's how the new `_save_overwrite` function looks now:
```python
def _save_overwrite(self, data: DataFrame, overwrite_if_empty: bool) -> None:
    """Overwrites the data in the table with the data provided.
    (this is the default save mode)

    Args:
        data (DataFrame): the Spark dataframe to overwrite the table with.
        overwrite_if_empty (bool): if False, skip the write entirely when
            ``data`` is empty instead of overwriting the table with nothing.
    """
    if data.count() != 0 or overwrite_if_empty:
        delta_table = data.write.format('delta')
        if self._table.write_mode == 'overwrite':
            delta_table = delta_table.mode('overwrite').option(
                'overwriteSchema', 'true'
            )
        delta_table.saveAsTable(self._table.full_table_location())
```
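For reference, the flag is threaded through the custom class roughly like this (just a sketch: the `__init__`/`_save` wiring here is my assumption about the subclass, not the upstream kedro-datasets internals):
```python
from kedro_datasets.databricks import ManagedTableDataset as UpstreamDataset
from pyspark.sql import DataFrame


class ManagedTableDataset(UpstreamDataset):
    """Upstream dataset plus an overwrite_if_empty flag from the catalog entry."""

    def __init__(self, *, overwrite_if_empty: bool = False, **kwargs):
        super().__init__(**kwargs)
        self._overwrite_if_empty = overwrite_if_empty

    def _save(self, data: DataFrame) -> None:
        if self._table.write_mode == 'overwrite':
            # custom overwrite path that can skip writing an empty dataframe
            self._save_overwrite(data, self._overwrite_if_empty)
        else:
            super()._save(data)
```
so the catalog entry above only gains an optional `overwrite_if_empty` key.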
So basically, if I don't want to overwrite, I just pass an empty Spark dataframe, `spark.createDataFrame([], StructType())`, as the output of my node.
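The node then ends up looking something like this (sketch, reusing the placeholder helpers from above; I'm grabbing the session off the incoming dataframe rather than the Databricks `spark` global):
```python
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.types import StructType


def node1(df: SparkDataFrame) -> SparkDataFrame:
    df_out = some_specific_transformations(df)
    if is_today_saturday():
        return df_out
    # empty dataframe: with overwrite_if_empty=False the custom dataset
    # skips the overwrite entirely
    return df.sparkSession.createDataFrame([], StructType())
```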