Andrew Stewart
02/02/2023, 6:28 PM
awswrangler, and it did a pretty decent job!
import awswrangler as wr
import pandas as pd
from kedro.io import DataSetError, AbstractDataSet
class AthenaDataSet(AbstractDataSet):
    """Kedro dataset backed by Amazon Athena via awswrangler.

    Loads run a SQL query through ``wr.athena.read_sql_query``; saves write
    Parquet to S3 with ``wr.s3.to_parquet`` (optionally registering a Glue
    table when ``table`` is supplied).
    """

    def __init__(self, query, s3_output, database, region_name='us-west-2', table=None):
        """
        Args:
            query: SQL statement executed on load.
            s3_output: S3 prefix used for Athena query staging and as the
                destination of saved Parquet data.
            database: Glue/Athena database name.
            region_name: AWS region. NOTE(review): awswrangler calls accept a
                ``boto3_session`` rather than ``region_name``; this value is
                kept for interface compatibility and must agree with the
                ambient AWS configuration (e.g. AWS_DEFAULT_REGION) — confirm
                in deployment.
            table: Optional Glue table name to register saved data under.
        """
        self._query = query
        self._s3_output = s3_output
        self._database = database
        self._region_name = region_name
        self._table = table

    def _load(self):
        # BUG FIX: ``read_sql_query`` has no ``region_name`` parameter, so the
        # original call raised TypeError. The region comes from the default
        # boto3 session. Also route staging through the configured s3_output.
        return wr.athena.read_sql_query(
            self._query,
            database=self._database,
            s3_output=self._s3_output,
        )

    def _save(self, df: pd.DataFrame):
        # BUG FIX: awswrangler has no ``wr.athena.to_parquet``; Parquet writes
        # are done with ``wr.s3.to_parquet``. Glue-catalog registration needs a
        # table name, so it happens only when one was configured.
        if self._table:
            wr.s3.to_parquet(
                df=df,
                path=self._s3_output,
                dataset=True,
                database=self._database,
                table=self._table,
            )
        else:
            wr.s3.to_parquet(df=df, path=self._s3_output, dataset=True)

    def _exists(self):
        # NOTE(review): recent awswrangler releases name this
        # ``wr.s3.does_object_exist`` — verify against the pinned version.
        return wr.s3.exists(self._s3_output)

    def _describe(self):
        # Human-readable summary used by kedro logging/catalog display.
        return (
            f"AthenaDataSet: query={self._query}, "
            f"s3_output={self._s3_output}, database={self._database}"
        )
It even included a usage example:
# catalog.yml
...
datasets:
my_dataset:
type: my_module.AthenaDataSet
query: "SELECT * FROM my_table"
s3_output: "s3://my-bucket/my-data/my_table"
database: "my_database"
region_name: "us-west-2"
...
datajoely
02/02/2023, 6:51 PM
Andrew Stewart
02/02/2023, 7:00 PM
datajoely
02/02/2023, 7:00 PM
Andrew Stewart
02/02/2023, 10:51 PM
Ben Horsburgh
02/03/2023, 2:56 PM
Andrew Stewart
02/03/2023, 5:04 PM
import pyspark
from pyspark.sql import SparkSession
import awswrangler as wr
from kedro.io import AbstractDataSet, DataSetError
class AthenaAwswranglerDataSet(AbstractDataSet):
    """Write-only Kedro dataset: saves a PySpark DataFrame as Parquet on S3
    and registers it in the Glue catalog so Athena can query it.

    Loading and existence checks through PySpark are intentionally
    unsupported and raise ``DataSetError``.
    """

    def __init__(self, table: str, database: str = None, s3_staging_dir: str = None, spark_session: SparkSession = None, **kwargs):
        """
        Args:
            table: Glue/Athena table name; also used as the S3 subdirectory.
            database: Glue database the table is registered in.
            s3_staging_dir: Bucket/prefix (without scheme) the data lands in.
            spark_session: Existing SparkSession to reuse; a default one is
                obtained via ``getOrCreate`` when omitted.
            **kwargs: Forwarded to ``AbstractDataSet``.
        """
        self._table = table
        self._database = database
        self._s3_staging_dir = s3_staging_dir
        # Reuse the caller's session when given; otherwise get-or-create one.
        self._spark = spark_session or SparkSession.builder.getOrCreate()
        super().__init__(**kwargs)

    def _save(self, data: pyspark.sql.DataFrame) -> None:
        # Single source of truth for the destination path (was duplicated).
        path = f"s3a://{self._s3_staging_dir}/{self._table}"
        try:
            data.write.parquet(path)
            # NOTE(review): newer awswrangler releases require a
            # ``columns_types`` argument on create_parquet_table — confirm
            # the pinned version before relying on this call.
            wr.catalog.create_parquet_table(
                path=path,
                table=self._table,
                database=self._database,
                if_not_exists=True
            )
        except Exception as e:
            # BUG FIX: chain the original exception so the root cause
            # survives into the traceback.
            raise DataSetError(f"Failed to save data to Athena table `{self._table}`: {e}") from e

    def _load(self) -> pyspark.sql.DataFrame:
        raise DataSetError("Loading data from Athena using PySpark is not supported.")

    def _exists(self) -> bool:
        raise DataSetError("Checking if an Athena table exists using PySpark is not supported.")

    def _describe(self) -> dict:
        # BUG FIX: AbstractDataSet declares _describe abstract; without an
        # implementation the class cannot be instantiated by kedro.
        return {
            "table": self._table,
            "database": self._database,
            "s3_staging_dir": self._s3_staging_dir,
        }
datajoely
02/10/2023, 10:08 AM