#questions

Jose Nuñez

06/12/2023, 3:32 PM
Hello fellow Kedroids 🤖! I'm having a very strange issue when saving a file to parquet. I'm getting this error:
DataSetError: Failed while saving data to data set ParquetDataSet(filepath=/Users/jose_darnott/PycharmProjects/planta-litio/data/01_raw/data_sql.parquet, load_args={'engine': pyarrow}, protocol=file, save_args={'engine': pyarrow}). Duplicate column names found:  ['timestamp', 'lims_BFIL CO3', 'lims_BFIL Ca %', ...]
It's basically listing every column in the dataframe (I'm showing only 3 of them here). My catalog entry looks like this:
data_sql:
  type: pandas.ParquetDataSet
  filepath: data/01_raw/data_sql.parquet
  load_args:
    engine: pyarrow
  save_args:
    engine: pyarrow
  layer: III
I'm using kedro==0.18.8, pandas==2.0.1 and pyarrow==12.0.0. The problem is quite similar to this issue from 2022: https://github.com/kedro-org/kedro/discussions/1286 but in my case removing the load and save args, as the OP mentions, doesn't solve my problem.
This is quite puzzling: I just did a df.to_clipboard() inside the node before returning my output, opened it in a Jupyter notebook, and I see no problems with the dataframe. I can even save it to parquet from there without any issues. That makes me think the problem comes from Kedro (?).
Anyway, as a workaround I'm saving the dataframe as CSV and it's working just fine. But I'd like to find a way to make parquet work again, since this is a huge file. Thanks in advance 🦜!

Nok Lam Chan

06/12/2023, 3:36 PM
Can you post the full traceback or try to scroll up the error a bit? This is raised by Kedro, but I suspect there is an underlying error thrown by either pandas or pyarrow.
And do you have kedro-datasets installed?

Jose Nuñez

06/12/2023, 3:40 PM
yes, kedro-datasets==1.2.0
[06/12/23 11:38:31] INFO     Kedro project planta-litio                                                                                                       session.py:360
[06/12/23 11:38:34] INFO     Running node: get_local_credentials: get_sql_credentials(None) -> [sql_credentials]                                                 node.py:329
                    INFO     Saving data to 'sql_credentials' (_SharedMemoryDataSet)...                                                                  data_catalog.py:382
[06/12/23 11:38:35] INFO     Loading data from 'diccionario_raw' (ExcelDataSet)...                                                                       data_catalog.py:343
                    INFO     Loading data from 'sql_credentials' (_SharedMemoryDataSet)...                                                               data_catalog.py:343
                    INFO     Loading data from 'params:sql_data.date_init' (MemoryDataSet)...                                                            data_catalog.py:343
                    INFO     Loading data from 'params:sql_data.date_end' (MemoryDataSet)...                                                             data_catalog.py:343
                    INFO     Running node: download_data_sql_azure:                                                                                              node.py:329
                             download_data_azure([diccionario_raw,sql_credentials,params:sql_data.date_init,params:sql_data.date_end]) -> [data_sql]                        
[06/12/23 11:38:36] WARNING  /Users/jose_darnott/PycharmProjects/planta-litio/src/planta_litio/pipelines/pre_processing/nodes.py:344: UserWarning: pandas    warnings.py:109
                             only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2                     
                             objects are not tested. Please consider using SQLAlchemy.                                                                                      
                               df = pd.read_sql(query_data, db_connection)                                                                                                  
                                                                                                                                                                            
[06/12/23 11:38:41] INFO     Saving data to 'data_sql' (ParquetDataSet)...                                                                               data_catalog.py:382
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/core.py", line 214, in save
    self._save(data)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro_datasets/pandas/parquet_dataset.py", line 188, in _save
    data.to_parquet(bytes_buffer, **self._save_args)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pandas/core/frame.py", line 2889, in to_parquet
    return to_parquet(
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pandas/io/parquet.py", line 411, in to_parquet
    impl.write(
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pandas/io/parquet.py", line 159, in write
    table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
  File "pyarrow/table.pxi", line 3681, in pyarrow.lib.Table.from_pandas
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 570, in dataframe_to_arrays
    convert_fields) = _get_columns_to_convert(df, schema, preserve_index,
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 352, in _get_columns_to_convert
    raise ValueError(
ValueError: Duplicate column names found: ['timestamp', ...]

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/parallel_runner.py", line 122, in _run_node_synchronization
    return run_node(node, catalog, hook_manager, is_async, session_id)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/runner.py", line 319, in run_node
    node = _run_node_sequential(node, catalog, hook_manager, session_id)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/runner.py", line 435, in _run_node_sequential
    catalog.save(name, data)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/data_catalog.py", line 384, in save
    dataset.save(data)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/core.py", line 613, in save
    super().save(data)
  File "/Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/io/core.py", line 221, in save
    raise DataSetError(message) from exc
kedro.io.core.DataSetError: Failed while saving data to data set ParquetDataSet(filepath=/Users/jose_darnott/PycharmProjects/planta-litio/data/01_raw/data_sql.parquet, 
load_args={'engine': pyarrow}, protocol=file, save_args={'engine': pyarrow}).
Duplicate column names found: ['timestamp' ...]

The above exception was the direct cause of the following exception:
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/bin/kedro:8 in <module>                     │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /cli/cli.py:211 in main                                                                          │
│                                                                                                  │
│   208 │   """                                                                                    │
│   209 │   _init_plugins()                                                                        │
│   210 │   cli_collection = KedroCLI(project_path=Path.cwd())                                     │
│ ❱ 211 │   cli_collection()                                                                       │
│   212                                                                                            │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 130 in __call__                                                                                  │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /cli/cli.py:139 in main                                                                          │
│                                                                                                  │
│   136 │   │   )                                                                                  │
│   137 │   │                                                                                      │
│   138 │   │   try:                                                                               │
│ ❱ 139 │   │   │   super().main(                                                                  │
│   140 │   │   │   │   args=args,                                                                 │
│   141 │   │   │   │   prog_name=prog_name,                                                       │
│   142 │   │   │   │   complete_var=complete_var,                                                 │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 055 in main                                                                                      │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 657 in invoke                                                                                    │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:1 │
│ 404 in invoke                                                                                    │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/click/core.py:7 │
│ 60 in invoke                                                                                     │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /cli/project.py:472 in run                                                                       │
│                                                                                                  │
│   469 │   with KedroSession.create(                                                              │
│   470 │   │   env=env, conf_source=conf_source, extra_params=params                              │
│   471 │   ) as session:                                                                          │
│ ❱ 472 │   │   session.run(                                                                       │
│   473 │   │   │   tags=tag,                                                                      │
│   474 │   │   │   runner=runner(is_async=is_async),                                              │
│   475 │   │   │   node_names=node_names,                                                         │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/framework │
│ /session/session.py:426 in run                                                                   │
│                                                                                                  │
│   423 │   │   )                                                                                  │
│   424 │   │                                                                                      │
│   425 │   │   try:                                                                               │
│ ❱ 426 │   │   │   run_result = runner.run(                                                       │
│   427 │   │   │   │   filtered_pipeline, catalog, hook_manager, session_id                       │
│   428 │   │   │   )                                                                              │
│   429 │   │   │   self._run_called = True                                                        │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/ru │
│ nner.py:91 in run                                                                                │
│                                                                                                  │
│    88 │   │   │   self._logger.info(                                                             │
│    89 │   │   │   │   "Asynchronous mode is enabled for loading and saving data"                 │
│    90 │   │   │   )                                                                              │
│ ❱  91 │   │   self._run(pipeline, catalog, hook_manager, session_id)                             │
│    92 │   │                                                                                      │
│    93 │   │   self._logger.info("Pipeline execution completed successfully.")                    │
│    94                                                                                            │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/site-packages/kedro/runner/pa │
│ rallel_runner.py:334 in _run                                                                     │
│                                                                                                  │
│   331 │   │   │   │   │   break  # pragma: no cover                                              │
│   332 │   │   │   │   done, futures = wait(futures, return_when=FIRST_COMPLETED)                 │
│   333 │   │   │   │   for future in done:                                                        │
│ ❱ 334 │   │   │   │   │   node = future.result()                                                 │
│   335 │   │   │   │   │   done_nodes.add(node)                                                   │
│   336 │   │   │   │   │                                                                          │
│   337 │   │   │   │   │   # Decrement load counts, and release any datasets we                   │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/concurrent/futures/_base.py:4 │
│ 37 in result                                                                                     │
│                                                                                                  │
│   434 │   │   │   │   if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:                     │
│   435 │   │   │   │   │   raise CancelledError()                                                 │
│   436 │   │   │   │   elif self._state == FINISHED:                                              │
│ ❱ 437 │   │   │   │   │   return self.__get_result()                                             │
│   438 │   │   │   │                                                                              │
│   439 │   │   │   │   self._condition.wait(timeout)                                              │
│   440                                                                                            │
│                                                                                                  │
│ /Users/jose_darnott/opt/miniconda3/envs/planta-litio/lib/python3.8/concurrent/futures/_base.py:3 │
│ 89 in __get_result                                                                               │
│                                                                                                  │
│   386 │   def __get_result(self):                                                                │
│   387 │   │   if self._exception:                                                                │
│   388 │   │   │   try:                                                                           │
│ ❱ 389 │   │   │   │   raise self._exception                                                      │
│   390 │   │   │   finally:                                                                       │
│   391 │   │   │   │   # Break a reference cycle with the exception in self._exception            │
│   392 │   │   │   │   self = None                                                                │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
DataSetError: Failed while saving data to data set ParquetDataSet(filepath=/Users/jose_darnott/PycharmProjects/planta-litio/data/01_raw/data_sql.parquet, 
load_args={'engine': pyarrow}, protocol=file, save_args={'engine': pyarrow}).
Duplicate column names found: ['timestamp', ...]

Nok Lam Chan

06/12/2023, 3:44 PM
Interesting, can you double check whether it works with df.to_parquet?
bytes_buffer = BytesIO()
data.to_parquet(bytes_buffer, **self._save_args)
This is literally what Kedro does behind the scenes: we use the same logic as pandas. Kedro handles the path but nothing more.
This is weird. Please try the following:
1. Call df.to_parquet and see if it works.
2. If 1 works, then try:
from io import BytesIO
bytes_buffer = BytesIO()
data.to_parquet(bytes_buffer)
If 1 works but 2 fails, then I would suggest creating a minimal reproducible example and opening an issue on either pandas or pyarrow.
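As a rough sketch, the two checks can be wrapped in one small helper that reports which step fails. The name check_parquet_save and the temporary file name are hypothetical, not part of Kedro or pandas. The helper only assumes the object has a to_parquet method that accepts a path or a buffer, as pandas DataFrames do.

```python
import os
import tempfile
from io import BytesIO


def check_parquet_save(df, **save_args):
    """Try both save routes and return the exception from each (None if it worked).

    `df` is expected to behave like a pandas DataFrame, i.e. expose
    `to_parquet(path_or_buffer, **kwargs)`. This helper is illustrative only.
    """
    results = {}

    # Step 1: write straight to a file path.
    with tempfile.TemporaryDirectory() as tmp_dir:
        try:
            df.to_parquet(os.path.join(tmp_dir, "check.parquet"), **save_args)
            results["path"] = None
        except Exception as exc:  # keep the error so it can be inspected
            results["path"] = exc

    # Step 2: write to an in-memory buffer, as in the snippet above.
    try:
        df.to_parquet(BytesIO(), **save_args)
        results["buffer"] = None
    except Exception as exc:
        results["buffer"] = exc

    return results
```

Calling check_parquet_save(df, engine="pyarrow") just above the node's return should show whether the path route, the buffer route, or both raise the duplicate column error.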

Jose Nuñez

06/12/2023, 3:51 PM
I just added df.to_parquet just above the return of the node, and it fails (in the same fashion as above)... so you're right, this is probably not a direct problem with Kedro.

Nok Lam Chan

06/12/2023, 3:58 PM
Maybe try to pickle it and load it from a notebook. There is a good chance that there are just duplicate columns.
Or if you are a PyCharm/VS Code user, it may be easier to attach a debugger and step through with breakpoints. https://docs.kedro.org/en/stable/development/set_up_pycharm.html#debugging
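A quick way to test the duplicate column theory inside the node is to count the column names directly. The sketch below uses only the standard library. The helper name find_duplicate_columns is hypothetical, and the sample list reuses column names from the error message with an invented duplicate purely for illustration.

```python
from collections import Counter


def find_duplicate_columns(columns):
    """Return the column names that occur more than once, in first-seen order."""
    counts = Counter(columns)
    return [name for name, n in counts.items() if n > 1]


# Illustrative input: 'timestamp' is repeated on purpose.
columns = ["timestamp", "lims_BFIL CO3", "timestamp", "lims_BFIL Ca %"]
print(find_duplicate_columns(columns))
```

In the node this would be called as find_duplicate_columns(df.columns). The pandas expression df.columns[df.columns.duplicated()] should give the same information. It may also be worth checking whether the clipboard round trip hid the problem: pandas normally renames repeated headers when parsing text (for example to timestamp.1), so a dataframe rebuilt in a notebook that way can look clean even if the original has duplicates.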