# questions
Hello Kedro Community, I am working on a project where I need to store a Spark DataFrame in Delta format using Kedro. Specifically, I want to ensure that the data is stored in a specific way, as shown in the following function:
```python
from delta.tables import DeltaTable


def export_results_to_delta(summary_df, output_path="/mnt/success5/Success5_results/metric_changes"):
    # `spark` is the active SparkSession (e.g. the Databricks global)
    if DeltaTable.isDeltaTable(spark, output_path):
        # Table already exists: upsert on the composite key
        DeltaTable.forPath(spark, output_path).alias("target").merge(
            summary_df.alias("source"),
            """target.reference_id = source.reference_id AND
               target.country = source.country AND
               target.provider_id = source.provider_id AND
               target.matching_run_id = source.matching_run_id""",
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    else:
        # First write: create the table, partitioned by the key columns
        summary_df.write.format("delta").mode("overwrite").partitionBy(
            "country", "matching_run_id", "provider_id"
        ).save(output_path)
```
Is it possible to create a catalog entry in Kedro that allows me to store the dataset in this manner? If so, could you please provide an example of how to configure the catalog entry? Thank you in advance for your help!
Someone will reply to you shortly. In the meantime, this might help:
Hi @Carlos Prieto - Tomtom! I notice that our `kedro_datasets.spark.DeltaTableDataset` is read-only for historical reasons. Still, have a look at its code: https://github.com/kedro-org/kedro-plugins/blob/1a5e0f/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py. My advice would be to create your own dataset, with more or less this structure (a sketch, not production code):
```python
from delta.tables import DeltaTable
from kedro.io import AbstractDataset
from pyspark.sql import DataFrame, SparkSession


class SparkDeltaDataset(AbstractDataset):
    def __init__(self, filepath: str, merge_predicate: str, partition_cols=None):
        self._filepath = filepath
        self._merge_predicate = merge_predicate
        self._partition_cols = partition_cols

    def _load(self) -> DataFrame:
        spark = SparkSession.builder.getOrCreate()
        return spark.read.format("delta").load(self._filepath)

    def _save(self, data: DataFrame) -> None:
        spark = SparkSession.builder.getOrCreate()
        output_path = self._filepath
        if DeltaTable.isDeltaTable(spark, output_path):
            # Existing table: upsert via MERGE on the configured predicate
            (
                DeltaTable.forPath(spark, output_path)
                .alias("target")
                .merge(data.alias("source"), self._merge_predicate)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # First write: create the table, partitioned if requested
            write_cmd = data.write.format("delta").mode("overwrite")
            if partition_cols := self._partition_cols:
                write_cmd = write_cmd.partitionBy(*partition_cols)
            write_cmd.save(output_path)

    def _describe(self) -> dict:
        return {"filepath": self._filepath, "partition_cols": self._partition_cols}
```
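For the catalog side of your question: once the class exists, the entry is just the constructor arguments. Assuming the class lives at `my_project.datasets.SparkDeltaDataset` (a hypothetical module path, adjust to your project), something roughly like this should work, since Kedro passes every key other than `type` to the dataset's `__init__`:

```yaml
metric_changes:
  type: my_project.datasets.SparkDeltaDataset  # hypothetical import path
  filepath: /mnt/success5/Success5_results/metric_changes
  merge_predicate: >
    target.reference_id = source.reference_id AND
    target.country = source.country AND
    target.provider_id = source.provider_id AND
    target.matching_run_id = source.matching_run_id
  partition_cols: [country, matching_run_id, provider_id]
```

The YAML folded scalar (`>`) keeps the predicate readable in the catalog while handing a single-line string to the merge.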