Mark Druffel
02/02/2024, 8:31 PM
raw_trips@spark:
  type: spark.SparkDataset
  filepath: data/01_raw/yellow_tripdata_2023_01.parquet
  file_format: "delta"
raw_trips@delta:
  type: spark.DeltaTableDataset
  filepath: data/01_raw/yellow_tripdata_2023_01.parquet
When I run catalog.load("raw_trips@spark")
I get the following error:
DatasetError: Failed while loading data from data set SparkDataset(file_format=delta,
filepath=~/data/01_raw/yellow_tripdata_2023_01.parquet, load_args={}, save_args={}).
`~/data/01_raw/yellow_tripdata_2023_01.parquet` is not a Delta table.
When I run catalog.load("raw_trips@delta")
I get the following error:
DatasetError: Failed while loading data from data set
DeltaTableDataset(filepath=~/data/01_raw/yellow_tripdata_2023_01.parquet, fs_prefix=).
`~/data/01_raw/yellow_tripdata_2023_01.parquet` is not a Delta table.
Am I missing something? Can I create delta tables within my pipeline?

Juan Luis
02/02/2024, 8:38 PM
Delta tables store .parquet files in a specific way. If you specify the .parquet file directly, then you'd need to load it as Parquet format, not Delta.
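The distinction can be made concrete: a Delta table is a directory whose transaction log lives under _delta_log/, while a bare .parquet file has no such log, which is roughly why both loads above fail with "is not a Delta table". A minimal stdlib-only sketch of that check (the function name is mine, not Delta's actual API):

```python
import pathlib
import tempfile

def looks_like_delta_table(path: str) -> bool:
    # A Delta table is a directory containing a _delta_log/ transaction log;
    # a standalone .parquet file (or a dir without the log) is not one.
    return (pathlib.Path(path) / "_delta_log").is_dir()

with tempfile.TemporaryDirectory() as root:
    table = pathlib.Path(root) / "yellow_tripdata_2023_01"
    (table / "_delta_log").mkdir(parents=True)             # simulate a Delta table
    plain = pathlib.Path(root) / "yellow_tripdata_2023_01.parquet"
    plain.touch()                                          # simulate a bare parquet file

    print(looks_like_delta_table(str(table)))  # True
    print(looks_like_delta_table(str(plain)))  # False
```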
I don't have much experience reading Delta with Spark, but I've done it a lot with Polars:
table_path = "/path/to/delta-table/"
df = pl.read_delta(table_path)
notice the lack of extension there.
Juan Luis
02/02/2024, 8:42 PM
1. read the .parquet file with SparkDataset
2. save it as a delta table, hence defining a DeltaTableDataset
something like
raw_trips_spark:
  type: spark.SparkDataset
  filepath: data/01_raw/yellow_tripdata_2023_01.parquet
  file_format: "parquet"
raw_trips_delta:
  type: spark.DeltaTableDataset
  filepath: data/02_intermediate/yellow_tripdata_2023_01
(not sure if local filepaths are supposed to work with Delta, but you get the idea)
(also, I removed the @, I don't think you should use transcoding for this)

Mark Druffel
02/02/2024, 9:56 PM
df_spark = catalog.load("raw_trips_spark")
df_delta = catalog.load("raw_trips_delta")
The first line works, but line 2 fails on ~/data/01_raw/yellow_tripdata_2023_01 is not a Delta table.
The error makes sense because I haven't written a delta table yet. I can run:
df_spark.write.mode("overwrite").format("delta").save("data/01_raw/yellow_tripdata_2023_01")
Now, if I reload the kernel both frames will load.
The initial load failed because the delta table didn't exist, but I don't know how to perform the df_spark.write to delta in a kedro pipeline without specifying a path inside my node. Does this make sense or am I missing something?

Juan Luis
02/03/2024, 3:00 AM
> but I don't know how to perform the df_spark.write to delta in a kedro pipeline without specifying a path inside my node.
you don't have to! you can do
df = catalog.load("raw_trips_spark")
catalog.save("raw_trips_delta", df)
and I think it should work
(renamed to df because the data is the same, regardless of where it came from)

Mark Druffel
02/03/2024, 6:31 PM
DatasetError: DeltaTableDataset is a read only dataset type
Mark Druffel
02/03/2024, 7:34 PM
# node.py
def write_delta(**kwargs):
    path = "data/bronze/managed/"
    for name, df in kwargs.items():
        filepath = f"{path}{name}"
        df.write.mode("overwrite").format("delta").save(filepath)
    return filepath
# pipeline.py
from kedro.pipeline import Pipeline, pipeline, node
from .nodes import write_delta

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=write_delta,
                inputs={"yellow_trips": "yellow_trips_spark"},
                outputs=None,
                name="delta",
            )
        ]
    )
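The hardcoded path in write_delta above can also be lifted out of the node body by binding it when the node is constructed. A stdlib-only sketch of that pattern with functools.partial (the stand-in write function and names are illustrative, not Kedro API; the real node body would call df.write):

```python
from functools import partial

def write_delta(df, path):
    # Stand-in for the real Spark call:
    #   df.write.mode("overwrite").format("delta").save(path)
    return f"would write {len(df)} rows to {path}"

# Bind the destination once, at pipeline-construction time,
# so the node body itself stays path-free:
write_bronze_trips = partial(write_delta, path="data/bronze/managed/yellow_trips")

print(write_bronze_trips([1, 2, 3]))  # would write 3 rows to data/bronze/managed/yellow_trips
```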
I think I could just use a namespace to do this, but this will make it impossible to connect my nodes between this pipeline and my actual pipeline, right?

Juan Luis
02/04/2024, 9:38 AM
spark.DeltaTableDataset is a read-only dataset https://github.com/kedro-org/kedro-plugins/blob/b675485ee57c08280da1738d57e24725dd[…]b6451/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py
turns out that, for this specific dataset, you're supposed to do the df.write from within the node yourself. there's more context in https://github.com/kedro-org/kedro/pull/964
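If it helps, the resulting split is: the node performs the df.write itself, and the catalog only registers the path for downstream reads. A hedged catalog sketch (the entry name and path are illustrative; since spark.DeltaTableDataset is read-only, no node output should point at it):

```yaml
# conf/base/catalog.yml -- illustrative entry; use only as a node *input*
bronze_trips:
  type: spark.DeltaTableDataset
  filepath: data/bronze/managed/yellow_trips
```

the writing node itself would then be declared with outputs=None, as in the pipeline snippet above.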
for the record, I added a reference to this thread here https://github.com/kedro-org/kedro/issues/3578#issuecomment-1925659709