Nurmukhammad Abdul-Qodir
01/23/2024, 6:40 AMmarrrcin
01/23/2024, 8:31 AMNurmukhammad Abdul-Qodir
01/23/2024, 9:33 AMNok Lam Chan
01/23/2024, 10:33 AMNurmukhammad Abdul-Qodir
01/23/2024, 11:00 AMNurmukhammad Abdul-Qodir
01/23/2024, 11:36 AMNok Lam Chan
01/23/2024, 1:06 PMNok Lam Chan
01/23/2024, 1:07 PMNurmukhammad Abdul-Qodir
01/26/2024, 1:36 PMNurmukhammad Abdul-Qodir
01/26/2024, 1:42 PMNurmukhammad Abdul-Qodir
01/26/2024, 1:43 PMNurmukhammad Abdul-Qodir
01/26/2024, 1:45 PM
`if __name__ == "__main__": dag.test()` runs the code successfully.
Nurmukhammad Abdul-Qodir
01/26/2024, 1:45 PM
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from pathlib import Path
from kedro.framework.startup import bootstrap_project
from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project
# Shared task defaults; handed to the DAG constructor as ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2024, 1, 1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,  # one retry for a failed task ...
    retry_delay=timedelta(minutes=5),  # ... after a five-minute wait
)
# The DAG every task below is attached to.
# NOTE(review): newer Airflow 2.x releases deprecate ``schedule_interval`` in
# favour of ``schedule`` (2.4+); switch once the minimum Airflow version allows.
dag = DAG(
    dag_id='simple_python_operator_example',
    description='A simple example DAG with two PythonOperators',
    default_args=default_args,
    catchup=False,  # do not backfill intervals missed since start_date
    schedule_interval='@hourly',  # one run per hour
)
# Define Python functions for the two tasks
def task_one_function(
    env: str = "airflow",
    pipeline_name: str = "__default__",
    project_path: "str | Path | None" = None,
) -> None:
    """Run a Kedro pipeline from inside an Airflow task.

    All parameters have defaults, so ``PythonOperator`` can still call this
    with no arguments; override them per task through the operator's
    ``op_kwargs``.

    Args:
        env: Kedro configuration environment passed to ``KedroSession.create``.
        pipeline_name: Name of the pipeline handed to ``session.run``.
        project_path: Root of the Kedro project. ``None`` (the default) means
            the current working directory of the process running the task.
            NOTE(review): the default relies on the Airflow worker's working
            directory being the project root; pass an explicit path otherwise.
    """
    print("Executing Task One-------")
    # Resolve the project root once so bootstrap and session use the same path.
    root = Path.cwd() if project_path is None else Path(project_path)
    bootstrap_project(root)
    with KedroSession.create(project_path=root, env=env) as session:
        session.run(pipeline_name)
def task_two_function():
    """Placeholder callable for ``task_two``: it only prints a banner.

    TODO: replace the body with the real Task Two logic.
    """
    banner = "Executing Task Two-------"
    print(banner)
# Wrap each callable in its own PythonOperator, both attached to ``dag``.
task_one = PythonOperator(dag=dag, task_id='task_one', python_callable=task_one_function)
task_two = PythonOperator(dag=dag, task_id='task_two', python_callable=task_two_function)

# Same as ``task_one >> task_two``: task_two is scheduled after task_one.
task_one.set_downstream(task_two)

if __name__ == "__main__":
    # Run the DAG once locally via DAG.test() when this file is executed directly.
    dag.test()
Nurmukhammad Abdul-Qodir
01/26/2024, 1:56 PMNok Lam Chan
01/26/2024, 2:16 PMNok Lam Chan
01/26/2024, 2:17 PM
`KedroOperator`: it's not too different from the base one.
class KedroOperator(BaseOperator):
    """Airflow operator that runs a single node of a Kedro pipeline.

    On execution it configures the Kedro project package, opens a
    ``KedroSession`` and runs only ``node_name`` from ``pipeline_name``.
    """

    # ``@apply_defaults`` is intentionally omitted: since Airflow 2.0 the
    # BaseOperator metaclass applies ``default_args`` automatically, and the
    # decorator is a deprecated no-op that only emits a warning.
    def __init__(
        self,
        package_name: str,
        pipeline_name: str,
        node_name: str,
        project_path: str | Path,
        env: str,
        *args,
        **kwargs,
    ) -> None:
        """Store the Kedro run configuration for later execution.

        Args:
            package_name: Kedro project package, passed to ``configure_project``.
            pipeline_name: Pipeline that contains ``node_name``.
            node_name: The only node this task runs.
            project_path: Kedro project root, passed to ``KedroSession.create``.
            env: Kedro configuration environment.
            *args: Forwarded unchanged to ``BaseOperator``.
            **kwargs: Forwarded unchanged to ``BaseOperator`` (e.g. ``task_id``).
        """
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env

    def execute(self, context):
        """Run ``self.node_name`` of ``self.pipeline_name`` in a new session.

        ``context`` (the Airflow task context) is accepted but not used.
        """
        configure_project(self.package_name)
        with KedroSession.create(
            project_path=self.project_path, env=self.env
        ) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])
Nok Lam Chan
01/26/2024, 2:18 PM
`PythonOperator` running this:
# Same statements as the body of KedroOperator.execute: configure the project
# package, open a KedroSession, and run only `self.node_name` of
# `self.pipeline_name`.
configure_project(self.package_name)
with KedroSession.create(project_path=self.project_path,
env=self.env) as session:
session.run(self.pipeline_name, node_names=[self.node_name])
Nok Lam Chan
01/26/2024, 2:19 PMNurmukhammad Abdul-Qodir
01/26/2024, 2:24 PMNurmukhammad Abdul-Qodir
01/26/2024, 2:26 PMNurmukhammad Abdul-Qodir
01/26/2024, 2:29 PMNok Lam Chan
01/26/2024, 2:30 PM
It's just stuck at "running".
Nurmukhammad Abdul-Qodir
01/26/2024, 2:33 PMNok Lam Chan
01/26/2024, 2:47 PMNok Lam Chan
01/26/2024, 2:48 PMNurmukhammad Abdul-Qodir
01/26/2024, 2:53 PMNurmukhammad Abdul-Qodir
01/29/2024, 10:13 AMNok Lam Chan
01/29/2024, 10:14 AM