Filip Panovski 11/03/2022, 3:35 PM
I run this pipeline from my local workstation for testing purposes. My Dask cluster is deployed on AWS EC2 (scheduler + workers), and they communicate privately. I noticed that on the last node, the

`dask.ParquetDataSet` (from s3) -> `MemoryDataSet` -> `dask.ParquetDataSet` (to s3)

step causes the data to be transferred to my local machine, where the Kedro pipeline is being run, and then transferred back to s3. Needless to say, this introduces cost and lag and is not what I intended. Can I tell the workers to write this data directly to s3? If not, what is the intended way to do this? I read through the documentation, and there is some very good information on getting the pipeline to run as Step Functions or on AWS Batch, but this is not quite the deployment flow I had in mind. Is the pipeline intended to be run on the same infrastructure where the workers are deployed?
datajoely 11/03/2022, 3:43 PM
Filip Panovski 11/03/2022, 3:54 PM
I'm not using `dask.ParquetDataSet` directly, but a custom one that subclasses it. All I'm doing is passing a pyarrow schema to the `to_parquet` call, as opposed to the standard `dask.ParquetDataSet#_save`:

```python
def _save(self, data: dd.DataFrame) -> None:
    data.to_parquet(
        self._filepath,
        schema=pa.schema(self.pyarrow_schema),
        storage_options=self.fs_args,
        **self._save_args,
    )
```

so I don't think this should have any bearing on behaviour, but just in case... For comparison, the standard `_save` is:

```python
def _save(self, data: dd.DataFrame) -> None:
    data.to_parquet(self._filepath, storage_options=self.fs_args, **self._save_args)
```
Deepyaman Datta 11/03/2022, 4:26 PM
All `dask.ParquetDataSet` does is load the data as a Dask (distributed) dataframe; it doesn't actually do execution utilizing a Dask cluster. A common use case for this is reading a partitioned Parquet dataset (e.g. one written by Spark), because Pandas doesn't handle that natively. If you actually want to utilize a Dask cluster for execution, you should create a `DaskRunner`: https://kedro.readthedocs.io/en/latest/deployment/dask.html
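[Editor's note, not from the thread: to make the distinction concrete, a cluster-aware runner submits each node's function to the Dask client instead of calling it in-process, so intermediate data can stay on the workers. A heavily simplified sketch, with hypothetical names and structure (this is not Kedro's actual `DaskRunner`), duck-typed on `client.submit` so it stands alone:]

```python
def run_nodes(client, nodes, catalog):
    """Run (func, input_names, output_name) triples, given in topological
    order, by submitting each func to a Dask-style client.

    With a real distributed.Client("tcp://<scheduler>:8786"), client.submit
    returns a Future, and passing that Future as an argument to a later
    submit lets the scheduler move data worker-to-worker; it never has to
    pass through the local process driving the run.
    """
    results = {}
    for func, input_names, output_name in nodes:
        # Prefer an upstream node's (possibly remote) result; otherwise
        # fall back to a catalog entry supplied by the caller.
        args = [results[name] if name in results else catalog[name]
                for name in input_names]
        results[output_name] = client.submit(func, *args)
    return results
```

With a real `distributed.Client`, the returned values are Futures; only gathering them (or a terminal dataset save executed on the workers) materializes the data.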
Filip Panovski 11/04/2022, 8:14 AM
I ran `kedro run --runner=product_classification.runner.DaskRunner --pipeline type_transformation`, but my Tasks are now failing with the following error:

```
2022-11-04 10:26:10,440 - distributed.worker - ERROR - Could not deserialize task _run_node-8a52e7a0be61dae2e732e8bcf7b955cc
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2845, in loads_function
    result = cache_loads[bytes_object]
  File "/opt/conda/lib/python3.10/site-packages/distributed/collections.py", line 24, in __getitem__
    value = super().__getitem__(key)
  File "/opt/conda/lib/python3.10/collections/__init__.py", line 1106, in __getitem__
    raise KeyError(key)
KeyError: b'\x80\x04\x95F\x00\x00\x00\x00\x00\x00\x00\x8c)product_classification.runner.dask_runner\x94\x8c\x14DaskRunner._run_node\x94\x93\x94.'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2212, in execute
    function, args, kwargs = await self._maybe_deserialize_task(ts)
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2185, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*ts.run_spec)
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2856, in _deserialize
    function = loads_function(function)
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2847, in loads_function
    result = pickle.loads(bytes_object)
  File "/opt/conda/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'product_classification'
```

I'm guessing this is some kind of serialization error where it's trying to serialize and distribute my code to the workers (and failing to do so)? Do I have to package the code and install it on the Workers manually for the DaskRunner approach to work? I can't find much info unfortunately, and I'm not 100% sure that this is a Kedro issue at this point... Here's my project structure in case it helps: https://gist.github.com/filpano/de3f92542eb225fdfd46a7bd7a43145c
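[Editor's note, not from the thread: the `ModuleNotFoundError` means the workers cannot import `product_classification`, so its code has to reach them somehow. Two mechanisms documented by `distributed` are `Client.upload_file` (ships a local `.py`/`.zip`/`.egg` to every worker and puts it on their import path) and the `PipInstall` worker plugin, which pip-installs a package into each worker's environment. A minimal, hypothetical helper around the first option; the client is passed in so the sketch stays self-contained:]

```python
def ship_code_to_workers(client, archive_path):
    """Send a local module or archive to all workers so tasks can import it.

    `client` is expected to behave like distributed.Client (e.g. created with
    Client("tcp://<scheduler>:8786")): upload_file distributes the file to
    every currently connected worker and adds it to the import path. For a
    whole project, building a zip/egg of the package and uploading that is
    the usual pattern; installing the package into each worker's environment
    (e.g. via distributed's PipInstall plugin, or by baking it into the EC2
    image) is the more robust alternative, since upload_file does not cover
    workers that join later.
    """
    client.upload_file(archive_path)
    return archive_path
```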
Deepyaman Datta 11/04/2022, 11:53 AM
`cloudpickle` should automatically include dependencies during serialization (plain `pickle` would not), but some threads from a few years ago say Dask uses `pickle` by default. However, https://distributed.dask.org/en/stable/serialization.html says that Dask uses a mixture of `pickle` and `cloudpickle` by default, but maybe you can specify `cloudpickle` explicitly? I'm unfortunately not a Dask expert (much as I would love to be!), so take these thoughts with plenty of salt.
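[Editor's note, not from the thread: the by-reference vs by-value distinction is easy to demonstrate. Plain `pickle` stores a module-level function as a module name plus attribute name, so the unpickling process must be able to import that module (which is exactly why the worker raised `ModuleNotFoundError` above), while `cloudpickle` can serialize the function's code itself. A stdlib-only check; the `cloudpickle` part is guarded since it's a third-party package:]

```python
import pickle

square = lambda x: x * x  # lambdas have no importable attribute name

def picklable(obj):
    """True if plain pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

# Plain pickle fails: it would record "<module>.<lambda>", which cannot be
# looked up again on the receiving side -- the same by-reference scheme that
# makes a worker try (and fail) to import product_classification.
print(picklable(square))  # False
print(picklable(len))     # True: builtins are importable by name

try:
    import cloudpickle  # third-party; ships alongside distributed
    restored = pickle.loads(cloudpickle.dumps(square))  # by value: code travels too
    print(restored(4))  # 16
except ImportError:
    pass
```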
Filip Panovski 11/04/2022, 11:56 AM