Evžen Šírek
02/03/2023, 10:01 AM
Is it possible to use the fastparquet engine with the ParquetDataSet?
It is possible to specify the engine in the catalog entry:
dataset:
  type: pandas.ParquetDataSet
  filepath: data/dataset.parquet
  load_args:
    engine: fastparquet
  save_args:
    engine: fastparquet
However, when I do that, I get a DataSetError with I/O operation on closed file when Kedro tries to save the dataset. When I manually save the data with pandas and engine=fastparquet (which is what Kedro should do according to the docs), it works well. Is this expected? Thanks! :))
Environment:
python==3.10.4, pandas==1.5.1, kedro==0.18.4, fastparquet==2023.1.0
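For reference, the manual save described above would look roughly like this minimal sketch; the dataframe and path are made up for illustration:

import pandas as pd

# Hypothetical data, only to illustrate the manual save that works.
df = pd.DataFrame({"value": [1.0, 2.0, 3.0]})

# Writing straight to a path with the fastparquet engine succeeds,
# which is what the catalog's save_args should translate to.
df.to_parquet("data/dataset.parquet", engine="fastparquet")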
datajoely
02/03/2023, 1:21 PM
Evžen Šírek
02/03/2023, 1:55 PM
We had some issues with pyarrow - like not being able to save timedelta datatypes - which fastparquet helped us with, so we would like to stick with it. We would also like to use the append feature of fastparquet.
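A quick sketch of the append feature being referred to; the path and dataframes are invented, and it assumes pandas forwards the append keyword through to fastparquet.write:

import pandas as pd

batch_1 = pd.DataFrame({"x": [1, 2]})
batch_2 = pd.DataFrame({"x": [3, 4]})

# First write creates the file; append=True is a fastparquet-specific
# keyword that pandas passes through to the engine, adding new row groups.
batch_1.to_parquet("data/dataset.parquet", engine="fastparquet")
batch_2.to_parquet("data/dataset.parquet", engine="fastparquet", append=True)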
datajoely
02/03/2023, 2:04 PM
If you look at our implementation, the save method is doing a few things:
def _save(self, data: pd.DataFrame) -> None:
    save_path = get_filepath_str(self._get_save_path(), self._protocol)

    if Path(save_path).is_dir():
        raise DataSetError(
            f"Saving {self.__class__.__name__} to a directory is not supported."
        )

    if "partition_cols" in self._save_args:
        raise DataSetError(
            f"{self.__class__.__name__} does not support save argument "
            f"'partition_cols'. Please use 'kedro.io.PartitionedDataSet' instead."
        )

    bytes_buffer = BytesIO()
    data.to_parquet(bytes_buffer, **self._save_args)

    with self._fs.open(save_path, mode="wb") as fs_file:
        fs_file.write(bytes_buffer.getvalue())

    self._invalidate_cache()
• It writes the dataframe to a bytes buffer in memory
• It opens an fsspec path as binary
• It writes the data to the file
The I/O operation on closed file is happening at the fsspec part.
location.of.my_class.ParquetDataSet
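One possible explanation for where that error comes from, sketched under the assumption (not confirmed in the thread) that fastparquet closes the file object it writes to: the BytesIO buffer would already be closed by the time fs_file.write(bytes_buffer.getvalue()) runs.

from io import BytesIO

import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3]})

buffer = BytesIO()
df.to_parquet(buffer, engine="fastparquet")

# Assumption: fastparquet closes the buffer once it has finished writing.
# If that holds, buffer.closed is True here and the next call reproduces
# the error from the thread:
#   ValueError: I/O operation on closed file.
print(buffer.closed)
data_bytes = buffer.getvalue()

With engine="pyarrow" the buffer would stay open, which would explain why the default engine does not hit this.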
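The location.of.my_class.ParquetDataSet reference above points at the usual escape hatch: a custom dataset class registered in the catalog by its import path. A minimal sketch of such a class, assuming fastparquet accepts an already-open file object and that fsspec handles a double close; the module path and class name are placeholders:

from pathlib import Path

import pandas as pd
from kedro.extras.datasets.pandas import ParquetDataSet as _ParquetDataSet
from kedro.io.core import DataSetError, get_filepath_str


class ParquetDataSet(_ParquetDataSet):
    """ParquetDataSet variant that skips the intermediate BytesIO buffer."""

    def _save(self, data: pd.DataFrame) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)

        if Path(save_path).is_dir():
            raise DataSetError(
                f"Saving {self.__class__.__name__} to a directory is not supported."
            )

        # Let pandas/fastparquet write directly into the fsspec file object,
        # so there is no in-memory buffer left for the engine to close.
        with self._fs.open(save_path, mode="wb") as fs_file:
            data.to_parquet(fs_file, **self._save_args)

        self._invalidate_cache()

In the catalog, type: location.of.my_class.ParquetDataSet would then pick up this class instead of the built-in pandas.ParquetDataSet.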
Evžen Šírek
02/03/2023, 2:16 PM
datajoely
02/03/2023, 2:24 PM
Evžen Šírek
02/03/2023, 2:31 PM