meharji arumilli
07/27/2023, 3:08 PM
I ran kedro package and created the DAG with:
kedro airflow create
This created the .whl and the DAGs.
Then, using the Dockerfile below, I built the docker image for the kedro project:
FROM apache/airflow:2.6.3-python3.8
# install project requirements
WORKDIR /opt/test-fi/
COPY src/requirements.txt .
USER root
RUN chmod -R a+rwx /opt/test-fi/
# Install necessary packages
RUN apt-get update && apt-get install -y wget gnupg2 libgomp1 git
USER airflow
COPY data/ data/
COPY conf/ conf/
COPY logs/ logs/
COPY src/ src/
COPY output/ output/
COPY dist/ dist/
COPY pyproject.toml .
RUN --mount=type=bind,src=.env,dst=conf/.env . conf/.env && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && python -m pip install dist/test_fi-0.1-py3-none-any.whl
EXPOSE 8888
CMD ["kedro", "run"]
The docker image is built as:
docker build -t test_fi .
Then I installed Airflow using a docker-compose.yml file on an EC2 instance, and attached the docker image to the worker and scheduler services.
I tested the docker image test_fi by docker exec-ing into the container and running `kedro run`; the project runs as expected.
However, when the DAG is triggered in Airflow, I get the error below in the Airflow UI, without much information in the logs to debug. The log below was captured with logging_level = DEBUG:
*** Found local files:
*** * /opt/airflow/logs/dag_id=test-fi/run_id=scheduled__2023-06-27T14:37:54.602904+00:00/task_id=define-project-parameters/attempt=1.log
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {base_task_runner.py:68} DEBUG - Planning to run as the user
[2023-07-27, 14:37:56 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> from DB
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]>
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, There are enough open slots in default_pool to execute the task
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]>
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 2
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1327} INFO - Executing <Task(KedroOperator): define-project-parameters> on 2023-06-27 14:37:54.602904+00:00
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:57} INFO - Started process 85 to run task
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test-fi', 'define-project-parameters', 'scheduled__2023-06-27T14:37:54.602904+00:00', '--job-id', '884', '--raw', '--subdir', 'DAGS_FOLDER/test_fi_dag.py', '--cfg-path', '/tmp/tmpu1fp72mc']
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:85} INFO - Job 884: Subtask define-project-parameters
[2023-07-27, 14:37:56 UTC] {cli_action_loggers.py:65} DEBUG - Calling callbacks: [<function default_action_log at 0x7f4f6b6038b0>]
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {task_command.py:410} INFO - Running <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [running]> on host e1be34e2e4d4
[2023-07-27, 14:37:56 UTC] {settings.py:353} DEBUG - Disposing DB connection pool (PID 85)
[2023-07-27, 14:37:56 UTC] {settings.py:212} DEBUG - Setting up DB connection pool (PID 85)
[2023-07-27, 14:37:56 UTC] {settings.py:285} DEBUG - settings.prepare_engine_args(): Using NullPool
[2023-07-27, 14:37:56 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [running]> from DB
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {taskinstance.py:868} DEBUG - Clearing XCom data
[2023-07-27, 14:37:56 UTC] {retries.py:80} DEBUG - Running RenderedTaskInstanceFields.write with retries. Try 1 of 3
[2023-07-27, 14:37:56 UTC] {retries.py:80} DEBUG - Running RenderedTaskInstanceFields._do_delete_old_records with retries. Try 1 of 3
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test-fi' AIRFLOW_CTX_TASK_ID='define-project-parameters' AIRFLOW_CTX_EXECUTION_DATE='2023-06-27T14:37:54.602904+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-06-27T14:37:54.602904+00:00'
[2023-07-27, 14:37:56 UTC] {__init__.py:117} DEBUG - Preparing lineage inlets and outlets
[2023-07-27, 14:37:56 UTC] {__init__.py:158} DEBUG - inlets: [], outlets: []
[2023-07-27, 14:37:57 UTC] {store.py:32} INFO - `read()` not implemented for `BaseSessionStore`. Assuming empty store.
[2023-07-27, 14:37:57 UTC] {session.py:50} WARNING - Unable to git describe /opt/test-fi
[2023-07-27, 14:37:57 UTC] {logging_mixin.py:150} INFO - Model version 20230727-143757
[2023-07-27, 14:37:57 UTC] {common.py:123} DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'
[2023-07-27, 14:37:57 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code Negsignal.SIGABRT
Can anyone offer help to fix this? It seems to be related to the line `DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'`
Nok Lam Chan
07/27/2023, 4:22 PM
The Negsignal.SIGABRT error looks very familiar. You may try this to see if it resolves the problem:
https://github.com/kedro-org/kedro-plugins/issues/13#issuecomment-1088710755
I also see this message: `[2023-07-27, 14:37:57 UTC] {store.py:32} INFO - read() not implemented for BaseSessionStore. Assuming empty store.` It suggests you are using a relatively old Kedro version; if possible, I'd advise upgrading Kedro.
meharji arumilli
07/27/2023, 4:57 PM
Nok Lam Chan
07/27/2023, 5:01 PM
meharji arumilli
07/27/2023, 5:03 PM"disable_existing_loggers": True
in logging.yml file. I will update if it solvesfrom kedro.config import TemplatedConfigLoader
class MyTemplatedConfigLoader(TemplatedConfigLoader):
def __init__(self, conf_source, env, runtime_params):
self.params = os.environ
self._update_model_version_to_parameters()
self._update_project_parameters()
super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)
def _update_model_version_to_parameters(self):
if 'model_version' not in self.params.keys():
version = datetime.now().strftime('%Y%m%d-%H%M%S')
self.params['model_version'] = version
print(f'Model version {version}')
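The loader above generates a version only when `model_version` is missing from the environment, which is relevant to the model-version behaviour discussed in this thread: a single `kedro run` process computes one version and reuses it, while Airflow launches a fresh process per task, so each task can stamp its own version. A minimal stdlib sketch of that logic (the function name is illustrative, not from the project):

```python
from datetime import datetime

def get_model_version(env):
    # Same rule as _update_model_version_to_parameters above:
    # generate a version only if the environment lacks one.
    if 'model_version' not in env:
        env['model_version'] = datetime.now().strftime('%Y%m%d-%H%M%S')
    return env['model_version']

# One process (one `kedro run`): the first call stores the version and
# later calls reuse it, so the whole run shares a single version.
env = {}
assert get_model_version(env) == get_model_version(env)

# Airflow runs each task in a separate process with its own environment,
# so each task repeats the generation step independently.
task_a_env, task_b_env = {}, {}
get_model_version(task_a_env)
get_model_version(task_b_env)  # may differ if started in a later second
```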
2) I do not see the errors/logs from the node where the pipeline failed, as it now only shows exited with error code 1.
Nok Lam Chan
07/27/2023, 6:20 PM
> Normally it has one model version for the whole project.
Do you mean when you do kedro run locally? kedro airflow maps each node to a separate task, but you are responsible for judging how it should be organised.
> 2) I do not see the errors/logs as the pipeline failed at some node, as it now only shows exited with error code 1
If you go into the Airflow UI, it doesn't show any log?
meharji arumilli
07/27/2023, 7:24 PM
[2023-07-27T17:41:31.694+0000] {common.py:123} DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'
[2023-07-27T17:41:36.018+0000] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.train-model manual__2023-07-27T17:38:28.224721+00:00 [running]> from DB
[2023-07-27T17:41:36.039+0000] {job.py:210} DEBUG - [heartbeat]
[2023-07-27T17:41:37.392+0000] {local_task_job_runner.py:225} INFO - Task exited with return code 1
When I run kedro run in the container, it has only 1 model version for the whole project.
Nok Lam Chan
07/27/2023, 8:37 PM
meharji arumilli
07/28/2023, 9:30 AM
With the logging.yml below, the task fails with Task exited with return code Negsignal.SIGABRT. Setting disable_existing_loggers: True lets the project run, but the logs are not captured in the Airflow UI, and it returns only Task exited with return code 1 or Task exited with return code 0.
version: 1
disable_existing_loggers: False
formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  json_formatter:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    class: pythonjsonlogger.jsonlogger.JsonFormatter
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout
  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: logs/info.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True
  error_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: simple
    filename: logs/errors.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True
loggers:
  anyconfig:
    level: WARNING
    handlers: [console, info_file_handler, error_file_handler]
    propagate: no
  kedro.io:
    level: INFO
    handlers: [console, info_file_handler, error_file_handler]
    propagate: no
  kedro.pipeline:
    level: INFO
    handlers: [console, info_file_handler, error_file_handler]
    propagate: no
root:
  level: INFO
  handlers: [console, info_file_handler, error_file_handler]
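One possible reason disable_existing_loggers: True makes the Airflow UI logs disappear: Python's `dictConfig` disables every logger that already exists and is not named in the new config, and Airflow creates its task loggers before Kedro applies logging.yml. A minimal stdlib sketch of that mechanism (the logger name is illustrative):

```python
import logging
import logging.config

# A logger created *before* dictConfig runs -- analogous to Airflow's
# task logger, which exists before Kedro loads logging.yml.
pre_existing = logging.getLogger("airflow.task.example")

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": True,  # the setting under discussion
    "root": {"level": "INFO", "handlers": []},
})

# dictConfig has disabled the pre-existing logger entirely, so any
# records it would have emitted are silently dropped:
print(pre_existing.disabled)  # → True
```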
When the logging.yml is changed as below, even with disable_existing_loggers: False, it does not give the Task exited with return code Negsignal.SIGABRT error, unlike the above version of logging.yml. It still gives only Task exited with return code 1 or Task exited with return code 0 in the Airflow UI.
version: 1
disable_existing_loggers: False
formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout
  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: logs/info.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True
  error_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: simple
    filename: logs/errors.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True
loggers:
  kedro:
    level: INFO
  test-fi:
    level: INFO
root:
  handlers: [info_file_handler, error_file_handler]
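The observation that logs land in the files but not in the Airflow UI is consistent with standard Python logging propagation: in the yml above, the kedro logger has no handlers of its own, so its records propagate to root, and root writes only to the two file handlers; nothing is emitted to stdout. A minimal stdlib sketch of that handler layout (logger and file names are illustrative):

```python
import logging
import logging.config
import os
import tempfile

logpath = os.path.join(tempfile.mkdtemp(), "info.log")

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "file": {"class": "logging.FileHandler", "filename": logpath},
    },
    "loggers": {
        # like `kedro:` in the yml above: no handlers of its own,
        # so its records propagate up to root
        "kedro_demo": {"level": "INFO"},
    },
    # root has file handlers only -- nothing is sent to stdout
    "root": {"level": "INFO", "handlers": ["file"]},
})

logging.getLogger("kedro_demo").info("hello from a node")

with open(logpath) as f:
    assert "hello from a node" in f.read()  # in the file, not on stdout
```

The files fill up while the console (and hence whatever stdout-based capture surrounds the task) stays empty.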
The actual logs are visible inside the kedro project, in the logs/info.log and logs/errors.log files, but not in the Airflow UI. Does this help to troubleshoot? Changing root back to
root:
  handlers: [console, info_file_handler, error_file_handler]
i.e. adding console to root, again gives Task exited with return code Negsignal.SIGABRT, so it has something to do with the logging.yml file.