# questions
Hello. Has anyone deployed a Kedro project using the Airflow orchestrator? The tutorial does not give a complete answer; because of this, the DAG does not work even though it is visible in the DAGs list.
Thank you! I was trying to deploy on a standalone Airflow instance running on my localhost (in a container). I followed the tutorial (also the YouTube one), but it hangs on the "split" node. I thought maybe there should be additional settings in order to run it on local Docker images 🤷‍♂️
Did you manage to fix this? Is there some error when it runs the split node?
The task has already been started, but it does not go any further.
Is there any log on your Airflow instance side? I don't see any error in that screenshot.
Can you also tell us which Kedro/Python/OS/Airflow versions you are using? Are you just following Kedro's tutorial?
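(For reference, one quick way to grab all of those versions in one go from inside the Airflow container — a minimal sketch using only standard version attributes:)
```python
# Print the versions relevant to this thread (run inside the Airflow container).
import platform
import sys

import airflow
import kedro

print("python:", sys.version.split()[0])
print("os:", platform.platform())
print("airflow:", airflow.__version__)
print("kedro:", kedro.__version__)
```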
Kedro 0.19.2; OS: Fedora 39; Airflow runs from a Docker Compose file with the Airflow image, tried 2.7.3 and 2.8.1.
Another case: when I create a simple DAG with two PythonOperators (and a function for each one), it runs fine. But when I wanted to run a KedroSession in my first function, Airflow starts it, but the process stays in the running status.
(two screenshots attached)
Interesting observation:
```python
if __name__ == "__main__":
    dag.test()
```
runs the code successfully.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from pathlib import Path
from kedro.framework.startup import bootstrap_project
from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project

# Define default_args dictionary to pass to the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Instantiate a DAG
dag = DAG(
    'simple_python_operator_example',
    default_args=default_args,
    description='A simple example DAG with two PythonOperators',
    schedule_interval='@hourly',  # Set to run once an hour
    catchup=False
)


# Define Python functions for the two tasks
def task_one_function():
    print("Executing Task One-------")
    env = "airflow"
    pipeline_name = "__default__"
    project_path = Path.cwd()

    bootstrap_project(Path.cwd())
    with KedroSession.create(project_path=project_path,
                             env=env) as session:
        session.run(pipeline_name)


def task_two_function():
    print("Executing Task Two-------")
    # Add your Task Two logic here


# Instantiate PythonOperators for the two tasks
task_one = PythonOperator(
    task_id='task_one',
    python_callable=task_one_function,
    dag=dag,
)

task_two = PythonOperator(
    task_id='task_two',
    python_callable=task_two_function,
    dag=dag,
)

# Set the task dependencies
task_one >> task_two

if __name__ == "__main__":
    dag.test()
```
So what could be the reason? It's as if Airflow creates a thread to run the task with KedroSession, then gets stuck in between.
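(One way to see exactly where the task hangs — a debugging sketch, not from the original thread; `faulthandler` is in the Python standard library:)
```python
import faulthandler
import sys


def task_one_function():
    # Dump every thread's stack to the task log every 30 seconds while the
    # task is stuck, so the hang point shows up in the Airflow logs.
    faulthandler.dump_traceback_later(30, repeat=True, file=sys.stderr)
    try:
        ...  # bootstrap_project / KedroSession.run as in the DAG above
    finally:
        faulthandler.cancel_dump_traceback_later()
```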
Thanks, that's great debugging. Is it possible to roll back to an older version, say Airflow <2.6? I vaguely remember 2.7/2.8 causing issues in CI; just to eliminate Airflow as the source of the problem.
If you unwrap `KedroOperator`, it's not too different from the base one.
```python
from __future__ import annotations

from pathlib import Path

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession


class KedroOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        package_name: str,
        pipeline_name: str,
        node_name: str,
        project_path: str | Path,
        env: str,
        *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env

    def execute(self, context):
        configure_project(self.package_name)
        with KedroSession.create(project_path=self.project_path,
                                 env=self.env) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])
```
Would it work if you have your `PythonOperator` running this:
```python
configure_project(self.package_name)
with KedroSession.create(project_path=self.project_path,
                         env=self.env) as session:
    session.run(self.pipeline_name, node_names=[self.node_name])
```
Before that, do you know if the first node finished? Or is it just stuck at "running" and never gets executed?
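(A standalone version of that snippet for a `PythonOperator` callable, with the `self.` attributes replaced by plain variables — the names here are hypothetical placeholders:)
```python
from pathlib import Path

from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession


def run_single_node():
    # Hypothetical values — substitute your own package, pipeline and node names.
    configure_project("my_project")
    with KedroSession.create(project_path=Path.cwd(), env="airflow") as session:
        session.run("__default__", node_names=["split"])
```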
Exactly. It's just stuck at "running".
I will try running with Airflow 2.6, but as I remember the problem still existed there. I will write back after testing. Thank you.
Which configuration should I use for the tests? E.g. which Airflow and Kedro versions?
Are you able to tell if the node has been executed or not? Maybe from some log?
> It's just stuck at "running".
The node started executing but never finished. The script ran up to the KedroSession (I was able to track it via some prints), then got stuck at the KedroSession.
https://github.com/kedro-org/kedro-plugins/issues/13#issuecomment-1088710755 I don't know if it's related; it has been a while. I vaguely remember it caused Airflow to throw a weird SIGKILL error, and the pipeline would stop at the end of the first node. Worth a try.
If not, feel free to raise an issue on GitHub, ideally with something that we can reproduce on our end.
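(If the hang is fork-related — an assumption, not confirmed by that issue — one knob worth trying is Airflow's `[core] execute_tasks_new_python_interpreter` setting, which starts each task in a fresh Python interpreter instead of forking the worker process. A docker-compose sketch:)
```yaml
# docker-compose override (sketch) — assumes the hang comes from running
# tasks in a forked worker process.
services:
  airflow-worker:
    environment:
      AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER: "True"
```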
Thank you, I'll try it!
Hi! The problem was the Airflow version. It gets stuck starting with Airflow 2.6, so it is better to use a version <= 2.5.3. That way it works perfectly. Thank you one more time!
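(For a Docker Compose setup, pinning just means selecting an older image tag — a sketch, assuming the stock `apache/airflow` image:)
```yaml
# docker-compose.yaml (sketch) — pin the Airflow image below 2.6,
# per the fix found in this thread.
services:
  airflow-webserver:
    image: apache/airflow:2.5.3
```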
Awesome, thanks for getting back!