Pedro Sousa Silva
06/25/2024, 2:49 PM

```yaml
output_df:
  type: mypackage.datasets.databricks.ManagedTableDataset
  catalog: ${globals:catalog}
  write_mode: overwrite
  database: database_name
  table: table_name
```
The pipeline is something like:
```python
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=node1,
                inputs={'df': 'input_df'},
                outputs='output_df',
            ),
        ]
    )
```
However, I want it to run and overwrite the existing table based on a condition, e.g.:
```python
from pyspark.sql import DataFrame as SparkDataFrame


def node1(df: SparkDataFrame) -> SparkDataFrame:
    df_out = some_specific_transformations(df)
    if is_today_saturday():
        return df_out
    else:
        return None
```
So my goal would be to bypass the overwrite based on the condition in node1. Since my mypackage.datasets.databricks.ManagedTableDataset is a custom dataset based on Databricks' ManagedTableDataset, I was initially thinking about rewriting the save functions to bypass the writing altogether when they receive a None, but Kedro itself blocks it with DatasetError: Saving 'None' to a 'Dataset' is not allowed (minimal repro below).
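From what I can tell, the None check happens centrally in Kedro's dataset save path, before any custom _save logic runs, so the dataset never even sees the None. A minimal sketch of the behaviour (assuming a recent Kedro that ships MemoryDataset):

```python
from kedro.io import MemoryDataset

# Kedro rejects None before dispatching to the dataset's own _save,
# so no dataset subclass can intercept it:
MemoryDataset().save(None)
# DatasetError: Saving 'None' to a 'Dataset' is not allowed
```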
So my question is: is there any best practice or insight for bypassing the overwrite based on a condition?

Elena Khaustova
06/25/2024, 4:48 PM
… None if the condition is not met.

Pedro Sousa Silva
06/25/2024, 4:55 PM
I ended up extending ManagedTableDataset with an additional param overwrite_if_empty. If it helps someone with the same issue, here's how the new _save_overwrite function looks now:
```python
def _save_overwrite(self, data: DataFrame, overwrite_if_empty: bool) -> None:
    """Overwrites the data in the table with the data provided.
    (this is the default save mode)

    Args:
        data (DataFrame): the Spark dataframe to overwrite the table with.
        overwrite_if_empty (bool): if True, overwrite the table even when
            the dataframe is empty.
    """
    # Skip the write entirely when the node returned an empty dataframe,
    # unless overwriting with an empty dataframe was explicitly requested.
    if data.count() != 0 or overwrite_if_empty:
        delta_table = data.write.format('delta')
        if self._table.write_mode == 'overwrite':
            delta_table = delta_table.mode('overwrite').option(
                'overwriteSchema', 'true'
            )
        delta_table.saveAsTable(self._table.full_table_location())
```
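For completeness, the wiring around it looks roughly like this; this is a sketch rather than the exact class (the base-class import alias and the _save dispatch are illustrative assumptions):

```python
from kedro_datasets.databricks import ManagedTableDataset as BaseDataset
from pyspark.sql import DataFrame


class ManagedTableDataset(BaseDataset):
    """Custom dataset adding an overwrite_if_empty parameter."""

    def __init__(self, *, overwrite_if_empty: bool = False, **kwargs):
        super().__init__(**kwargs)
        self._overwrite_if_empty = overwrite_if_empty

    def _save(self, data: DataFrame) -> None:
        # Route overwrites through the conditional implementation above;
        # everything else falls back to the parent behaviour.
        if self._table.write_mode == 'overwrite':
            self._save_overwrite(data, self._overwrite_if_empty)
        else:
            super()._save(data)
```

and the catalog entry just gains the new param:

```yaml
output_df:
  type: mypackage.datasets.databricks.ManagedTableDataset
  catalog: ${globals:catalog}
  write_mode: overwrite
  database: database_name
  table: table_name
  overwrite_if_empty: false
```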
Pedro Sousa Silva
06/25/2024, 4:56 PM
And I'm returning spark.createDataFrame([], StructType()) as the output of my node.
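So the node ends up looking something like this (a sketch; some_specific_transformations and is_today_saturday are the same placeholders as above):

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType


def node1(df: DataFrame) -> DataFrame:
    df_out = some_specific_transformations(df)
    if is_today_saturday():
        return df_out
    # Return an empty, schema-less dataframe instead of None: Kedro
    # accepts the save, and _save_overwrite skips the actual write
    # because data.count() == 0 (with overwrite_if_empty left False).
    spark = SparkSession.getActiveSession()
    return spark.createDataFrame([], StructType())
```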