# questions
r
Question: Is there a straightforward way to return an object from a custom runner to reference later? Or to reference the runner class at runtime?
Context: We have a custom dask runner. This runner essentially generates a LocalCluster() and a distributed.Client object for our distributed work. This is assigned to self._client in the custom runner. In our nodes we need to reference the Client object directly to have fine-grained control over submitting task graphs. Trying to figure out how best to expose this within the nodes.
Code:
import logging
from typing import Any, Dict

from dask.distributed import Client
from kedro.runner import SequentialRunner
from kedro.utils import load_obj


class LocalDaskRunner(SequentialRunner):
    """``LocalDaskRunner`` is a ``SequentialRunner`` implementation. It can be
    used to distribute execution of ``Node``s in the ``Pipeline`` across
    a local Dask cluster, relying on dask-native libraries like xarray or pandas.
    """

    log = logging.getLogger(__name__)

    def __init__(self, cluster_args: Dict[str, Any] = None, is_async: bool = False):
        """Instantiates the runner by creating a ``distributed.Client``.

        Args:
            cluster_args: Arguments defining the distributed.deploy.Cluster class
            is_async: If True, the node inputs and outputs are loaded and saved
                asynchronously with threads. Defaults to False.
        """
        if cluster_args is None:
            cluster_args = {}

        super().__init__(is_async=is_async)

        cluster_class = cluster_args.pop("cluster_class")
        cluster_class = load_obj(cluster_class)
        self.log.info(f"Connecting to Local Dask cluster with args: {cluster_args}")
        self._cluster = cluster_class(**cluster_args)
        self.log.info(f"Cluster specs: {self._cluster}")
        self._client = Client(address=self._cluster)
        self.log.info(f"Client specs: {self._client}")
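(For reference, a minimal sketch of how we instantiate this runner; the cluster_class string and worker settings below are illustrative, not our actual config.)

# Illustrative instantiation of the custom runner; these cluster_args values
# are examples only, not our real configuration.
cluster_args = {
    "cluster_class": "distributed.LocalCluster",  # resolved via load_obj
    "n_workers": 4,
    "threads_per_worker": 2,
}
runner_instance = LocalDaskRunner(cluster_args=cluster_args)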
Then of course in the CLI we have something like this under run:
with execution_context:
    session.run(
        tags=tag,
        runner=runner_instance,
        node_names=node_names,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        from_inputs=from_inputs,
        to_outputs=to_outputs,
        load_versions=load_version,
        pipeline_name=pipeline,
    )
Wondering what to do with that runner_instance.
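One thing I was considering (just a sketch; the client property below is something we would have to add ourselves, it doesn't exist on the runner yet): expose the Client on the runner and hold onto the instance we pass to session.run.

# Sketch only: one possible way to expose the Client from the custom runner.
class LocalDaskRunner(SequentialRunner):
    ...  # __init__ as above, which sets self._client

    @property
    def client(self) -> Client:
        """The distributed.Client created in __init__."""
        return self._client


runner_instance = LocalDaskRunner(cluster_args=cluster_args)
dask_client = runner_instance.client  # reference the client later at runtime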
Update: discovered
from dask.distributed import Client
client = Client.current()
Will test with this, but curious if there are any other thoughts in the meantime.
n
In our nodes we need to reference the Client object directly to have fine-grained control over submitting task graphs
Can you elaborate on how this works? Having nodes access the runner couples your node logic to a specific runner (i.e. it doesn't work when you use SequentialRunner, because the Client doesn't exist there). It would be helpful if you could provide a snippet: assuming you can access the client within a node, how are you using it?
r
That's a good point, I didn't think about the issue of coupling a node to a runner. The Client.current() call above ended up working. I think I'm going to handle this at the package/code level, with a try/except block around accessing Client.current(). We're just running a series of client.submit(...) calls with dask to use the dask futures functionality. This is an internal package used in our pipelines that cannot be run without dask, so it's safe in our case to build that logic in. But I'll keep it at the package level and not at the kedro/node/runner level.
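Roughly, the package-level guard would look like this (a sketch; compute_partial stands in for one of our internal functions):

from dask.distributed import Client


def _get_dask_client():
    """Return the active distributed.Client, or None if no client exists."""
    try:
        # Client.current() raises ValueError when no client has been created.
        return Client.current()
    except ValueError:
        return None


def run_step(data):
    client = _get_dask_client()
    if client is not None:
        # Submit the work as a future on the existing cluster.
        future = client.submit(compute_partial, data)  # compute_partial is a placeholder
        return future.result()
    # Fallback: run the computation in-process without dask futures.
    return compute_partial(data)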
n
Is it possible for the runner to submit the call instead? That is what I would expect a dask runner to do. Does the DaskRunner only hold the client and replicate what the SequentialRunner does?
r
The DaskRunner is instantiating a cluster of arbitrary dask workers and generating the client that connects to it. So I believe the answer is yes to what you're asking -- it's just spinning up a compute cluster. That does the job for 90% of workflows: it instantiates a client/cluster and the distributed code runs on it. In some workloads we need to leverage the dask client object to submit task graph futures for optimization (see https://docs.dask.org/en/stable/futures.html), but this is pretty rare and workflow-dependent.
We can't have the runner submit to the client, since the submission happens a few steps into the node, within the Python code. Some data derived within the node needs to be manually submitted to the dask client to start some futures calculations.
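As a hypothetical illustration of that pattern, following the dask futures docs (split_into_chunks, process_chunk and combine are placeholders for our internal code): data derived partway through the node gets submitted to the client and gathered back.

from dask.distributed import Client


def node_logic(input_data):
    client = Client.current()  # client created by the runner's cluster
    # Derived data only known partway through the node (placeholder helper).
    chunks = split_into_chunks(input_data)
    # Fan the per-chunk work out as futures, then gather the results.
    futures = [client.submit(process_chunk, chunk) for chunk in chunks]
    results = client.gather(futures)
    return combine(results)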