Nelson Zambrano
07/23/2023, 8:25 PM_validate_unique_outputs(nodes)
via hooks or by implementing a modified Pipeline
class?Baden Ashford
07/24/2023, 9:59 AMsrc/my_repo/lambdas/
next to src/my_repo/pipelines/
and have some 3rd directory with shared code src/my_repo/shared/
, but thought there may be a different way to go about this!
Thanks!Aleksander Jaworski
07/24/2023, 11:22 AMhello_world.py
will take several seconds to run when the configuration is large enough: first you see all the logs, and all the setup is done for the data catalog etc., none of which actually ends up being used by a hello_world.py.
2. When setting up the project for someone, it is impossible to provide a credentials file with just the required credentials. In Kedro, all of the credentials need to be filled in right now, since everything is validated at once. In a lazy version, only the credentials the pipeline actually depends on would need to be resolved.
Are there any solutions or modifications I could use to improve my approaches here? Thanks in advance! :)Sid Shetty
07/24/2023, 2:13 PMJon Cohen
07/24/2023, 3:15 PMJon Cohen
07/24/2023, 3:17 PMEmilio Gagliardi
07/24/2023, 5:40 PM
rss_feed_extract:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedExtract
  url: https://api.msrc.microsoft.com/update-guide/rss
rss_feed_load:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedLoad
  mongo_url: "mongodb+srv://<username>:<password>@bighatcluster.wamzrdr.mongodb.net/"
  mongo_db: "TBD"
  mongo_collection: "TBD"
  mongo_table: "TBD"
  credentials: mongo_atlas
nodes.py
from typing import Any, Dict, List

def extract_rss_feed() -> Dict[str, Any]:
    raw_rss_feed = RSSFeedExtract()  # Q. how does the catalog 'url' value get passed to the __init__ method?
    raw_rss_feed.load()
    return {'key_1': 'value_1', 'key_2': 'value_2'}

def transform_rss_feed(raw_rss_feed: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [{'key_1_T': 'value_1_T', 'key_2_T': 'value_2_T'}]

def load_rss_feed(prepped_rss_items: List[Dict[str, Any]]) -> None:
    rss_feed_load = RSSFeedLoad(prepped_rss_items)  # not clear how to create the custom dataset that takes data from the catalog, credentials, and the previous node
    rss_feed_load.save()
pipeline.py
pipeline([
    node(
        func=extract_rss_feed,
        inputs=None,
        outputs='rss_feed_for_transforming',
        name="extract_rss_feed",
    ),
    node(
        func=transform_rss_feed,
        inputs="rss_feed_for_transforming",
        outputs='rss_for_loading',
        name="transform_rss_items",
    ),
    node(
        func=load_rss_feed,
        inputs="rss_for_loading",
        outputs="rss_feed_load",
        name="load_rss_items",
    ),
])
custom datasets
from typing import Any, Dict
from kedro.io import AbstractDataSet

class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url: str):
        self._url = url

class RSSFeedLoad(AbstractDataSet):
    def __init__(self, mongo_url: str, mongo_db: str, mongo_collection: str, mongo_table: str, credentials: Dict[str, Any], data: Any = None):
        self._data = data  # comes from the previous node
        self._mongo_url = mongo_url
        self._mongo_db = mongo_db
        self._mongo_collection = mongo_collection
        self._mongo_table = mongo_table
        self._username = credentials['username']
        self._password = credentials['password']
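For reference, here is my current mental model of how the pieces are supposed to fit together; a rough sketch only, assuming the standard AbstractDataSet contract (the feedparser/pymongo calls are placeholders, not working code):

from typing import Any, Dict, List
import feedparser  # placeholder parser, for illustration only
from kedro.io import AbstractDataSet
from pymongo import MongoClient  # placeholder client, for illustration only

class RSSFeedExtract(AbstractDataSet):
    # Every key of the catalog entry except 'type' and 'credentials' arrives
    # here as a keyword argument, so 'url' should come straight from catalog.yml.
    def __init__(self, url: str):
        self._url = url

    def _load(self) -> Dict[str, Any]:
        # The runner calls this when a node lists 'rss_feed_extract' as an input.
        feed = feedparser.parse(self._url)
        return {"entries": list(feed.entries)}

    def _save(self, data: Any) -> None:
        raise NotImplementedError("read-only dataset")

    def _describe(self) -> Dict[str, Any]:
        return {"url": self._url}

class RSSFeedLoad(AbstractDataSet):
    # 'credentials: mongo_atlas' in catalog.yml is resolved from credentials.yml
    # and passed in here as a dict.
    def __init__(self, mongo_url: str, mongo_db: str, mongo_collection: str, credentials: Dict[str, Any]):
        self._mongo_url = mongo_url
        self._mongo_db = mongo_db
        self._mongo_collection = mongo_collection
        self._credentials = credentials

    def _save(self, data: List[Dict[str, Any]]) -> None:
        # 'data' is whatever the upstream node returned; the node never
        # instantiates the dataset itself.
        client = MongoClient(
            self._mongo_url,
            username=self._credentials["username"],
            password=self._credentials["password"],
        )
        client[self._mongo_db][self._mongo_collection].insert_many(data)

    def _load(self) -> Any:
        raise NotImplementedError("write-only dataset")

    def _describe(self) -> Dict[str, Any]:
        return {"db": self._mongo_db, "collection": self._mongo_collection}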
Jon Cohen
07/24/2023, 6:17 PMJon Cohen
07/24/2023, 6:25 PMJon Cohen
07/24/2023, 8:10 PMVIOLETA MARÍA RIVERA
07/25/2023, 10:55 PMSuyash Shrivastava
07/26/2023, 3:23 PMFile "/usr/local/lib/python3.6/site-packages/matplotlib/backends/qt_compat.py", line 175, in <module>
"Matplotlib qt-based backends require an external PyQt4, PyQt5,\n"
ImportError: Matplotlib qt-based backends require an external PyQt4, PyQt5,
or PySide package to be installed, but it was not found.
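I'm wondering whether forcing a non-interactive backend before pyplot is imported is the right way around this; a sketch of what I mean (Agg chosen only as an example):

import matplotlib
matplotlib.use("Agg")  # pick a non-GUI backend before pyplot is imported
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.plot([1, 2, 3], [1, 4, 9])
fig.savefig("plot.png")  # render to a file instead of a Qt window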
Sid Shetty
07/26/2023, 5:24 PMnulls
. Is there any workaround here that avoids me having to add another node to put these partitions together and ideally just read as a pandas.ParquetDataSet? Perhaps passing the schema of the original dataframe or even specifying it explicitly?Lim H.
07/26/2023, 6:51 PM
test:
  type: CachedDataset
  versioned: true
  dataset:
    type: pandas.JSONDataSet
    filepath: ...
    load_args:
      lines: True
    credentials: ...
doesn't work, but this works:
test:
  type: pandas.JSONDataSet
  filepath: ...
  load_args:
    lines: True
  credentials: ...
I thought this was working at some point? I might be hallucinating though. Just want to double check quickly before I create my own CachedDataSetJ. Camilo V. Tieck
07/26/2023, 7:21 PMEmilio Gagliardi
07/26/2023, 8:57 PMlogging.yml
handlers:
  ...other built-in kedro handlers...
  debug_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: DEBUG
    formatter: simple
    filename: logs/debug.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

loggers:
  kedro:
    level: INFO
  kedro_workbench:
    level: INFO
  DataSets:
    level: DEBUG
    handlers: [debug_file_handler]

root:
  handlers: [rich, info_file_handler, error_file_handler]
in my module I used:
import logging
logger = logging.getLogger('DataSets')
logger.debug(output)
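For completeness, here is what I think the YAML above amounts to in plain Python (a sketch of my mental model, not code I actually run):

import logging
from logging.handlers import RotatingFileHandler

datasets_logger = logging.getLogger('DataSets')
datasets_logger.setLevel(logging.DEBUG)
datasets_logger.addHandler(
    RotatingFileHandler('logs/debug.log', maxBytes=10485760, backupCount=20,
                        encoding='utf8', delay=True)
)
# Note to self: unless propagate is set to False, records also bubble up to the
# root logger's handlers (the rich console handler etc.).
# datasets_logger.propagate = False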
but when I run the pipeline, the contents of output are still written to the console. What am I missing here? thanks kindly!Fazil B. Topal
07/27/2023, 8:50 AMhooks
to be nice, but without a high-level order of execution I'm not sure I can do what I want.
Context: I am trying to play around with the data versioning and change it a bit, since ideally I would run each node in a different k8s pod. That means the dataset versioning should match across pods. From what I gather the Session
class has this info, but I'm trying to find a proper way to make sure the same code version + some envs end up using the same data versions, etc.
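Roughly what I have in mind, as a sketch only (assuming KedroSession.run still accepts a load_versions mapping; KEDRO_RUN_TS and KEDRO_NODE_NAME are made-up env vars the orchestrator would inject into every pod):

import os
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# One timestamp generated by the orchestrator and shared by all pods,
# so every node run loads the same dataset versions.
run_ts = os.environ["KEDRO_RUN_TS"]

bootstrap_project(".")
with KedroSession.create(project_path=".") as session:
    session.run(
        node_names=[os.environ["KEDRO_NODE_NAME"]],  # one node per pod
        load_versions={"my_versioned_dataset": run_ts},  # pin inputs to the shared version
    )
# Save versions are still generated per session, so the output side probably
# needs a hook or a custom dataset, which is where I am stuck.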
Any help is appreciated 🙏Rahul Kumar
07/27/2023, 8:57 AMmeharji arumilli
07/27/2023, 3:08 PMkedro package
and created the DAG with:
kedro airflow create
This created the .whl and the DAGs.
Then, using the Dockerfile below, I built the docker image for the kedro project:
FROM apache/airflow:2.6.3-python3.8
# install project requirements
WORKDIR /opt/test-fi/
COPY src/requirements.txt .
USER root
RUN chmod -R a+rwx /opt/test-fi/
# Install necessary packages
RUN sudo apt-get update && apt-get install -y wget gnupg2 libgomp1 && apt-get -y install git
USER airflow
COPY data/ data/
COPY conf/ conf/
COPY logs/ logs/
COPY src/ src/
COPY output/ output/
COPY dist/ dist/
COPY pyproject.toml .
RUN --mount=type=bind,src=.env,dst=conf/.env . conf/.env && python -m pip install --upgrade pip && python -m pip install -r requirements.txt && python -m pip install dist/test_fi-0.1-py3-none-any.whl
EXPOSE 8888
CMD ["kedro", "run"]
The docker image is built as:
docker build -t test_fi .
Then I installed Airflow using a docker-compose.yml file on an EC2 instance, and attached the docker image to the worker and scheduler services.
I tested the docker image test_fi by docker exec-ing into the container and running `kedro run`; the project runs as expected.
However, when the DAG is triggered in Airflow, I get the error below in the Airflow UI, without much information in the logs to debug. The log below was captured with logging_level = DEBUG.
*** Found local files:
*** * /opt/airflow/logs/dag_id=test-fi/run_id=scheduled__2023-06-27T14:37:54.602904+00:00/task_id=define-project-parameters/attempt=1.log
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {base_task_runner.py:68} DEBUG - Planning to run as the user
[2023-07-27, 14:37:56 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> from DB
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]>
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1112} DEBUG - <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, There are enough open slots in default_pool to execute the task
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [queued]>
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 2
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1327} INFO - Executing <Task(KedroOperator): define-project-parameters> on 2023-06-27 14:37:54.602904+00:00
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:57} INFO - Started process 85 to run task
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test-fi', 'define-project-parameters', 'scheduled__2023-06-27T14:37:54.602904+00:00', '--job-id', '884', '--raw', '--subdir', 'DAGS_FOLDER/test_fi_dag.py', '--cfg-path', '/tmp/tmpu1fp72mc']
[2023-07-27, 14:37:56 UTC] {standard_task_runner.py:85} INFO - Job 884: Subtask define-project-parameters
[2023-07-27, 14:37:56 UTC] {cli_action_loggers.py:65} DEBUG - Calling callbacks: [<function default_action_log at 0x7f4f6b6038b0>]
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {task_command.py:410} INFO - Running <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [running]> on host e1be34e2e4d4
[2023-07-27, 14:37:56 UTC] {settings.py:353} DEBUG - Disposing DB connection pool (PID 85)
[2023-07-27, 14:37:56 UTC] {settings.py:212} DEBUG - Setting up DB connection pool (PID 85)
[2023-07-27, 14:37:56 UTC] {settings.py:285} DEBUG - settings.prepare_engine_args(): Using NullPool
[2023-07-27, 14:37:56 UTC] {taskinstance.py:789} DEBUG - Refreshing TaskInstance <TaskInstance: test-fi.define-project-parameters scheduled__2023-06-27T14:37:54.602904+00:00 [running]> from DB
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1037} DEBUG - previous_execution_date was called
[2023-07-27, 14:37:56 UTC] {taskinstance.py:868} DEBUG - Clearing XCom data
[2023-07-27, 14:37:56 UTC] {retries.py:80} DEBUG - Running RenderedTaskInstanceFields.write with retries. Try 1 of 3
[2023-07-27, 14:37:56 UTC] {retries.py:80} DEBUG - Running RenderedTaskInstanceFields._do_delete_old_records with retries. Try 1 of 3
[2023-07-27, 14:37:56 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test-fi' AIRFLOW_CTX_TASK_ID='define-project-parameters' AIRFLOW_CTX_EXECUTION_DATE='2023-06-27T14:37:54.602904+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-06-27T14:37:54.602904+00:00'
[2023-07-27, 14:37:56 UTC] {__init__.py:117} DEBUG - Preparing lineage inlets and outlets
[2023-07-27, 14:37:56 UTC] {__init__.py:158} DEBUG - inlets: [], outlets: []
[2023-07-27, 14:37:57 UTC] {store.py:32} INFO - `read()` not implemented for `BaseSessionStore`. Assuming empty store.
[2023-07-27, 14:37:57 UTC] {session.py:50} WARNING - Unable to git describe /opt/test-fi
[2023-07-27, 14:37:57 UTC] {logging_mixin.py:150} INFO - Model version 20230727-143757
[2023-07-27, 14:37:57 UTC] {common.py:123} DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'
[2023-07-27, 14:37:57 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code Negsignal.SIGABRT
Can anyone offer help to fix this? It seems to be related to the line `DEBUG - Loading config file: '/opt/test-fi/conf/base/logging.yml'`jyoti goyal
07/27/2023, 6:20 PMEmilio Gagliardi
07/28/2023, 4:52 AM
pipeline([
    node(
        func=extract_rss_feed,
        inputs='rss_feed_extract',
        outputs='rss_feed_for_transforming',
        name="extract_rss_feed",
    ),
    node(
        func=transform_rss_feed,
        inputs=['rss_feed_for_transforming', 'params:rss_1'],
        outputs='rss_feed_for_loading',
        name="transform_rss_feed",
    ),
    node(
        func=load_rss_feed,
        inputs='rss_feed_for_loading',  # <- incoming data (in memory)
        outputs='rss_feed_load',  # <- calls the _save() of the class
        name="load_rss_feed",
    ),
])
nodes.py
If all the save logic is in the class, then there's nothing for the function to do... What am I missing here? What typically goes in a function whose output is a dataset?
def load_rss_feed(preprocessed_rss_feed):
    pass
When I try to run the pipeline, I get the following error:
DatasetError: Saving 'None' to a 'Dataset' is not allowed
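My current guess is that the node has to return the data itself, so that Kedro hands the return value to the catalog entry's _save(); a sketch of what I mean (not sure this is the intended pattern):

from typing import Any, Dict, List

def load_rss_feed(preprocessed_rss_feed: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Return the data unchanged; because this node's output is 'rss_feed_load',
    # the runner passes the return value to RSSFeedLoad._save().
    return preprocessed_rss_feed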
thanks for your thoughts!Rachid Cherqaoui
07/28/2023, 8:54 AM
_mysql: &mysql
  type: pandas.SQLQueryDataSet
  credentials:
    con: mysql+mysqlconnector://${mysql_connect.username}:${mysql_connect.password}@${mysql_connect.host}:${mysql_connect.port}/${mysql_connect.database}

table_insurers:
  <<: *mysql
  sql: select * from underwriter_insurers

table_ccns:
  <<: *mysql
  sql: select * from underwriter_ccns

table_departments:
  <<: *mysql
  sql: select * from underwriter_departments
and this is the error produced:
2023-07-26 09:33:48 - src.api.tarificateur_compte - ERROR - An error occurred in tarificateur_compte():
Traceback (most recent call last):
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1808, in _execute_context
context = constructor(
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1346, in _init_statement
self.cursor = self.create_cursor()
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1530, in create_cursor
return self.create_default_cursor()
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1533, in create_default_cursor
return self._dbapi_connection.cursor()
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1494, in cursor
return self.dbapi_connection.cursor(*args, **kwargs)
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/mysql/connector/connection_cext.py", line 678, in cursor
raise OperationalError("MySQL Connection not available.")
mysql.connector.errors.OperationalError: MySQL Connection not available.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/kedro/io/core.py", line 210, in load
return self._load()
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/kedro_datasets/pandas/sql_dataset.py", line 512, in _load
return pd.read_sql_query(con=engine, **load_args)
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/pandas/io/sql.py", line 467, in read_sql_query
return pandas_sql.read_query(
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/pandas/io/sql.py", line 1736, in read_query
result = self.execute(sql, params)
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/pandas/io/sql.py", line 1560, in execute
return self.con.exec_driver_sql(sql, *args)
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1772, in exec_driver_sql
ret = self._execute_context(
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1814, in _execute_context
self._handle_dbapi_exception(
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2326, in _handle_dbapi_exception
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1808, in _execute_context
context = constructor(
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1346, in _init_statement
self.cursor = self.create_cursor()
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1530, in create_cursor
return self.create_default_cursor()
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1533, in create_default_cursor
return self._dbapi_connection.cursor()
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1494, in cursor
return self.dbapi_connection.cursor(*args, **kwargs)
File "/home/debian/anaconda3/envs/env_tarificateur/lib/python3.10/site-packages/mysql/connector/connection_cext.py", line 678, in cursor
raise OperationalError("MySQL Connection not available.")
sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available.
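I was also planning to test the same connection string directly with SQLAlchemy to narrow it down, roughly like this (placeholder credentials; pool_pre_ping is just something I want to try, not something I know Kedro uses):

from sqlalchemy import create_engine, text

con = "mysql+mysqlconnector://user:password@host:3306/database"  # same string the catalog resolves to
engine = create_engine(con, pool_pre_ping=True)  # pre-ping discards dead connections before use
with engine.connect() as conn:
    print(conn.execute(text("select 1")).scalar())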
Can anyone help me fix this problem? I have tried everything I can think of but have not managed to solve it. Thank you in advance.Mate Scharnitzky
07/28/2023, 11:21 AM/notebook
directory. I'm trying to load some nodes that I created locally, but it doesn't find the path. Two questions:
• How can I load the Kedro context into this notebook?
• How can I load Python modules developed as part of the Kedro project?
Thank you!
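For reference, this is roughly what I tried from the docs, run from a notebook cell inside the project (the module path in the import is made up):

# in a notebook cell, from within the Kedro project
%load_ext kedro.ipython
%reload_kedro .

# after that, these should be available
catalog.list()
context.project_path

# and project code should import normally, e.g.
from my_project.pipelines.data_processing.nodes import my_node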
I looked into this but somehow I can't make it work:
https://docs.kedro.org/en/0.18.11/notebooks_and_ipython/kedro_and_notebooks.htmlHygor Xavier Araújo
07/28/2023, 5:59 PMJ. Camilo V. Tieck
07/28/2023, 8:28 PMdocker buildx build --platform=linux/amd64 -t <image-name> .
Is there a "kedro docker" way of doing this?
thanks!Erwin
07/29/2023, 2:36 AM# Enable automatic schema evolution
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")
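In case it is useful, the same toggle can also be set through the conf API instead of SQL (a sketch assuming a SparkSession is already available, e.g. from the project's Spark hooks):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")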
Daniel Lee
07/31/2023, 5:44 AMbrew install libomp
, I'm encountering an error where it says (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64'))
. Do you know how I can get around this issue if it's related to the architecture? And how is this normally resolved?LinenBot
07/31/2023, 9:04 AMtynan.debold
joined #questions.Baden Ashford
07/31/2023, 11:16 AMFazil B. Topal
07/31/2023, 11:46 AM