Hi all, I'm trying to make a custom dataset entry...
# questions
t
Hi all, I'm trying to make a custom dataset entry for loading data from an Athena Table on AWS as a pandas dataframe. This is because the default kedro way of doing it using 'pandas.SQLQueryDataset' uses a slow method of loading the data from Athena, read_sql, and I want to speed this up using pyathena connect. I have followed the steps in this article. I load the data from Athena, but tend to use this to then merge as features onto an existing dataframe, so won't need to use this class for saving any data. The data is loaded perfectly, but at the end of the node I save a new dataframe with the following catalog entry and get the error below which I can't work out how to fix:
Copy code
formatted_event_df:
  type: pandas.ParquetDataset
  filepath: "<s3://s3_bucket/filename.parquet>"
Copy code
kedro.io.core.DatasetError: Failed while saving data to data set ParquetDataset(filepath=s3_bucket/file.parquet, load_args={}, protocol=s3, save_args={}).
Any ideas for what's going wrong here. My custom class code is below:
Copy code
from typing import Any, Dict

import numpy as np

from <http://kedro.io|kedro.io> import AbstractDataset
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd
import boto3
import time

from <http://kedro.io|kedro.io> import AbstractDataset



class PyAthenaSQLDataset(AbstractDataset[np.ndarray, np.ndarray]):
    """``CustomAthenaPandasDataset`` loads / save image data from a given
    sql query as a pandas dataframe

    """

    def __init__(self, sql_query: str, s3_staging_dir: str, region_name: str):
        """Creates a new instance of CustomAthenaPandasDataset to load / save
        data for given sql query.

        Args:
            sql_query: sql query for athena table
            s3_staging_dir: s3 staging dict for env on aws
            region_name: name of the region input.
        """
        self.sql_query = sql_query
        self.s3_staging_dir = s3_staging_dir
        self.region_name = region_name

    def _load(self) -> pd.DataFrame:
        """

        Returns:

        """
        cursor = connect(
            s3_staging_dir=self.s3_staging_dir,
            region_name=self.region_name,
            cursor_class=PandasCursor,
        ).cursor()

        pandas_df = cursor.execute(self.sql_query).as_pandas()

        return pandas_df

    def _save(self, data: pd.DataFrame) -> None:
        """Saves data to the specified filepath."""
        return data

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(s3_query=self.sql_query, save_location=self.s3_staging_dir)
😢 2
Here is a bit more detail on the error, looks like a pyathena issue:
Copy code
Traceback (most recent call last):
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/kedro/io/core.py", line 214, in save
    self._save(data)
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/kedro_datasets/pandas/parquet_dataset.py", line 195, in _save
    with self._fs.open(save_path, mode="wb") as fs_file:
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1963, in __exit__
    self.close()
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 518, in close
    super(S3File, self).close()
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1930, in close
    self.flush(force=True)
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1796, in flush
    self._initiate_upload()
  File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 522, in _initiate_upload
    raise NotImplementedError  # pragma: no cover
NotImplementedError