Jose Nuñez
06/12/2023, 3:32 PM
DataSetError: Failed while saving data to data set ParquetDataSet(filepath=/Users/jose_darnott/PycharmProjects/planta-litio/data/01_raw/data_sql.parquet, load_args={'engine': pyarrow}, protocol=file, save_args={'engine': pyarrow}). Duplicate column names found: ['timestamp', 'lims_BFIL CO3', 'lims_BFIL Ca %', ...]
# It's basically listing all the columns inside the dataframe (here I'm only showing 3 of them)
.
My catalog entry looks like this:
data_sql:
  type: pandas.ParquetDataSet
  filepath: data/01_raw/data_sql.parquet
  load_args:
    engine: pyarrow
  save_args:
    engine: pyarrow
  layer: III
.
I'm using:
kedro==0.18.8
pandas==2.0.1
pyarrow==12.0.0
.
The problem is quite similar to this issue from 2022: https://github.com/kedro-org/kedro/discussions/1286 but in my case removing the load and save args, as the OP mentions, doesn't solve my problem.
.
This is quite puzzling, since I just did a df.to_clipboard() inside the node before returning my output, opened it in a Jupyter notebook, and I see no problems with the dataframe; I can even save it to parquet without any issues. So that makes me think the problem comes from Kedro (?)
.
Anyway, as a workaround I'm saving the dataframe as CSV and that works just fine. But I'd like to get the Parquet save working again, since this is a huge file.
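[Editor's note: for readers hitting the same error, pyarrow refuses to serialize a DataFrame whose columns are not unique, and a clipboard round-trip can hide this because pd.read_clipboard parses via read_csv, which renames duplicate columns. A minimal sketch of checking for and dropping duplicates before the catalog save; the column names here are made up for illustration:]

```python
import pandas as pd

# Build a frame with a duplicated column name (illustrative data only).
df = pd.DataFrame([[1, 2, 3]], columns=["timestamp", "value", "timestamp"])

# List the offending names before handing the frame to the catalog.
dupes = df.columns[df.columns.duplicated()].unique().tolist()
print(dupes)  # ['timestamp']

# One possible fix: keep only the first occurrence of each column name.
df_unique = df.loc[:, ~df.columns.duplicated()]
print(list(df_unique.columns))  # ['timestamp', 'value']
```

Whether dropping duplicates is safe depends on the data; the duplicated columns may hold different values and need renaming instead.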
Thanks in advance 🦜!

Nok Lam Chan
06/12/2023, 3:36 PM
kedro-datasets installed?

Jose Nuñez
06/12/2023, 3:40 PM
[06/12/23 11:38:31] INFO Kedro project planta-litio session.py:360
[06/12/23 11:38:34] INFO Running node: get_local_credentials: get_sql_credentials(None) -> [sql_credentials] node.py:329
INFO Saving data to 'sql_credentials' (_SharedMemoryDataSet)... data_catalog.py:382
[06/12/23 11:38:35] INFO Loading data from 'diccionario_raw' (ExcelDataSet)... data_catalog.py:343
INFO Loading data from 'sql_credentials' (_SharedMemoryDataSet)... data_catalog.py:343
INFO Loading data from 'params:sql_data.date_init' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'params:sql_data.date_end' (MemoryDataSet)... data_catalog.py:343
INFO Running node: download_data_sql_azure: node.py:329
download_data_azure([diccionario_raw,sql_credentials,params:sql_data.date_init,params:sql_data.date_end]) -> [data_sql]
[06/12/23 11:38:36] WARNING /Users/jose_darnott/PycharmProjects/planta-litio/src/planta_litio/pipelines/pre_processing/nodes.py:344: UserWarning: pandas warnings.py:109
only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2
objects are not tested. Please consider using SQLAlchemy.
df = pd.read_sql(query_data, db_connection)
[06/12/23 11:38:41] INFO Saving data to 'data_sql' (ParquetDataSet)... data_catalog.py:382
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/core.py", line 214, in save
self._save(data)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro_datasets/pandas/parquet_dataset.py", line 188, in _save
data.to_parquet(bytes_buffer, **self._save_args)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pandas/core/frame.py", line 2889, in to_parquet
return to_parquet(
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pandas/io/parquet.py", line 411, in to_parquet
impl.write(
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pandas/io/parquet.py", line 159, in write
table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
File "pyarrow/table.pxi", line 3681, in pyarrow.lib.Table.from_pandas
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 570, in dataframe_to_arrays
convert_fields) = _get_columns_to_convert(df, schema, preserve_index,
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 352, in _get_columns_to_convert
raise ValueError(
ValueError: Duplicate column names found: ['timestamp', ...]
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/parallel_runner.py", line 122, in _run_node_synchronization
return run_node(node, catalog, hook_manager, is_async, session_id)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/runner.py", line 319, in run_node
node = _run_node_sequential(node, catalog, hook_manager, session_id)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/runner.py", line 435, in _run_node_sequential
catalog.save(name, data)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/data_catalog.py", line 384, in save
dataset.save(data)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/core.py", line 613, in save
super().save(data)
File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/core.py", line 221, in save
raise DataSetError(message) from exc
kedro.io.core.DataSetError: Failed while saving data to data set ParquetDataSet(filepath=/Users/jose_darnott/PycharmProjects/planta-litio/data/01_raw/data_sql.parquet,
load_args={'engine': pyarrow}, protocol=file, save_args={'engine': pyarrow}).
Duplicate column names found: ['timestamp' ...]
The above exception was the direct cause of the following exception:
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/bin/kedro:8 in <module> │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /cli/cli.py:211 in main │
│ │
│ 208 │ """ │
│ 209 │ _init_plugins() │
│ 210 │ cli_collection = KedroCLI(project_path=Path.cwd()) │
│ ❱ 211 │ cli_collection() │
│ 212 │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 130 in __call__ │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /cli/cli.py:139 in main │
│ │
│ 136 │ │ ) │
│ 137 │ │ │
│ 138 │ │ try: │
│ ❱ 139 │ │ │ super().main( │
│ 140 │ │ │ │ args=args, │
│ 141 │ │ │ │ prog_name=prog_name, │
│ 142 │ │ │ │ complete_var=complete_var, │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 055 in main │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 657 in invoke │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 404 in invoke │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:7 │
│ 60 in invoke │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /cli/project.py:472 in run │
│ │
│ 469 │ with KedroSession.create( │
│ 470 │ │ env=env, conf_source=conf_source, extra_params=params │
│ 471 │ ) as session: │
│ ❱ 472 │ │ session.run( │
│ 473 │ │ │ tags=tag, │
│ 474 │ │ │ runner=runner(is_async=is_async), │
│ 475 │ │ │ node_names=node_names, │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /session/session.py:426 in run │
│ │
│ 423 │ │ ) │
│ 424 │ │ │
│ 425 │ │ try: │
│ ❱ 426 │ │ │ run_result = runner.run( │
│ 427 │ │ │ │ filtered_pipeline, catalog, hook_manager, session_id │
│ 428 │ │ │ ) │
│ 429 │ │ │ self._run_called = True │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/ru │
│ nner.py:91 in run │
│ │
│ 88 │ │ │ self._logger.info( │
│ 89 │ │ │ │ "Asynchronous mode is enabled for loading and saving data" │
│ 90 │ │ │ ) │
│ ❱ 91 │ │ self._run(pipeline, catalog, hook_manager, session_id) │
│ 92 │ │ │
│ 93 │ │ self._logger.info("Pipeline execution completed successfully.") │
│ 94 │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/pa │
│ rallel_runner.py:334 in _run │
│ │
│ 331 │ │ │ │ │ break # pragma: no cover │
│ 332 │ │ │ │ done, futures = wait(futures, return_when=FIRST_COMPLETED) │
│ 333 │ │ │ │ for future in done: │
│ ❱ 334 │ │ │ │ │ node = future.result() │
│ 335 │ │ │ │ │ done_nodes.add(node) │
│ 336 │ │ │ │ │ │
│ 337 │ │ │ │ │ # Decrement load counts, and release any datasets we │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/concurrent/futures/_base.py:4 │
│ 37 in result │
│ │
│ 434 │ │ │ │ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: │
│ 435 │ │ │ │ │ raise CancelledError() │
│ 436 │ │ │ │ elif self._state == FINISHED: │
│ ❱ 437 │ │ │ │ │ return self.__get_result() │
│ 438 │ │ │ │ │
│ 439 │ │ │ │ self._condition.wait(timeout) │
│ 440 │
│ │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/concurrent/futures/_base.py:3 │
│ 89 in __get_result │
│ │
│ 386 │ def __get_result(self): │
│ 387 │ │ if self._exception: │
│ 388 │ │ │ try: │
│ ❱ 389 │ │ │ │ raise self._exception │
│ 390 │ │ │ finally: │
│ 391 │ │ │ │ # Break a reference cycle with the exception in self._exception │
│ 392 │ │ │ │ self = None │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
DataSetError: Failed while saving data to data set ParquetDataSet(filepath=/Users/jose_darnott/PycharmProjects/planta-litio/data/01_raw/data_sql.parquet,
load_args={'engine': pyarrow}, protocol=file, save_args={'engine': pyarrow}).
Duplicate column names found: ['timestamp', ...]
Nok Lam Chan
06/12/2023, 3:44 PM
df.to_parquet?
bytes_buffer = BytesIO()
data.to_parquet(bytes_buffer, **self._save_args)
This is literally what Kedro does behind the scenes; we use the same logic as pandas. Kedro handles the path but nothing more.
1. Try df.to_parquet directly - see if it works
2. If 1 works, then try
from io import BytesIO
bytes_buffer = BytesIO()
data.to_parquet(bytes_buffer)
This narrows down whether the problem is in pandas or pyarrow
Jose Nuñez
06/12/2023, 3:51 PM

Nok Lam Chan
06/12/2023, 3:58 PM