Jannik Wiedenhaupt
10/30/2024, 7:00 PM
datajoely
10/30/2024, 7:20 PM
datajoely
10/30/2024, 7:20 PM
datajoely
10/30/2024, 7:22 PM
kedro run --runner class.path.of.your.AsyncRunner
datajoely
10/30/2024, 7:23 PM
```
from __future__ import annotations

import asyncio
from collections import Counter
from itertools import chain

from pluggy import PluginManager

from kedro.io import CatalogProtocol  # exported from kedro.io in recent Kedro versions
from kedro.pipeline import Pipeline
from kedro.runner import AbstractRunner, run_node


class SequentialAsyncRunner(AbstractRunner):
    """``SequentialAsyncRunner`` is an ``AbstractRunner`` implementation that
    can be used to run the ``Pipeline`` asynchronously.

    Note: ``AbstractRunner.run`` calls ``_run`` synchronously, so this
    coroutine must be driven by the caller, e.g. via ``asyncio.run``.
    """

    async def _run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol,
        hook_manager: PluginManager,
        session_id: str | None = None,
    ) -> None:
        """The method implementing sequential pipeline running asynchronously.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.

        Raises:
            Exception: in case of any downstream node failure.
        """
        if not self._is_async:
            self._logger.info(
                "Using synchronous mode for loading and saving data. Use the --async flag "
                "for potential performance gains. "
                "https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously"
            )
        nodes = pipeline.nodes
        done_nodes = set()
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            try:
                if asyncio.iscoroutinefunction(node.func):
                    # Await the async function (assumes run_node returns an
                    # awaitable for coroutine nodes; stock run_node is synchronous)
                    await run_node(node, catalog, hook_manager, self._is_async, session_id)
                else:
                    # Run as normal if the function is synchronous
                    run_node(node, catalog, hook_manager, self._is_async, session_id)
                done_nodes.add(node)
            except Exception:
                self._suggest_resume_scenario(pipeline, done_nodes, catalog)
                raise

            # Decrement load counts and release any datasets we've finished with
            for dataset in node.inputs:
                load_counts[dataset] -= 1
                if load_counts[dataset] < 1 and dataset not in pipeline.inputs():
                    catalog.release(dataset)
            for dataset in node.outputs:
                if load_counts[dataset] < 1 and dataset not in pipeline.outputs():
                    catalog.release(dataset)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )
```
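(Editor's note: one wrinkle with the sketch above is that ``AbstractRunner.run`` invokes ``_run`` synchronously and never awaits it, so the coroutine would not actually execute under a plain ``kedro run --runner ...`` as suggested earlier in the thread. A minimal way to bridge that, keeping the coroutine body as an internal method; the ``_arun`` name is illustrative, not part of Kedro:)

```
import asyncio

from kedro.runner import AbstractRunner


class SequentialAsyncRunner(AbstractRunner):
    # ... docstring as in the message above ...

    def _run(self, pipeline, catalog, hook_manager, session_id=None):
        # Synchronous entry point that AbstractRunner.run actually calls;
        # it drives the coroutine-based implementation to completion.
        asyncio.run(self._arun(pipeline, catalog, hook_manager, session_id))

    async def _arun(self, pipeline, catalog, hook_manager, session_id=None):
        # datajoely's async body above would move here unchanged.
        ...
```

With that shape, the runner can be selected exactly as shown earlier, e.g. `kedro run --runner my_project.runners.SequentialAsyncRunner --async` (hypothetical module path).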
Jannik Wiedenhaupt
11/01/2024, 1:58 AM
Akshata Patel
11/05/2024, 5:38 PM
Jannik Wiedenhaupt
11/09/2024, 12:01 AM
My async implementation is called aprocess_data(). I then have a wrapper function called process_data() with the same parameters. I call process_data from my pipeline and aprocess_data from my notebook.
Jannik Wiedenhaupt
11/09/2024, 12:02 AM
asyncio.run(aprocess_data())
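(Editor's note: the pattern Jannik describes might look like the following sketch; the function names, parameters, and record-based processing are illustrative, not his actual code:)

```
import asyncio


async def aprocess_data(records: list[dict], concurrency: int = 10) -> list[dict]:
    # Async implementation: process records concurrently, e.g. awaited I/O
    # or API calls. The body here is a stand-in.
    sem = asyncio.Semaphore(concurrency)

    async def process_one(record: dict) -> dict:
        async with sem:
            await asyncio.sleep(0)  # placeholder for a real awaited call
            return {**record, "processed": True}

    return await asyncio.gather(*(process_one(r) for r in records))


def process_data(records: list[dict], concurrency: int = 10) -> list[dict]:
    # Synchronous wrapper with the same parameters; this is what the Kedro
    # node calls, so the stock SequentialRunner sees a plain function.
    return asyncio.run(aprocess_data(records, concurrency))
```

One caveat: in a notebook that already has a running event loop (e.g. Jupyter), asyncio.run() raises a RuntimeError, so there aprocess_data() can be awaited directly instead.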
datajoely
11/09/2024, 9:32 AM
Олег Литвинов
12/01/2024, 7:33 PM
Jannik Wiedenhaupt
12/02/2024, 10:06 PM