Fazil B. Topal
10/11/2023, 1:23 PM
import logging
from pathlib import Path
from hera.auth import ArgoCLITokenGenerator
from hera.shared import global_config
from hera.workflows import (
DAG,
Container,
Env,
Parameter,
RetryPolicy,
RetryStrategy,
Workflow,
)
from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project
from kedro.pipeline import Pipeline
# More info in the Hera docs
# Hera / Argo Workflows connection settings — fill in for your own cluster.
global_config.host = "https://argo-workflows.io"  # Put YOUR OWN INSTANCE PROFILE
global_config.token = ArgoCLITokenGenerator
global_config.namespace = "NAMESPACE OF THE ARGO WORKFLOWS IN K8s"
global_config.service_account_name = "SERVICE ACCOUNT OF THE ARGO WORKFLOWS IN K8s"

# Container image coordinates used for every task pod.
IMAGE_REGISTRY = "TO BE FILLED"
IMAGE = "IMAGE NAME TO ADD HERE"

# Extra keyword arguments applied to every Workflow (e.g. labels, ttl).
WORKFLOW_DEFAULTS: dict = {}

# bootstrap_project() expects the Kedro project root directory, not this file.
# NOTE(review): assumes this script sits in the project root — confirm.
PROJECT_DIR = Path(__file__).parent

logger = logging.getLogger(__name__)
def convert_camel_case_to_kebab_case(name: str) -> str:
    """Convert a CamelCase name to kebab-case (e.g. ``MyNode`` -> ``my-node``).

    Argo object names must be lowercase RFC 1123 labels, so each uppercase
    letter is lowered and prefixed with a dash; a leading dash is stripped.
    """
    pieces: list[str] = []
    for ch in name:
        if ch.isupper():
            pieces.append("-")
            pieces.append(ch.lower())
        else:
            pieces.append(ch)
    return "".join(pieces).lstrip("-")
def get_container(image_tag: str, envs: list[Env] | None = None) -> Container:
    """Build the shared Argo ``Container`` template that runs one shell command.

    Args:
        image_tag: Image tag appended to ``IMAGE_REGISTRY/IMAGE``.
        envs: Optional environment variables injected into every task pod.

    Returns:
        A hera ``Container`` template accepting ``cmd``, ``memory`` and ``cpu``
        input parameters.
    """
    return Container(
        name="k8s-pod",
        inputs=[
            Parameter(name="cmd"),
            Parameter(name="memory"),
            Parameter(name="cpu"),
        ],
        retry_strategy=RetryStrategy(limit="5", retry_policy=RetryPolicy.on_error),
        image=f"{IMAGE_REGISTRY}/{IMAGE}:{image_tag}",
        termination_message_policy="FallbackToLogsOnError",
        image_pull_policy="Always",
        # Per-task pod resources are defined via podSpecPatch, substituting the
        # template's cpu/memory input parameters at runtime.
        pod_spec_patch="""
{
    "containers":[
        {
            "name":"main",
            "resources":{
                "limits":{
                    "cpu": "{{inputs.parameters.cpu}}",
                    "memory": "{{inputs.parameters.memory}}"
                },
                "requests":{
                    "cpu": "{{inputs.parameters.cpu}}",
                    "memory": "{{inputs.parameters.memory}}"
                }
            }
        }
    ]
}
""",
        env=envs,  # Add user specified envs
        command=["bash"],
        args=["-c", "{{inputs.parameters.cmd}}"],
    )
def get_pipeline(pipeline_name: str | None = None) -> Pipeline:
    """Bootstrap the Kedro project and return the requested pipeline.

    Args:
        pipeline_name: Name of a registered pipeline; ``None`` selects
            ``__default__``.

    Returns:
        The resolved Kedro ``Pipeline`` (``None`` if the name is unknown,
        since ``pipelines.get`` does not raise).
    """
    metadata = bootstrap_project(PROJECT_DIR)
    # BUG FIX: the pasted code contained Slack-mangled
    # ``<http://logger.info|logger.info>`` calls, which are not valid Python.
    logger.info("Project name: %s", metadata.project_name)
    logger.info("Initializing Kedro...")
    pipeline_name = pipeline_name or "__default__"
    return pipelines.get(pipeline_name)
def create_workflow(
    image_tag: str,
    envs: list[Env] | None = None,
    generate_name: str = "kedro-wf-",
    kedro_env: str = "staging",
    kedro_pipeline_name: str | None = None,
    **extra_params,
) -> Workflow:
    """Create an Argo Workflow running each Kedro node as its own pod.

    Args:
        image_tag: Tag of the container image to run.
        envs: Extra environment variables for every task pod.
        generate_name: Argo ``generateName`` prefix for the workflow.
        kedro_env: Kedro configuration environment passed to ``kedro run --env``.
        kedro_pipeline_name: Pipeline to convert; ``None`` uses ``__default__``.
        **extra_params: Extra parameters forwarded via ``kedro run --params``.

    Returns:
        The assembled (not yet submitted) hera ``Workflow``.
    """
    if envs is None:
        envs = []
    with Workflow(
        generate_name=generate_name,
        entrypoint="main",
        **WORKFLOW_DEFAULTS,
    ) as w:
        k8s_pod = get_container(image_tag=image_tag, envs=envs)
        with DAG(name="main"):
            for node, deps in get_pipeline(
                kedro_pipeline_name
            ).node_dependencies.items():
                kedro_cmd = f"kedro run --env {kedro_env} --nodes {node.name} "
                # BUG FIX: ``extra_params`` comes from **kwargs and is always a
                # dict, never None — the old ``is not None`` check appended
                # "--params {}" even when no params were given. Use truthiness.
                if extra_params:
                    # NOTE(review): str(dict) may not match kedro's expected
                    # "key=value,key2=value2" --params syntax — confirm.
                    kedro_cmd += f"--params {extra_params}"
                # Nodes tagged "ComputeHeavyTask" automatically receive more
                # CPU; adapt this (and add a memory rule) to your own needs.
                cpu = "6" if "ComputeHeavyTask" in node.tags else "1"
                memory = "5Gi"
                k8s_pod(
                    name=convert_camel_case_to_kebab_case(f"{node.name}"),
                    arguments={
                        "cmd": kedro_cmd,
                        "cpu": cpu,
                        "memory": memory,
                    },
                    dependencies=[
                        convert_camel_case_to_kebab_case(d.name) for d in deps
                    ],
                )
    return w
datajoely
10/11/2023, 1:31 PM
Juan Luis
10/11/2023, 1:47 PM
> since the current documentation is outdated in the webpage
Updating the docs! 🙂 @Fazil B. Topal, would you like to send a PR? Similar to what we did recently with Prefect.
Fazil B. Topal
10/11/2023, 1:55 PMdatajoely
10/11/2023, 1:56 PM