Tom McHale
04/22/2024, 11:50 AM
formatted_event_df:
  type: pandas.ParquetDataset
  filepath: "s3://s3_bucket/filename.parquet"
kedro.io.core.DatasetError: Failed while saving data to data set ParquetDataset(filepath=s3_bucket/file.parquet, load_args={}, protocol=s3, save_args={}).
Any ideas for what's going wrong here?
My custom class code is below:
from typing import Any, Dict

import numpy as np
import pandas as pd
from kedro.io import AbstractDataset
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
class PyAthenaSQLDataset(AbstractDataset[pd.DataFrame, pd.DataFrame]):
    """``PyAthenaSQLDataset`` loads the result of a given SQL query from
    Athena as a pandas DataFrame.
    """
    def __init__(self, sql_query: str, s3_staging_dir: str, region_name: str):
        """Creates a new instance of ``PyAthenaSQLDataset`` to load data for
        the given SQL query.

        Args:
            sql_query: SQL query to run against the Athena table.
            s3_staging_dir: S3 staging directory for the AWS environment.
            region_name: name of the AWS region.
        """
        self.sql_query = sql_query
        self.s3_staging_dir = s3_staging_dir
        self.region_name = region_name
    def _load(self) -> pd.DataFrame:
        """Executes the query through PyAthena.

        Returns:
            The query result as a pandas DataFrame.
        """
        cursor = connect(
            s3_staging_dir=self.s3_staging_dir,
            region_name=self.region_name,
            cursor_class=PandasCursor,
        ).cursor()
        pandas_df = cursor.execute(self.sql_query).as_pandas()
        return pandas_df
    def _save(self, data: pd.DataFrame) -> None:
        """Saving is not supported; this dataset is read-only."""
        raise NotImplementedError("PyAthenaSQLDataset does not support saving")
    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(sql_query=self.sql_query, s3_staging_dir=self.s3_staging_dir)

Tom McHale
04/22/2024, 12:30 PM
Traceback (most recent call last):
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/kedro/io/core.py", line 214, in save
    self._save(data)
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/kedro_datasets/pandas/parquet_dataset.py", line 195, in _save
    with self._fs.open(save_path, mode="wb") as fs_file:
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1963, in __exit__
    self.close()
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 518, in close
    super(S3File, self).close()
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1930, in close
    self.flush(force=True)
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1796, in flush
    self._initiate_upload()
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 522, in _initiate_upload
    raise NotImplementedError  # pragma: no cover
NotImplementedError
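The final frames show the write going through pyathena's own fsspec filesystem (`pyathena/filesystem/s3.py`), whose `S3File._initiate_upload` is a `raise NotImplementedError` stub, so any `ParquetDataset` write routed through it fails on close. One avenue to investigate is which fsspec implementation ends up registered for the `s3` protocol once pyathena is imported, since that determines which file class handles the upload. A stdlib-only sketch of the same failure shape (all names hypothetical, not pyathena's API):

```python
import io


class StubRemoteFile(io.RawIOBase):
    """Hypothetical remote file whose upload path is an unimplemented stub,
    mirroring the close() -> flush() -> _initiate_upload() chain above."""

    def __init__(self):
        super().__init__()
        self._buffer = b""

    def writable(self) -> bool:
        return True

    def write(self, data: bytes) -> int:
        # Buffer locally; nothing is uploaded until flush/close.
        self._buffer += data
        return len(data)

    def flush(self) -> None:
        self._initiate_upload()

    def _initiate_upload(self) -> None:
        raise NotImplementedError  # same stub as in the traceback


f = StubRemoteFile()
f.write(b"parquet bytes")
try:
    f.close()  # close() flushes, which hits the stub and raises
except NotImplementedError:
    print("write path not implemented")
```

The point is that the error surfaces only at `close()`, long after `write()` appears to succeed, which is exactly why the catalog entry looks fine until the save is flushed.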