# plugins-integrations
f
hey everyone, I've been working for some time on using kedro with Argo, and since the current documentation page is outdated, I've put together a script using Hera Workflows to convert kedro pipelines into an Argo Workflows DAG. The code below runs each kedro node as a Kubernetes Pod and builds the Argo DAG from your pipeline's node dependencies. I hope this is useful to some people. We can also try to put it into the docs page if the developers find it useful.
import logging
from pathlib import Path

from hera.auth import ArgoCLITokenGenerator
from hera.shared import global_config
from hera.workflows import (
    DAG,
    Container,
    Env,
    Parameter,
    RetryPolicy,
    RetryStrategy,
    Workflow,
)
from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project
from kedro.pipeline import Pipeline


# More info on hera docs
global_config.host = "https://argo-workflows.io"    # Put YOUR OWN INSTANCE URL
global_config.token = ArgoCLITokenGenerator
global_config.namespace = "NAMESPACE OF THE ARGO WORKFLOWS IN K8s"
global_config.service_account_name = "SERVICE ACCOUNT OF THE ARGO WORKFLOWS IN K8s"


IMAGE_REGISTRY = "TO BE FILLED"
IMAGE = "IMAGE NAME TO ADD HERE"

WORKFLOW_DEFAULTS = {}

PROJECT_DIR = Path(__file__).resolve().parent  # assumes this script lives at the Kedro project root

logger = logging.getLogger()


def convert_camel_case_to_kebab_case(name: str) -> str:
    # e.g. "TrainModelNode" -> "train-model-node"
    return "".join(["-" + c.lower() if c.isupper() else c for c in name]).lstrip("-")


def get_container(image_tag: str, envs: list[Env] | None = None) -> Container:
    return Container(
        name="k8s-pod",
        inputs=[
            Parameter(name="cmd"),
            Parameter(name="memory"),
            Parameter(name="cpu"),
        ],
        retry_strategy=RetryStrategy(limit="5", retry_policy=RetryPolicy.on_error),
        image=f"{IMAGE_REGISTRY}/{IMAGE}:{image_tag}",
        termination_message_policy="FallbackToLogsOnError",
        image_pull_policy="Always",
        # In order to define pod resources for each task, use podspecpatch
        pod_spec_patch="""
            {
                "containers":[
                    {
                        "name":"main",
                        "resources":{
                            "limits":{
                                "cpu": "{{inputs.parameters.cpu}}",
                                "memory": "{{inputs.parameters.memory}}"
                            },
                            "requests":{
                                "cpu": "{{inputs.parameters.cpu}}",
                                "memory": "{{inputs.parameters.memory}}"
                            }
                        }
                    }
                ]
            }
        """,
        env=envs,  # Add user specified envs
        command=["bash"],
        args=["-c", "{{inputs.parameters.cmd}}"],
    )


def get_pipeline(pipeline_name: str | None = None) -> Pipeline:
    metadata = bootstrap_project(PROJECT_DIR)

    logger.info("Project name: %s", metadata.project_name)

    logger.info("Initializing Kedro...")

    pipeline_name = pipeline_name or "__default__"

    pipeline = pipelines.get(pipeline_name)

    return pipeline


def create_workflow(
    image_tag: str,
    envs: list[Env] | None = None,
    generate_name: str = "kedro-wf-",
    kedro_env: str = "staging",
    kedro_pipeline_name: str | None = None,
    **extra_params,
) -> Workflow:
    """Create a workflow"""

    if envs is None:
        envs = []

    with Workflow(
        generate_name=generate_name,
        entrypoint="main",
        **WORKFLOW_DEFAULTS,
    ) as w:
        k8s_pod = get_container(image_tag=image_tag, envs=envs)

        with DAG(name="main"):
            for node, deps in get_pipeline(
                kedro_pipeline_name
            ).node_dependencies.items():
                # node.name, [d.name for d in deps]
                kedro_cmd = f"kedro run --env {kedro_env} --nodes {node.name}"
                if extra_params:
                    # NOTE: assumes a Kedro version whose --params flag accepts
                    # comma-separated key=value pairs; adjust to your version's syntax
                    params_str = ",".join(f"{k}={v}" for k, v in extra_params.items())
                    kedro_cmd += f" --params {params_str}"
                
                # Users can add tags to Kedro nodes so that compute-heavy tasks
                # automatically get more CPU. The logic below can be changed to fit
                # your needs, and similar logic can be implemented for memory as well
                # (see the node-tagging sketch after the script).
                if "ComputeHeavyTask" in node.tags:
                    cpu = "6"
                else:
                    cpu = "1"

                memory = "5Gi"

                k8s_pod(
                    name=convert_camel_case_to_kebab_case(f"{node.name}"),
                    arguments={
                        "cmd": kedro_cmd,
                        "cpu": cpu,
                        "memory": memory,
                    },
                    dependencies=[
                        convert_camel_case_to_kebab_case(d.name) for d in deps
                    ],
                )

    return w
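
For reference, here is a minimal sketch of the Kedro side of the ComputeHeavyTask tagging mentioned in the comments above. The function, dataset and node names are just placeholders, not part of the script:

# Node-tagging sketch (Kedro side): a node tagged "ComputeHeavyTask" gets the
# larger CPU request in create_workflow above. Names below are placeholders.
from kedro.pipeline import node, pipeline

def train_model(model_input_features):
    ...

heavy_pipeline = pipeline(
    [
        node(
            func=train_model,
            inputs="model_input_features",
            outputs="trained_model",
            name="TrainModelNode",      # camel case, converted to kebab case for the pod name
            tags=["ComputeHeavyTask"],  # matched against node.tags in create_workflow
        )
    ]
)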
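
And a minimal sketch of how the workflow could be built and submitted once the globals above are filled in. The image tag and env var are placeholders; to_yaml() assumes PyYAML is available, and create() submits to the Argo server configured via global_config:

# Usage sketch: build the workflow, inspect the manifest, optionally submit it.
if __name__ == "__main__":
    w = create_workflow(
        image_tag="latest",                                   # placeholder image tag
        envs=[Env(name="MY_ENV_VAR", value="some-value")],    # placeholder env var
        kedro_env="staging",
        kedro_pipeline_name=None,                             # None -> "__default__" pipeline
    )
    print(w.to_yaml())  # inspect the generated Argo Workflow manifest
    # w.create()        # uncomment to submit directly to the Argo server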
d
@Juan Luis what’s the best way to capture this for others?
docs / blog post / GH discussion?
thank you @Fazil B. Topal for sharing back!
🎉 1
j
since the current documentation is outdated in the webpage
updating the docs! 😄 @Fazil B. Topal would you like to send a PR? similar to what we did recently with Prefect
and thanks a lot for sharing already!
👍 1
f
it could take me a couple of days, but if it's mostly updating the docs, it should be fine to raise a PR replacing the outdated page, I suppose? 😄
❤️ 2
d
We would absolutely appreciate that 🙏
🥳 1