# questions
m
TLDR; is there a suggested pattern for converting vanilla parquets to delta tables in a pipeline?

Full context: first, sorry if I'm missing something obvious. I'm new to Kedro as well as Delta Lake, so I'm fumbling my way through this a bit 😬 I am working on a POC w/ Kedro for work. We source data from a number of different Azure lakes throughout our company, primarily saved as vanilla parquets and read with pyspark. I want to convert those inputs into delta tables in the raw layer of my pipeline. I am trying to test w/ spark.DeltaTableDataset locally, but plan to use databricks.ManagedTableDataset in prod.

I know from comments that some conversions should happen in the node (for example, spark DataFrame to delta or pandas to delta), but I can't figure out a straightforward way to do that without specifying my output path in the node, which would obviously not work with the data catalog. Based on this YAML example I thought I could just create a delta version in the data catalog that reads from the same parquet. I think this does not work because the parquet was not saved as a delta table. Just to test, I created the catalog entries below and ran `kedro ipython`:
raw_trips@spark:
  type: spark.SparkDataset
  filepath: data/01_raw/yellow_tripdata_2023_01.parquet
  file_format: "delta"

raw_trips@delta:
  type: spark.DeltaTableDataset
  filepath: data/01_raw/yellow_tripdata_2023_01.parquet
When I run
catalog.load("raw_trips@spark")
I get the following error:
DatasetError: Failed while loading data from data set SparkDataset(file_format=delta, 
filepath=~/data/01_raw/yellow_tripdata_2023_01.parquet, load_args={}, save_args={}).
`~/data/01_raw/yellow_tripdata_2023_01.parquet` is not a Delta table.
When I run
catalog.load("raw_trips@delta")
I get the following error:
DatasetError: Failed while loading data from data set 
DeltaTableDataset(filepath=~/01_raw/yellow_tripdata_2023_01.parquet, fs_prefix=).
`~/data/01_raw/yellow_tripdata_2023_01.parquet` is not a Delta table.
Am I missing something? Can I create delta tables within my pipeline?
j
hi @Mark Druffel! when using Delta, you're not targeting the individual Parquet files - that's an implementation detail. the Delta format takes care of arranging the individual .parquet files in a specific way. if you specify the .parquet file directly, then you'd need to load them as Parquet format, not Delta. I don't have much experience reading Delta with Spark, but I've done it a lot with Polars:
import polars as pl

table_path = "/path/to/delta-table/"
df = pl.read_delta(table_path)
notice the lack of extension there. so, in your case, you would: 1. load the .parquet file with SparkDataset, 2. save it as a delta table, hence defining a DeltaTableDataset. something like
raw_trips_spark:
  type: spark.SparkDataset
  filepath: data/01_raw/yellow_tripdata_2023_01.parquet
  file_format: "parquet"

raw_trips_delta:
  type: spark.DeltaTableDataset
  filepath: data/02_intermediate/yellow_tripdata_2023_01
(not sure if local filepaths are supposed to work with Delta, but you get the idea) (also, I removed the @, don't think you should use transcoding for this)
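For comparison, reading a Delta table directly with Spark follows the same idea: the path points at the Delta table directory, not at an individual .parquet file. A minimal sketch, assuming a SparkSession that already has the Delta Lake extensions configured (e.g. via delta-spark) and reusing the 02_intermediate path from the catalog entry above:

from pyspark.sql import SparkSession

# assumes the Delta Lake jars/extensions are already available on this session
spark = SparkSession.builder.getOrCreate()

# the argument is the Delta table directory, with no .parquet extension
df = spark.read.format("delta").load("data/02_intermediate/yellow_tripdata_2023_01")
df.show(5)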
m
Thanks @Juan Luis, really appreciate the response! That all makes perfect sense. That said, I think I'm still stuck. I modified my YAML as you suggested. I load the kernel again and run the following commands:
df_spark = catalog.load("raw_trips_spark")
df_delta = catalog.load("raw_trips_delta")
The first line works, but line 2 fails on
~/data/01_raw/yellow_tripdata_2023_01 is not a Delta table.
The error makes sense because I haven't written a delta table yet. I can run:
df_spark.write.mode("overwrite").format("delta").save("data/01_raw/yellow_tripdata_2023_01")
Now, if I reload the kernel both frames will load. The initial load failed because the delta table didn't exist, but I don't know how to perform the df_spark.write to delta in a kedro pipeline without specifying a path inside my node. Does this make sense or am I missing something?
j
"but I don't know how to perform the df_spark.write to delta in a kedro pipeline without specifying a path inside my node."
you don't have to! you can do
df = catalog.load("raw_trips_spark")

catalog.save("raw_trips_delta", df)
and I think it should work (renamed to df because the data is the same, regardless of where it came from)
m
Sorry for the delayed reply. It didn't work (at least with my current config); it responded with:
DatasetError: DeltaTableDataset is a read only dataset type
Ok, if I'm understanding correctly, we can only read from spark.DeltaTableDataset? Sorry, this is very in the weeds, but I just want to make sure I'm not applying a really unintended design pattern... I was able to successfully convert parquets to delta using a kwargs node like so:
# nodes.py
def write_delta(**kwargs):
    # write every incoming Spark DataFrame as a Delta table under a hard-coded base path
    path = "data/bronze/managed/"
    for name, df in kwargs.items():
        filepath = f"{path}{name}"
        df.write.mode("overwrite").format("delta").save(filepath)


# pipeline.py
from kedro.pipeline import Pipeline, pipeline, node

from .nodes import write_delta


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=write_delta,
                inputs={"yellow_trips": "yellow_trips_spark"},
                outputs=None,  # the node writes as a side effect, so no outputs are declared
                name="delta",
            )
        ]
    )
I think I could just use a namespace to do this, but this will make it impossible to connect my nodes between this pipeline and my actual pipeline, right?
j
ugh... you're right, spark.DeltaTableDataset is a read-only dataset: https://github.com/kedro-org/kedro-plugins/blob/b675485ee57c08280da1738d57e24725dd[…]b6451/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py
turns out that, for this specific dataset, you're supposed to do the df.write from within the node yourself. there's more context in https://github.com/kedro-org/kedro/pull/964
for the record, I added a reference to this thread here: https://github.com/kedro-org/kedro/issues/3578#issuecomment-1925659709
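A minimal sketch of that pattern, with the Delta write done inside the node; the save_as_delta function and the params:delta_path entry are made-up names here, used so the output path can live in parameters.yml instead of being hard-coded in the node:

# nodes.py
from pyspark.sql import DataFrame

def save_as_delta(df: DataFrame, path: str) -> None:
    # the write happens inside the node, as described above
    df.write.mode("overwrite").format("delta").save(path)

# pipeline.py
from kedro.pipeline import Pipeline, pipeline, node
from .nodes import save_as_delta

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=save_as_delta,
                # "raw_trips_spark" is the SparkDataset entry suggested earlier;
                # "params:delta_path" is a hypothetical key in parameters.yml
                inputs=["raw_trips_spark", "params:delta_path"],
                outputs=None,
                name="save_as_delta",
            )
        ]
    )

The resulting table can then be read back through a spark.DeltaTableDataset entry pointing at the same path, while the path itself stays out of the node code.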
m
@Juan Luis Thank you, this is very helpful. I'm still a little fuzzy on what the impacts are to my POC, but I'm going to push on and see where I can get. I'll report back if I have any learnings that may (or may not) be of interest to your team
j
that would be greatly appreciated 🙏🏼 keep us posted!
m
@Juan Luis Sorry one more question. I was under the impression that ManagedTableDataset was the same as DeltaTableDataset w/ additional Databricks functionality. Maybe that was a bad assumption? The reason I ask is that I see ManagedTableDataset has _save implemented with append, overwrite, and upsert. I'm wondering why that implementation is valid / rational for ManagedTable but not DeltaTable?
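For illustration, constructing such a dataset in Python looks roughly like the sketch below; the write_mode and primary_key argument names (and the table/database/key values) are assumptions that may differ between kedro-datasets versions, so check the ManagedTableDataset docs rather than treating this as the exact API:

from kedro_datasets.databricks import ManagedTableDataset

# sketch: a managed Delta table saved with upsert semantics
# (all values below are placeholders; argument names assumed, not verified)
trips_table = ManagedTableDataset(
    table="yellow_tripdata",
    database="bronze",
    write_mode="upsert",
    primary_key="trip_id",
)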
j
that's a good question. @Nok Lam Chan I see you were listed as a reviewer? also cc @Jannic Holzer in case you have a moment to shed some light 🙂
d
ManagedTable was made for Databricks, while DeltaTable was made for general Delta implementations (not specific to Databricks). Not sure why save wasn't implemented on DeltaTable, but it would probably have to do with save locations and additional permissions for different storage backends (S3, GCS, ADLS, etc.)
j
at this point, I'm looping @datajoely too
n
I haven't looked into this. spark.DeltaTable was initially created here: https://github.com/kedro-org/kedro/pull/964 - maybe there are some insights in the discussion
j
yeah, unfortunately that PR is extremely long and lots of the conversation is inside the review threads. I tried to do some archeology there but ran out of patience (and time).