Tom McHale
04/22/2024, 11:50 AM
formatted_event_df:
  type: pandas.ParquetDataset
  filepath: "s3://s3_bucket/filename.parquet"
kedro.io.core.DatasetError: Failed while saving data to data set ParquetDataset(filepath=s3_bucket/file.parquet, load_args={}, protocol=s3, save_args={}).
Any ideas what's going wrong here?
My custom dataset class is below:
import time
from contextlib import closing
from typing import Any, Dict

import boto3
import numpy as np
import pandas as pd
from kedro.io import AbstractDataset, DatasetError
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
class PyAthenaSQLDataset(AbstractDataset[pd.DataFrame, pd.DataFrame]):
    """Read-only Kedro dataset that runs a SQL query on Athena via PyAthena.

    ``load()`` executes ``sql_query`` with a PyAthena ``PandasCursor`` and
    returns the result as a ``pandas.DataFrame``. Saving is not supported.

    NOTE(review): a Kedro ``pandas.ParquetDataset`` with an ``s3://`` filepath
    was seen failing on save with ``NotImplementedError`` raised from
    ``pyathena/filesystem/s3.py`` (``S3File._initiate_upload``). In other
    words, the Parquet write went through PyAthena's S3 filesystem, whose
    upload path is not implemented in the installed version, rather than
    through s3fs. This suggests that importing ``pyathena`` (as this module
    does) re-registers fsspec's ``s3`` protocol. Candidate remedies, to be
    confirmed against the installed versions:
    - upgrade PyAthena, or
    - restore s3fs after the import with
      ``fsspec.register_implementation("s3", "s3fs.S3FileSystem", clobber=True)``.
    """

    def __init__(self, sql_query: str, s3_staging_dir: str, region_name: str) -> None:
        """Create a dataset bound to a single Athena query.

        Args:
            sql_query: SQL query executed on Athena by ``load()``.
            s3_staging_dir: S3 staging directory passed to PyAthena's ``connect``.
            region_name: AWS region name passed to PyAthena's ``connect``.
        """
        self.sql_query = sql_query
        self.s3_staging_dir = s3_staging_dir
        self.region_name = region_name

    def _load(self) -> pd.DataFrame:
        """Execute ``sql_query`` on Athena and return the result.

        Returns:
            The query result as a pandas DataFrame.
        """
        # Close the cursor and connection once the result has been fetched
        # instead of leaving them open after every load.
        with closing(
            connect(
                s3_staging_dir=self.s3_staging_dir,
                region_name=self.region_name,
                cursor_class=PandasCursor,
            )
        ) as connection, closing(connection.cursor()) as cursor:
            return cursor.execute(self.sql_query).as_pandas()

    def _save(self, data: pd.DataFrame) -> None:
        """Reject saves; this dataset is read-only.

        Previously this silently discarded ``data`` while claiming to save it.
        Raising makes a misconfigured pipeline output fail loudly instead of
        losing data.

        Raises:
            DatasetError: always.
        """
        raise DatasetError(f"'save' is not supported on {type(self).__name__}")

    def _describe(self) -> Dict[str, Any]:
        """Return the attributes Kedro shows when rendering this dataset."""
        # "save_location" actually holds the Athena staging directory; the key
        # name is kept as-is so existing log output does not change.
        return {"s3_query": self.sql_query, "save_location": self.s3_staging_dir}
Tom McHale
04/22/2024, 12:30 PM
Traceback (most recent call last):
File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/kedro/io/core.py", line 214, in save
self._save(data)
File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/kedro_datasets/pandas/parquet_dataset.py", line 195, in _save
with self._fs.open(save_path, mode="wb") as fs_file:
File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1963, in __exit__
self.close()
File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 518, in close
super(S3File, self).close()
File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1930, in close
self.flush(force=True)
File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/fsspec/spec.py", line 1796, in flush
self._initiate_upload()
File "/opt/miniconda3/envs/test_env/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 522, in _initiate_upload
raise NotImplementedError # pragma: no cover
NotImplementedError