Hugo Evers
06/07/2024, 10:01 AM
DatasetError: Failed while saving data to data set PickleDataset(backend=pickle, filepath=.../performance_optimisation/client/data/08_reporting/explainer.joblib, load_args={}, protocol=s3, save_args={}).
[Errno 22] Part number must be an integer between 1 and 10000, inclusive
We're generating a huge explainer for an automatic bidding engine; we're now sampling roughly 1M datapoints to keep things manageable. The pipeline is deployed to AWS Batch and running on instances with 600 GB+ of RAM, so that's not an issue.
The issue is that we should probably specify a larger chunk size for saving to S3, but my question is how to do that.
I saw there is an `fs_args` key for the `PickleDataset`, specifically `open_args_save`. However, it is unclear to me from the fsspec docs how to specify the chunk size there. Does anyone have experience doing this?
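(For reference, a minimal sketch of what passing the chunk size through `fs_args` might look like. This assumes s3fs uses the `block_size` open argument as the multipart part size; the bucket and path below are placeholders and the values are untested, so treat it as an illustration rather than a verified fix.)

from kedro_datasets.pickle import PickleDataset

# S3 multipart uploads allow at most 10 000 parts, so the largest object that
# can be written is part_size * 10_000; bigger parts raise that ceiling.
part_size = 100 * 1024 * 1024  # 100 MB parts -> objects up to roughly 1 TB

explainer_dataset = PickleDataset(
    filepath="s3://my-bucket/data/08_reporting/explainer.joblib",  # placeholder path
    fs_args={"open_args_save": {"block_size": part_size}},  # forwarded to fsspec's open()
)

The same nested `fs_args.open_args_save` keys should also be expressible directly in the catalog YAML entry.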

datajoely
06/07/2024, 10:13 AM

Hugo Evers
06/07/2024, 10:18 AM

Hugo Evers
06/07/2024, 10:18 AM

datajoely
06/07/2024, 10:18 AM

Hugo Evers
06/07/2024, 10:19 AM

datajoely
06/07/2024, 10:21 AM

Hugo Evers
06/07/2024, 10:22 AM

datajoely
06/07/2024, 10:23 AM
`PartitionedDataset` (or inherit from this and extend) and then read them back:
import pickle
import fsspec

# Function to write data in chunks
def write_pickle_in_chunks(data, file_path, chunk_size=1024):
    pickled_data = pickle.dumps(data)
    with fsspec.open(file_path, 'wb') as f:
        for i in range(0, len(pickled_data), chunk_size):
            chunk = pickled_data[i:i + chunk_size]
            f.write(chunk)

# Function to read data in chunks and reconstruct the pickle
def read_pickle_in_chunks(file_path, chunk_size=1024):
    chunks = []
    with fsspec.open(file_path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            chunks.append(chunk)
    pickled_data = b''.join(chunks)
    data = pickle.loads(pickled_data)
    return data

# Sample data to pickle
data = {'key': 'value', 'number': 42, 'list': [1, 2, 3, 4, 5]}

# Specify the file path
file_path = 'data.pickle'

# Write data in chunks
write_pickle_in_chunks(data, file_path)

# Read data in chunks and reconstruct the original data
reconstructed_data = read_pickle_in_chunks(file_path)

# Verify that the reconstructed data matches the original data
print(reconstructed_data == data)  # Should print: True
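(A rough illustration of the `PartitionedDataset` idea above, not something from the thread: split the pickled bytes in a node and let each part be written as its own S3 object, so no single upload can hit the 10 000-part limit. The names and chunk size are made up, and the underlying dataset in the catalog would need to accept raw bytes.)

import pickle

CHUNK_SIZE = 100 * 1024 * 1024  # 100 MB per partition (illustrative)

def split_explainer(explainer):
    # Node output for a PartitionedDataset save: a dict of partition id -> data,
    # written out as one object per partition.
    blob = pickle.dumps(explainer)
    return {
        f"part_{i:05d}": blob[start:start + CHUNK_SIZE]
        for i, start in enumerate(range(0, len(blob), CHUNK_SIZE))
    }

def join_explainer(partitions):
    # Node input from a PartitionedDataset load: a dict of partition id -> load callable.
    blob = b"".join(load() for _, load in sorted(partitions.items()))
    return pickle.loads(blob)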
datajoely
06/07/2024, 10:24 AM

datajoely
06/07/2024, 10:24 AM

Hugo Evers
06/07/2024, 10:25 AM
import io

import boto3
import joblib
from boto3.s3.transfer import TransferConfig
from kedro.io.core import AbstractDataset, DatasetError


class S3MultipartPickleDataSet(AbstractDataset):
    def __init__(self, filepath: str, save_args: dict = None):
        self._filepath = filepath
        self._save_args = save_args or {}
        self._s3 = boto3.client('s3')

    def _describe(self) -> dict:
        return dict(filepath=self._filepath, save_args=self._save_args)

    def _load(self) -> object:
        raise DatasetError("Load not implemented")

    def _save(self, data: object) -> None:
        try:
            # Create an in-memory bytes buffer
            with io.BytesIO() as temp_buffer:
                # Dump the data to the in-memory buffer using joblib
                joblib.dump(data, temp_buffer)
                # Ensure the buffer's cursor is at the start
                temp_buffer.seek(0)
                # Stream the buffer to S3 as a multipart upload
                bucket, key = self._parse_s3_url(self._filepath)
                self._s3.upload_fileobj(temp_buffer, bucket, key, Config=self._get_transfer_config())
        except Exception as exc:
            raise DatasetError(f"Failed to save data to {self._filepath}") from exc

    def _get_transfer_config(self):
        return TransferConfig(
            multipart_threshold=int(self._save_args.get('multipart_chunksize', 25 * 1024 * 1024)),
            max_concurrency=10,
            multipart_chunksize=int(self._save_args.get('multipart_chunksize', 25 * 1024 * 1024)),
            use_threads=True,
        )

    @staticmethod
    def _parse_s3_url(s3_url: str):
        assert s3_url.startswith('s3://')
        bucket_key = s3_url[len('s3://'):].split('/', 1)
        return bucket_key[0], bucket_key[1]
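(A hypothetical usage sketch for the class above; the path and chunk size are placeholders, not from the thread. With 100 MB parts the 10 000-part cap works out to objects of up to roughly 1 TB, and the whole serialised object still has to fit in memory, since it is buffered in the BytesIO before the upload starts.)

explainer = ...  # stand-in for the large explainer object the pipeline produces

dataset = S3MultipartPickleDataSet(
    filepath="s3://my-bucket/data/08_reporting/explainer.joblib",  # placeholder path
    save_args={"multipart_chunksize": 100 * 1024 * 1024},  # 100 MB parts
)
dataset.save(explainer)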
Hugo Evers
06/07/2024, 10:25 AM

datajoely
06/07/2024, 10:25 AM

datajoely
06/07/2024, 10:25 AM

Hugo Evers
06/07/2024, 10:26 AM

Hugo Evers
06/07/2024, 10:26 AM

Hugo Evers
06/07/2024, 10:27 AM
self
but pickling it directly works fine

Hugo Evers
06/07/2024, 10:27 AM

datajoely
06/07/2024, 10:27 AM

datajoely
06/07/2024, 10:27 AM

Hugo Evers
06/07/2024, 10:28 AM

datajoely
06/07/2024, 10:28 AM

Hugo Evers
06/07/2024, 10:28 AM

Hugo Evers
06/07/2024, 10:29 AM

datajoely
06/07/2024, 10:32 AM
`PartitionedPickleDataset`
or prove that it works

datajoely
06/07/2024, 10:32 AM

marrrcin
06/07/2024, 10:38 AM

Hugo Evers
06/07/2024, 10:42 AM

marrrcin
06/07/2024, 10:42 AM

Hugo Evers
06/07/2024, 10:42 AM
it's right in the name :p
Hugo Evers
06/07/2024, 10:42 AM

datajoely
06/07/2024, 10:45 AM
`kedro-datasets` for consumability?

marrrcin
06/07/2024, 10:45 AM

marrrcin
06/07/2024, 10:45 AM

Juan Luis
06/07/2024, 11:07 AM

Hugo Evers
06/07/2024, 11:07 AM

Hugo Evers
06/12/2024, 4:28 PM

Juan Luis
06/12/2024, 5:03 PM

datajoely
06/12/2024, 5:15 PM

Hugo Evers
06/14/2024, 9:24 AM