# questions
f
(Cross-posting from Discord since I saw the announcement there... a little too late) Hello everyone! I'm relatively new to Kedro. I'm using it together with Dask for some data processing, and I have some issues/questions with regard to data locality. I have a pipeline with three nodes where the datasets are loaded as follows:
`dask.ParquetDataSet` from S3 -> `MemoryDataSet` -> `dask.ParquetDataSet` to S3
I run this pipeline from my local workstation for testing purposes. My Dask cluster is deployed on AWS EC2 (scheduler + workers) and they communicate privately. I noticed that on the last node, the `MemoryDataSet -> dask.ParquetDataSet to S3` step causes the data to be transferred to my local machine, where the Kedro pipeline is being run, and then transferred back to S3. Needless to say, this introduces cost and latency and is not what I intended. Can I tell the workers to write this data directly to S3? If not, what is the intended way to do this? I read through the documentation, and there is some very good information on getting the pipeline to run as either Step Functions or on AWS Batch, but this is not quite the deployment flow I had in mind. Is the pipeline intended to be run on the same infrastructure where the workers are deployed?
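For concreteness, in Python terms the catalog is roughly equivalent to this (dataset names, bucket, and arguments here are placeholders, not my actual config):
```python
# Rough Python equivalent of the catalog described above (all names/paths are placeholders).
from kedro.io import DataCatalog, MemoryDataSet
from kedro.extras.datasets.dask import ParquetDataSet

catalog = DataCatalog(
    {
        # Node 1 input: partitioned Parquet read from S3 as a Dask dataframe
        "raw_products": ParquetDataSet(filepath="s3://my-bucket/raw/products"),
        # Intermediate result passed between nodes in memory
        "intermediate_products": MemoryDataSet(),
        # Node 3 output: written back to S3
        "typed_products": ParquetDataSet(filepath="s3://my-bucket/typed/products"),
    }
)
```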
d
I'm not actually sure on this one, @Deepyaman Datta any ideas?
f
It might be worth noting that the last node isn't quite the standard `dask.ParquetDataSet`, but a custom one that subclasses it. All I'm doing is passing a pyarrow schema to the `_save` method, though:
```python
def _save(self, data: dd.DataFrame) -> None:
    data.to_parquet(
        self._filepath,
        schema=pa.schema(self.pyarrow_schema),
        storage_options=self.fs_args,
        **self._save_args,
    )
```
as opposed to the standard `dask.ParquetDataSet#_save`:
```python
def _save(self, data: dd.DataFrame) -> None:
    data.to_parquet(
        self._filepath,
        storage_options=self.fs_args,
        **self._save_args,
    )
```
so I don't think this should have any bearing on behaviour, but just in case...
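For context, the whole subclass is roughly along these lines (class and constructor names here are approximations, not my exact code):
```python
# Approximate shape of the custom dataset: only _save differs from the parent.
import dask.dataframe as dd
import pyarrow as pa
from kedro.extras.datasets.dask import ParquetDataSet


class SchemaParquetDataSet(ParquetDataSet):
    def __init__(self, *args, pyarrow_schema=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Mapping of column name -> pyarrow type, e.g. {"id": pa.int64()}
        self.pyarrow_schema = pyarrow_schema or {}

    def _save(self, data: dd.DataFrame) -> None:
        data.to_parquet(
            self._filepath,
            schema=pa.schema(self.pyarrow_schema),
            storage_options=self.fs_args,
            **self._save_args,
        )
```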
šŸ™ 1
d
All `dask.ParquetDataSet` does is load the data as a Dask dataframe; it doesn't actually do execution utilizing a Dask cluster. A common use case for this is reading a partitioned Parquet dataset (e.g. one written by Spark), because pandas doesn't handle that natively. If you actually want to utilize a Dask cluster for execution, you should create a `DaskRunner`: https://kedro.readthedocs.io/en/latest/deployment/dask.html
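To illustrate the distinction, here's a rough sketch (scheduler address and path are made up):
```python
# Sketch of where Dask graphs get executed; address and path are made up.
import dask.dataframe as dd
from distributed import Client

# With no Client registered, .head()/.compute() run on the machine that calls
# them (your workstation), using Dask's default local scheduler.
df = dd.read_parquet("s3://my-bucket/input/")
df.head()

# Once a distributed Client is created it becomes the default scheduler, so the
# same graph is executed on the EC2 workers instead. This is essentially what
# the DaskRunner from the docs sets up for the whole pipeline.
client = Client("tcp://10.0.0.1:8786")
df = dd.read_parquet("s3://my-bucket/input/")
df.head()
```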
f
Ah, I did see those docs but I must have misunderstood them a bit... I'll give it a try. Thanks!
I'm running into some problems with the above approach, unfortunately. I've got a runner set up after following the docs and am starting the pipeline as `kedro run --runner=product_classification.runner.DaskRunner --pipeline type_transformation`, but my tasks are now failing with the following error:
```
2022-11-04 10:26:10,440 - distributed.worker - ERROR - Could not deserialize task _run_node-8a52e7a0be61dae2e732e8bcf7b955cc
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2845, in loads_function
    result = cache_loads[bytes_object]
  File "/opt/conda/lib/python3.10/site-packages/distributed/collections.py", line 24, in __getitem__
    value = super().__getitem__(key)
  File "/opt/conda/lib/python3.10/collections/__init__.py", line 1106, in __getitem__
    raise KeyError(key)
KeyError: b'\x80\x04\x95F\x00\x00\x00\x00\x00\x00\x00\x8c)product_classification.runner.dask_runner\x94\x8c\x14DaskRunner._run_node\x94\x93\x94.'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2212, in execute
    function, args, kwargs = await self._maybe_deserialize_task(ts)
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2185, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*ts.run_spec)
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2856, in _deserialize
    function = loads_function(function)
  File "/opt/conda/lib/python3.10/site-packages/distributed/worker.py", line 2847, in loads_function
    result = pickle.loads(bytes_object)
  File "/opt/conda/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'product_classification'
```
I'm guessing this is some kind of serialization error where it's trying to serialize and distribute my code to the workers (and failing to do so)? Do I have to package the code and install it on the workers manually for the `DaskRunner` approach to work? I can't find much info unfortunately, though I'm not 100% sure that this is a Kedro issue at this point... Here's my project structure in case it helps: https://gist.github.com/filpano/de3f92542eb225fdfd46a7bd7a43145c
d
Yeah, exactly, the code needs to be available to all the workers, but I'm not exactly sure what the solution here is. I did a bit of quick looking around, and it seems like `cloudpickle` should automatically include dependencies during serialization (`pickle` would not), and some threads from a few years ago say Dask uses `cloudpickle` by default. However, https://distributed.dask.org/en/stable/serialization.html says that Dask uses a mixture of `dask` and `pickle` serialization by default, but maybe you can specify `cloudpickle`? I'm unfortunately not a Dask expert (much as I would love to be!), so take these thoughts with plenty of salt.
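If it does turn out to be just a matter of getting your package onto the workers, plain `distributed` has a couple of mechanisms that might be worth a try. Purely a sketch, with made-up addresses, paths, and URLs:
```python
# Sketch only: addresses, paths, and URLs below are made up.
from distributed import Client
from distributed.diagnostics.plugin import PipInstall

client = Client("tcp://10.0.0.1:8786")  # your scheduler address

# Option 1: ship a zipped/egg build of the project; workers add it to sys.path.
client.upload_file("dist/product_classification-0.1.0-py3.10.egg")

# Option 2: have every worker pip-install the package from somewhere it can
# reach (an index or a VCS URL); this also covers workers that join later.
client.register_worker_plugin(
    PipInstall(packages=["git+https://example.com/org/product_classification.git"])
)
```
No idea how cleanly either fits into the `DaskRunner` lifecycle, though.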
šŸ‘ 1
f
Thank you very much, that's already more info than I was able to find on the topic - I'll take a look. I'm also unfortunately not a Dask expert : )