# questions
Question: Is there a straightforward way to return an object from a custom runner so it can be referenced later? Or to reference the runner class at runtime?

Context: We have a custom Dask runner. This runner essentially creates a `LocalCluster()` and a `distributed.Client` object for our distributed work; the client is assigned to `self._client` in the custom runner. In our nodes we need to reference the `Client` object directly so we have fine-grained control over submitting task graphs. I'm trying to figure out how best to expose it within the nodes.

Code:
```python
import logging
from typing import Any, Dict, Optional

from dask.distributed import Client
from kedro.runner import SequentialRunner
from kedro.utils import load_obj


class LocalDaskRunner(SequentialRunner):
    """``LocalDaskRunner`` is a ``SequentialRunner`` implementation. It can be
    used to distribute execution of ``Node``s in the ``Pipeline`` across
    a local Dask cluster, relying on dask-native libraries like xarray or pandas.
    """

    log = logging.getLogger(__name__)

    def __init__(
        self, cluster_args: Optional[Dict[str, Any]] = None, is_async: bool = False
    ):
        """Instantiates the runner by creating a ``distributed.Client``.

        Args:
            cluster_args: Arguments defining the ``distributed.deploy.Cluster``
                class. The ``cluster_class`` key holds the dotted import path of
                the cluster class; all other keys go to its constructor.
            is_async: If True, the node inputs and outputs are loaded and saved
                asynchronously with threads. Defaults to False.
        """
        super().__init__(is_async=is_async)

        # Copy so popping "cluster_class" does not mutate the caller's dict, and
        # fall back to LocalCluster so cluster_args=None does not raise KeyError.
        cluster_args = dict(cluster_args or {})
        cluster_class = load_obj(
            cluster_args.pop("cluster_class", "distributed.LocalCluster")
        )

        self.log.info(f"Connecting to Local Dask cluster with args: {cluster_args}")
        self._cluster = cluster_class(**cluster_args)
        self.log.info(f"Cluster specs: {self._cluster}")
        # Client accepts a Cluster object in place of a scheduler address.
        self._client = Client(address=self._cluster)
        self.log.info(f"Client specs: {self._client}")
```
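The runner treats `cluster_args["cluster_class"]` as a dotted import path resolved by `load_obj`, and forwards every other key to the cluster constructor. The stdlib sketch below mimics that split so it can run without Kedro or Dask; `resolve_dotted_path` only approximates what `kedro.utils.load_obj` does, `build_from_args` is a hypothetical helper, and a stdlib class stands in for the cluster class.

```python
import importlib
from typing import Any, Dict, Optional


def resolve_dotted_path(path: str) -> Any:
    """Import ``package.module.Name`` and return ``Name`` (approximates load_obj)."""
    module_path, _, name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected a dotted path like 'pkg.Name', got {path!r}")
    return getattr(importlib.import_module(module_path), name)


def build_from_args(args: Optional[Dict[str, Any]], default_class: str) -> Any:
    """Hypothetical helper mirroring the runner: pop the class path, pass the rest as kwargs."""
    kwargs = dict(args or {})  # copy, so the caller's dict keeps its "cluster_class" key
    cls = resolve_dotted_path(kwargs.pop("cluster_class", default_class))
    return cls(**kwargs)
```

For example, `build_from_args({"cluster_class": "fractions.Fraction", "numerator": 3, "denominator": 4}, "fractions.Fraction")` builds `Fraction(3, 4)`. With Dask, the same shape would be something like `{"cluster_class": "distributed.LocalCluster", "n_workers": 2}`.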
Then, of course, in the CLI we have something like this under `run`:
```python
with execution_context:
    session.run(
        tags=tag,
        runner=runner_instance,
        node_names=node_names,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        from_inputs=from_inputs,
        to_outputs=to_outputs,
        load_versions=load_version,
        pipeline_name=pipeline,
    )
```
Wondering what to do with that `runner_instance`.
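One way to avoid threading `runner_instance` into the nodes is for the runner to publish its client to a small module that nodes import. A minimal sketch, assuming a hypothetical module (for example `my_project/dask_context.py`) with hypothetical `set_dask_client` / `get_dask_client` functions; in `LocalDaskRunner.__init__` this would be one extra call, `set_dask_client(self._client)`, right after the client is created.

```python
from typing import Any, Optional

# Hypothetical process-wide slot for the runner's Client.
_active_client: Optional[Any] = None


def set_dask_client(client: Any) -> None:
    """Called once by the runner after it creates its Client."""
    global _active_client
    _active_client = client


def get_dask_client() -> Any:
    """Called from node functions that need direct access to the Client."""
    if _active_client is None:
        raise RuntimeError(
            "No Dask client registered; was the pipeline run with LocalDaskRunner?"
        )
    return _active_client
```

This relies on the nodes running in the same process as the runner, which holds for a `SequentialRunner` subclass; a module-level variable is not shared across worker processes.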
Update: I discovered `Client.current()`:
```python
from dask.distributed import Client
client = Client.current()
```
I'll test with this, but I'm curious whether anyone has other thoughts in the meantime.
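A node that relies on `Client.current()` might look like the sketch below. It assumes the node runs in the same process that created the runner's `Client` (true for a `SequentialRunner` subclass); there, `Client.current()` returns the default client and raises `ValueError` if none exists. `square_all` is a hypothetical node function; `client.map` and `client.gather` are standard `distributed.Client` methods.

```python
from typing import List

from dask.distributed import Client


def square_all(values: List[int]) -> List[int]:
    """Hypothetical node: submits work through the client the runner created."""
    client = Client.current()  # no reference to the runner needed
    futures = client.map(lambda x: x * x, values)  # one task per element
    return client.gather(futures)  # block until all results are back
```

Outside Kedro, `with Client(processes=False): square_all([1, 2, 3])` exercises the same lookup against an in-process cluster.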