m
Hello all, I have packaged the Kedro project with:
kedro package
and created the DAG with:
kedro airflow create
This created the .whl and the DAGs. Then, using the Dockerfile below, I built the Docker image for the Kedro project:
```dockerfile
FROM apache/airflow:2.6.3-python3.8
# install project requirements
WORKDIR /opt/test-fi/
COPY src/requirements.txt .
USER root
RUN chmod -R a+rwx /opt/test-fi/
# Install necessary packages
RUN apt-get update && apt-get install -y wget gnupg2 libgomp1 git
USER airflow
COPY data/ data/
COPY conf/ conf/
COPY logs/ logs/
COPY src/ src/
COPY output/ output/
COPY dist/ dist/
COPY pyproject.toml .
RUN --mount=type=bind,src=.env,dst=conf/.env . conf/.env && \
    python -m pip install --upgrade pip && \
    python -m pip install -r requirements.txt && \
    python -m pip install dist/test_fi-0.1-py3-none-any.whl
EXPOSE 8888
CMD ["kedro", "run"]
```
The Docker image is built with:
docker build -t test_fi .
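The next step, described below, attaches this image to the Airflow worker and scheduler services; a hypothetical docker-compose.yml fragment for that wiring (the service names are assumptions based on the stock Airflow compose file, not taken from this thread) might look like:

```yaml
# docker-compose.yml fragment (sketch): run the scheduler and worker from the
# project image instead of the stock apache/airflow image
services:
  airflow-scheduler:
    image: test_fi
  airflow-worker:
    image: test_fi
```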
Then I installed Airflow on an EC2 instance using a docker-compose.yml file and attached the Docker image to the worker and scheduler services. I tested the test_fi image by running `docker exec` into the container and executing `kedro run`; the project runs as expected. However, when the DAG is triggered, I get the error below in the Airflow UI, without much information in the logs to debug. The log below was captured with `logging_level = DEBUG`.
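(For reference, that level is set in Airflow's configuration file:)

```ini
# airflow.cfg -- the logging section where logging_level = DEBUG is set
[logging]
logging_level = DEBUG
```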
*** Found local files:
***   * /opt/airflow/logs/dag_id=test-fi/run_id=scheduled__2023-06-27T14:37:54.602904+00:00/task_id=define-project-parameters/attempt=1.log
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {base_task_runner.py:68} DEBUG - Planning to run as the  user
[2023-07-27, 14:37:56 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> from DB
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]>
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, There are enough open slots in default_pool to execute the task
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]>
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 2
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1327} INFO - Executing <Task(KedroOperator): define-project-parameters> on 2023-06-27 14:37:54.602904+00:00
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:57} INFO - Started process 85 to run task
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test-fi', 'define-project-parameters', 'scheduled__2023-06-27T14:37:54.602904+00:00', '--job-id', '884', '--raw', '--subdir', 'DAGS_FOLDER/test_fi_dag.py', '--cfg-path', '/tmp/tmpu1fp72mc']
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:85} INFO - Job 884: Subtask define-project-parameters
[2023-07-27, 14:37:56 UTC] {cli_action_loggers.py:65} DEBUG - Calling callbacks: [<function default_action_log at 0x7f4f6b6038b0>]
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {task_command.py:410} INFO - Running <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [running]> on host e1be34e2e4d4
[2023-07-27, 14:37:56 UTC] {settings.py:353} DEBUG - Disposing DB connection pool (PID 85)
[2023-07-27, 14:37:56 UTC] {settings.py:212} DEBUG - Setting up DB connection pool (PID 85)
[2023-07-27, 14:37:56 UTC] {settings.py:285} DEBUG - settings.prepare_engine_args(): Using NullPool
[2023-07-27, 14:37:56 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [running]> from DB
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {taskinstance.py:868} DEBUG - Clearing XCom data
[2023-07-27, 14:37:56 UTC] {retries.py:80} DEBUG - Running RenderedTaskInstanceFields.write with retries. Try 1 of 3
[2023-07-27, 14:37:56 UTC] {retries.py:80} DEBUG - Running RenderedTaskInstanceFields._do_delete_old_records with retries. Try 1 of 3
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test-fi' AIRFLOW_CTX_TASK_ID='define-project-parameters' AIRFLOW_CTX_EXECUTION_DATE='2023-06-27T14:37:54.602904+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-06-27T14:37:54.602904+00:00'
[2023-07-27, 14:37:56 UTC] {__init__.py:117} DEBUG - Preparing lineage inlets and outlets
[2023-07-27, 14:37:56 UTC] {__init__.py:158} DEBUG - inlets: [], outlets: []
[2023-07-27, 14:37:57 UTC] {store.py:32} INFO - 'read()' not implemented for 'BaseSessionStore'. Assuming empty store.
[2023-07-27, 14:37:57 UTC] {session.py:50} WARNING - Unable to git describe /opt/test-fi
[2023-07-27, 14:37:57 UTC] {logging_mixin.py:150} INFO - Model version 20230727-143757
[2023-07-27, 14:37:57 UTC] {common.py:123} DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'
[2023-07-27, 14:37:57 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code Negsignal.SIGABRT
Can anyone offer help to fix this? It seems to be related to the line `DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'`.
n
Which version of Kedro are you using?
The `Negsignal.SIGABRT` error looks very familiar. You may try this to see if it resolves the problem: https://github.com/kedro-org/kedro-plugins/issues/13#issuecomment-1088710755
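(For context, the workaround in that comment, which is what gets applied later in this thread, boils down to a one-line change in the project's logging configuration:)

```yaml
# conf/base/logging.yml -- the change suggested in the linked issue comment
disable_existing_loggers: True
```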
[2023-07-27, 14:37:57 UTC] {store.py:32} INFO - 'read()' not implemented for 'BaseSessionStore'. Assuming empty store.
I see this error message, which suggests you are using a relatively old Kedro version. If possible, I'd advise upgrading Kedro.
m
@Nok Lam Chan I'm using Kedro version 0.18.1
n
I haven’t used Airflow for a while, but please let me know if the above fix works for you.
m
I'm trying the suggestion from the thread, setting `disable_existing_loggers: True` in the logging.yml file. I will update here if it solves the problem.
@Nok Lam Chan it worked. However, there are new issues. 1) For each node or task in the DAG it picks a new model version; normally there is one model version for the whole project. Here is the config loader that sets it:
```python
import os
from datetime import datetime

from kedro.config import TemplatedConfigLoader


class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        # Use environment variables as the globals for config templating.
        self.params = os.environ
        self._update_model_version_to_parameters()
        self._update_project_parameters()  # defined elsewhere (not shown)
        super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)

    def _update_model_version_to_parameters(self):
        # Generate a timestamp-based model version once, unless one is
        # already present in the environment.
        if 'model_version' not in self.params.keys():
            version = datetime.now().strftime('%Y%m%d-%H%M%S')
            self.params['model_version'] = version
            print(f'Model version {version}')
```
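(For readers following along: in Kedro 0.18.x a custom config loader like this is registered via `CONFIG_LOADER_CLASS` in the project's settings.py, roughly as below; the import path is an assumption about where the class lives, not something from the thread.)

```python
# src/test_fi/settings.py (sketch) -- register the custom config loader;
# the import path below is an assumption, not from the thread
from test_fi.config_loader import MyTemplatedConfigLoader

CONFIG_LOADER_CLASS = MyTemplatedConfigLoader
```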
2) I do not see the errors/logs when the pipeline fails at some node; the Airflow UI now only shows that the task exited with return code 1.
n
Normally there is one model version for the whole project.
Do you mean when you do `kedro run` locally?
I am not sure how these model versions are updated. One important thing to note is that when you run Kedro on an orchestrator, the nodes are no longer running in the same process. By default, `kedro airflow` maps each node to a separate task, but you are responsible for deciding how the pipeline should be organised.
2) I do not see the errors/logs when the pipeline fails at some node; it now only shows that the task exited with return code 1.
If you go into the Airflow UI, does it not show any log?
m
No, it is not showing any log of the Kedro processing steps, which it normally does. It shows the same as before, but instead of the line `INFO - Task exited with return code Negsignal.SIGABRT`, it shows:
[2023-07-27T17:41:31.694+0000] {common.py:123} DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'
[2023-07-27T17:41:36.018+0000] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.train-model manual__2023-07-27T17:38:28.224721+00:00 [running]> from DB
[2023-07-27T17:41:36.039+0000] {job.py:210} DEBUG - [heartbeat]
[2023-07-27T17:41:37.392+0000] {local_task_job_runner.py:225} INFO - Task exited with return code 1
Regarding the first issue: when it is run with `kedro run` inside the container, it has only one model version for the whole project.
n
Regarding the model version, I think this is related: https://kedro-org.slack.com/archives/C03RKP2LW64/p1690447843976229?thread_ts=1690447843.976229&cid=C03RKP2LW64 It is because when you run the pipeline distributed, the tasks no longer share the same timestamp. Each node runs in a separate process, so they won't share the same timestamp.
Regarding the Airflow issue, I am afraid I cannot be of much help here.
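(A possible workaround, as a sketch rather than something from the thread: since the custom config loader above reads os.environ, pinning `model_version` once in the environment shared by the Airflow services would give every task process the same value.)

```yaml
# docker-compose.yml fragment (sketch): pin a single model_version for all
# task processes; the service names are assumptions from the stock Airflow
# compose file, and the value would be generated per deployment or per run
services:
  airflow-scheduler:
    environment:
      model_version: "20230727-143757"
  airflow-worker:
    environment:
      model_version: "20230727-143757"
```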
m
Some insights: when I have the logging.yml file as below, it returns the error `Task exited with return code Negsignal.SIGABRT`. Setting `disable_existing_loggers: True` runs the project, but the logs are not captured in the Airflow UI, and it returns only `Task exited with return code 1` or `Task exited with return code 0`.
```yaml
version: 1
disable_existing_loggers: False
formatters:
    simple:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    json_formatter:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        class: pythonjsonlogger.jsonlogger.JsonFormatter

handlers:
    console:
        class: logging.StreamHandler
        level: INFO
        formatter: simple
        stream: ext://sys.stdout

    info_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: INFO
        formatter: simple
        filename: logs/info.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8
        delay: True

    error_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: ERROR
        formatter: simple
        filename: logs/errors.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8
        delay: True

loggers:
    anyconfig:
        level: WARNING
        handlers: [console, info_file_handler, error_file_handler]
        propagate: no

    kedro.io:
        level: INFO
        handlers: [console, info_file_handler, error_file_handler]
        propagate: no

    kedro.pipeline:
        level: INFO
        handlers: [console, info_file_handler, error_file_handler]
        propagate: no

root:
    level: INFO
    handlers: [console, info_file_handler, error_file_handler]
```
When the logging.yml is changed as below, even with `disable_existing_loggers: False`, it does not give the `Task exited with return code Negsignal.SIGABRT` error, unlike the version of logging.yml above. It still gives `Task exited with return code 1` or `Task exited with return code 0` in the Airflow UI.
```yaml
version: 1

disable_existing_loggers: False

formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout

  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: logs/info.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  error_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: simple
    filename: logs/errors.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

loggers:
  kedro:
    level: INFO

  test-fi:
    level: INFO

root:
  handlers: [info_file_handler, error_file_handler]
```
The actual logs are visible inside the Kedro project, in the logs/info.log and logs/errors.log files, but not in the Airflow UI. Does this help to troubleshoot?
```yaml
root:
  handlers: [console, info_file_handler, error_file_handler]
```
Adding `console` back to `root:`, as above, again gives `Task exited with return code Negsignal.SIGABRT`. It has something to do with the logging.yml file.