Filip Panovski
11/03/2022, 3:35 PM
dask.ParquetDataSet from s3 -> MemoryDataSet -> dask.ParquetDataSet to s3
I run this pipeline from my local workstation for testing purposes. My Dask cluster is deployed on AWS EC2 (scheduler + workers) and they communicate privately. I noticed that on the last node, the MemoryDataSet -> dask.ParquetDataSet to s3 step causes the data to be transferred to my local machine (where the Kedro pipeline is being run) and then transferred back to s3. Needless to say, this introduces cost and lag and is not what I intended.
Can I tell the workers to write this data directly to s3? If not, what is the intended way to do this? I read through the documentation, and there is some very good information on getting the pipeline to run as either Step Functions or on AWS Batch, but this is not quite the deployment flow I had in mind. Is the pipeline intended to be run on the same infrastructure where the workers are deployed?
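For reference, a minimal sketch of the catalog setup being described, expressed via Kedro's Python API; the dataset names, bucket paths and credentials below are placeholders, not the actual project configuration:

from kedro.extras.datasets.dask import ParquetDataSet
from kedro.io import DataCatalog, MemoryDataSet

catalog = DataCatalog(
    {
        # Loaded from S3 as a lazy Dask dataframe
        "raw_products": ParquetDataSet(
            filepath="s3://example-bucket/raw/products/",  # placeholder path
            credentials={"key": "...", "secret": "..."},
        ),
        # Intermediate result held in memory between nodes
        "intermediate_products": MemoryDataSet(),
        # Written back to S3
        "transformed_products": ParquetDataSet(
            filepath="s3://example-bucket/transformed/products/",  # placeholder path
            credentials={"key": "...", "secret": "..."},
        ),
    }
)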
datajoely
11/03/2022, 3:43 PM
Filip Panovski
11/03/2022, 3:54 PM
It's not the standard dask.ParquetDataSet, but a custom one that subclasses it. All I'm doing is passing a pyarrow schema to the _save method, though:
def _save(self, data: dd.DataFrame) -> None:
    data.to_parquet(
        self._filepath,
        schema=pa.schema(self.pyarrow_schema),
        storage_options=self.fs_args,
        **self._save_args,
    )
as opposed to the standard `dask.ParquetDataSet#_save`:
def _save(self, data: dd.DataFrame) -> None:
    data.to_parquet(self._filepath, storage_options=self.fs_args, **self._save_args)
so I don't think this should have any bearing on behaviour, but just in case...
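For context, a self-contained sketch of what such a subclass could look like; the class name and the pyarrow_schema constructor argument are assumptions, not the actual project code:

import dask.dataframe as dd
import pyarrow as pa
from kedro.extras.datasets.dask import ParquetDataSet


class SchemaParquetDataSet(ParquetDataSet):
    """ParquetDataSet variant that enforces a pyarrow schema on save."""

    def __init__(self, *args, pyarrow_schema=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Mapping of column name -> pyarrow type, e.g. {"product_id": pa.string()}
        self.pyarrow_schema = pyarrow_schema or {}

    def _save(self, data: dd.DataFrame) -> None:
        data.to_parquet(
            self._filepath,
            schema=pa.schema(self.pyarrow_schema),
            storage_options=self.fs_args,
            **self._save_args,
        )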
Deepyaman Datta
11/03/2022, 4:26 PM
All dask.ParquetDataSet does is load the data as a Dask (distributed) dataframe; it doesn't actually do execution utilizing a Dask cluster. A common use case for this is reading a partitioned Parquet dataset (e.g. one written by Spark), because pandas doesn't handle that natively.
If you actually want to utilize a Dask cluster for execution, you should create a `DaskRunner`: https://kedro.readthedocs.io/en/latest/deployment/dask.html
Filip Panovski
11/04/2022, 8:14 AM
I'm running kedro run --runner=product_classification.runner.DaskRunner --pipeline type_transformation, but my tasks are now failing with the following error:
2022-11-04 10:26:10,440 - distributed.worker - ERROR - Could not deserialize task _run_node-8a52e7a0be61dae2e732e8bcf7b955cc
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2845, in loads_function
result = cache_loads[bytes_object]
File "/opt/conda/lib/python3.10/site-packages/distributed/collections.py", line 24, in __getitem__
value = super().__getitem__(key)
File "/opt/conda/lib/python3.10/collections/__init__.py", line 1106, in __getitem__
raise KeyError(key)
KeyError: b'\x80\x04\x95F\x00\x00\x00\x00\x00\x00\x00\x8c)product_classification.runner.dask_runner\x94\x8c\x14DaskRunner._run_node\x94\x93\x94.'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2212, in execute
function, args, kwargs = await self._maybe_deserialize_task(ts)
File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2185, in _maybe_deserialize_task
function, args, kwargs = _deserialize(*ts.run_spec)
File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2856, in _deserialize
function = loads_function(function)
File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2847, in loads_function
result = pickle.loads(bytes_object)
File "/opt/conda/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 73, in loads
return pickle.loads(x)
ModuleNotFoundError: No module named 'product_classification'
I'm guessing this is some kind of serialization error where it's trying to serialize and distribute my code to the workers (and failing to do so)? Do I have to package the code and install it on the Workers manually for the DaskRunner approach to work? I can't find much info unfortunately, though I'm not 100% sure that this is a Kedro issue at this point...
Here's my project structure in case it helps: https://gist.github.com/filpano/de3f92542eb225fdfd46a7bd7a43145c
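A quick way to confirm whether the project package is importable on the workers at all (a sketch; the scheduler address is a placeholder):

from distributed import Client


def can_import_project() -> bool:
    # Returns True on workers where the Kedro project package is installed.
    try:
        import product_classification  # noqa: F401
        return True
    except ModuleNotFoundError:
        return False


client = Client("tcp://scheduler.internal:8786")  # placeholder address
print(client.run(can_import_project))  # -> {worker_address: True/False, ...}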
Deepyaman Datta
11/04/2022, 11:53 AM
cloudpickle should automatically include dependencies during serialization (pickle would not), but some threads from a few years ago say Dask uses cloudpickle by default. However, https://distributed.dask.org/en/stable/serialization.html says that Dask uses a mixture of dask and pickle by default, but maybe you can specify cloudpickle?
I'm unfortunately not a Dask expert (much as I would love to be!), so take these thoughts with plenty of salt.
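On the ModuleNotFoundError itself, one option (an assumption, not something confirmed in this thread) is to make the project code available on the workers explicitly, using Dask's own mechanisms; the scheduler address and package paths below are placeholders:

from distributed import Client, PipInstall

client = Client("tcp://scheduler.internal:8786")  # placeholder address

# Option 1: have every worker pip-install the packaged project. This assumes
# the wheel/sdist built by `kedro package` is reachable from the workers
# (e.g. via a private index or an installable URL).
client.register_worker_plugin(
    PipInstall(packages=["product_classification"], pip_options=["--upgrade"])
)

# Option 2: upload the project source as a zip so that
# `import product_classification` resolves during task deserialization.
client.upload_file("product_classification.zip")  # placeholder path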
Filip Panovski
11/04/2022, 11:56 AM