Riley Brady
06/21/2024, 8:45 PM
We have a custom dask runner. This runner essentially generates a LocalCluster() and distributed.Client object for our distributed work; the client is assigned to self._client in the custom runner. In our nodes we need to reference the Client object directly to have fine-grained control over submitting task graphs. Trying to figure out how best to expose this within the nodes.
Code:
import logging
from typing import Any, Dict

from dask.distributed import Client
from kedro.runner import SequentialRunner
from kedro.utils import load_obj


class LocalDaskRunner(SequentialRunner):
    """``LocalDaskRunner`` is a ``SequentialRunner`` implementation. It can be
    used to distribute execution of ``Node``s in the ``Pipeline`` across
    a local Dask cluster, relying on Dask-native libraries like xarray or pandas.
    """

    log = logging.getLogger(__name__)

    def __init__(self, cluster_args: Dict[str, Any] = None, is_async: bool = False):
        """Instantiates the runner by creating a ``distributed.Client``.

        Args:
            cluster_args: Arguments defining the ``distributed.deploy.Cluster`` class,
                including a ``cluster_class`` key holding the class's import path.
            is_async: If True, the node inputs and outputs are loaded and saved
                asynchronously with threads. Defaults to False.
        """
        if cluster_args is None:
            cluster_args = {}
        super().__init__(is_async=is_async)

        # Resolve the cluster class (e.g. "distributed.LocalCluster") from its import path.
        cluster_class = load_obj(cluster_args.pop("cluster_class"))
        self.log.info(f"Connecting to local Dask cluster with args: {cluster_args}")
        self._cluster = cluster_class(**cluster_args)
        self.log.info(f"Cluster specs: {self._cluster}")
        self._client = Client(address=self._cluster)
        self.log.info(f"Client specs: {self._client}")
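For illustration only, instantiating this runner might look like the following; the cluster arguments here are hypothetical examples rather than our real settings:

runner = LocalDaskRunner(
    cluster_args={
        "cluster_class": "distributed.LocalCluster",
        "n_workers": 4,
        "threads_per_worker": 2,
    }
)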
Then of course in the CLI we have something like this under the run command:
with execution_context:
    session.run(
        tags=tag,
        runner=runner_instance,
        node_names=node_names,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        from_inputs=from_inputs,
        to_outputs=to_outputs,
        load_versions=load_version,
        pipeline_name=pipeline,
    )
Wondering what to do with that runner_instance.
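One possible sketch for building it, assuming the CLI has already parsed a runner class path string and a cluster_args dict from its options (both names are hypothetical):

from kedro.utils import load_obj

# Resolve the runner class from a dotted path, e.g. "my_package.LocalDaskRunner".
runner_class = load_obj(runner, "kedro.runner")
runner_instance = runner_class(cluster_args=cluster_args, is_async=is_async)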
Riley Brady
06/21/2024, 9:14 PM
from dask.distributed import Client
client = Client.current()
Will test with this, but curious if there are any other thoughts in the meantime.
Nok Lam Chan
06/24/2024, 6:03 AM
> In our nodes we need to reference the Client object directly to have fine-grained control over submitting task graphs
Can you elaborate on how this works? Having nodes access the runner couples your node logic to a specific runner (i.e. it doesn't work when you use SequentialRunner, because Client doesn't exist).
It would be helpful if you could provide a snippet: assuming you can access the client within a node, how are you using it?
Riley Brady
06/24/2024, 4:05 PM
The Client.current() command ended up working. I think I'm going to handle this at the package/code level with a try/except block around Client.current().
We're just running a series of client.submit(...) calls with dask to use the Dask futures functionality. This is an internal package used in our pipelines that cannot be run without Dask, so it's safe in our case to build that logic in. But I'll keep it at the package level and not at the Kedro/node/runner level.
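A minimal sketch of that pattern, assuming Client.current() raises ValueError when no client is available; the function names here are hypothetical:

from dask.distributed import Client

def _get_dask_client() -> Client:
    # Reuse the client created by the LocalDaskRunner; fail loudly if this
    # internal package is ever run outside a Dask context.
    try:
        return Client.current()
    except ValueError as err:
        raise RuntimeError("An active dask.distributed.Client is required") from err

def transform(partition):
    # Placeholder for the real per-partition work.
    return partition

def process_partitions(partitions):
    client = _get_dask_client()
    # Submit one task per partition as Dask futures, then gather the results.
    futures = [client.submit(transform, p) for p in partitions]
    return client.gather(futures)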