Flavien
07/31/2023, 3:42 PM
ManagedTableDataset — which works great too — and run different independent pipelines defined on the same project, but I did not manage to do so.
I modified the databricks_run.py script to account for a --pipeline option, but I think the problem is in packaging the project, which does not take into account pipelines created through kedro pipeline create, if I am not mistaken (but I probably am). Would you point me towards my mistake?
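For reference, a rough sketch of how a --pipeline option can be wired into such an entry-point script (everything here apart from the --pipeline flag is an assumption, not the actual databricks_run.py): the key part is forwarding the value to session.run(pipeline_name=...), which can select any pipeline returned by register_pipelines(), including ones added with kedro pipeline create.

import argparse
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", default=None)
    parser.add_argument("--pipeline", default="__default__")  # new option
    args = parser.parse_args()

    bootstrap_project(Path.cwd())
    with KedroSession.create(env=args.env) as session:
        # pipeline_name selects one of the registered pipelines by name
        session.run(pipeline_name=args.pipeline)


if __name__ == "__main__":
    main()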
Thanks!

Jon Cohen
07/31/2023, 6:23 PM

Emilio Gagliardi
07/31/2023, 8:32 PM

meharji arumilli
08/01/2023, 8:46 AM
*** Found local files:
*** * /opt/airflow/logs/dag_id=test-fi/run_id=scheduled__2023-07-02T08:24:20.451204+00:00/task_id=preprocess/attempt=1.log
[2023-08-01, 08:24:21 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test-fi.preprocess scheduled__2023-07-02T08:24:20.451204+00:00 [queued]>
[2023-08-01, 08:24:21 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test-fi.preprocess scheduled__2023-07-02T08:24:20.451204+00:00 [queued]>
[2023-08-01, 08:24:21 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 2
[2023-08-01, 08:24:21 UTC] {taskinstance.py:1327} INFO - Executing <Task(KedroOperator): preprocess> on 2023-07-02 08:24:20.451204+00:00
[2023-08-01, 08:24:21 UTC] {standard_task_runner.py:57} INFO - Started process 114 to run task
[2023-08-01, 08:24:21 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test-fi', 'preprocess', 'scheduled__2023-07-02T08:24:20.451204+00:00', '--job-id', '486', '--raw', '--subdir', 'DAGS_FOLDER/test_fi_dag.py', '--cfg-path', '/tmp/tmpzsz4yrlp']
[2023-08-01, 08:24:21 UTC] {standard_task_runner.py:85} INFO - Job 486: Subtask preprocess
[2023-08-01, 08:24:21 UTC] {task_command.py:410} INFO - Running <TaskInstance: test-fi.preprocess scheduled__2023-07-02T08:24:20.451204+00:00 [running]> on host 829fb522c236
[2023-08-01, 08:24:21 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test-fi' AIRFLOW_CTX_TASK_ID='preprocess-rre' AIRFLOW_CTX_EXECUTION_DATE='2023-07-02T08:24:20.451204+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-02T08:24:20.451204+00:00'
[2023-08-01, 08:24:21 UTC] {test_fi_dag.py:61} INFO - Executing task preprocess, using model version: 20230801
[2023-08-01, 08:37:16 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
Can anyone make a configuration suggestion that could show the complete process log in the Airflow UI? Thanks!!
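Not a definitive fix, but one hedged configuration idea: Airflow's task log shows what reaches the task's stdout/stderr and its loggers, so attaching an extra stdout handler to the kedro logger (for example at the top of the generated KedroOperator's execute method) is one way to surface the full run output in the UI.

import logging
import sys

# assumption: this runs inside the Airflow task process, before the Kedro session starts
handler = logging.StreamHandler(sys.stdout)  # Airflow captures task stdout
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s"))
kedro_logger = logging.getLogger("kedro")
kedro_logger.addHandler(handler)
kedro_logger.setLevel(logging.INFO)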
Jordan Barlow
08/01/2023, 9:33 AM
Is it possible to point this at a .sql file?
shuttle_id_dataset:
  type: pandas.SQLQueryDataSet
  sql: data/path/to/query.sql
  credentials: db_credentials
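A hedged pointer rather than a definitive answer: recent versions of pandas.SQLQueryDataSet accept a filepath argument as an alternative to sql, which loads the query text from a .sql file (in YAML that would be filepath: data/path/to/query.sql instead of sql:). A quick way to check this against the installed version:

from kedro_datasets.pandas import SQLQueryDataSet

ds = SQLQueryDataSet(
    filepath="data/path/to/query.sql",            # query text read from the file
    credentials={"con": "sqlite:///example.db"},  # placeholder connection string
)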
Elena Mironova
08/01/2023, 1:24 PM
Since upgrading to kedro-datasets==1.5.0, our CI started failing during system tests which do a kedro run for a pipeline with Spark (see the screenshot). As far as I can see, SparkDataSet is still defined under the same name as before. When we used kedro-datasets==1.4.2 the same tests were running smoothly. I also couldn't find anything specific in the release notes. Do we have to update our code (maybe some import statements, or how it is specified within the requirements)?
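Not a diagnosis of the failure in the screenshot, but a quick isolation step: check that the class still imports and instantiates under kedro-datasets==1.5.0 outside the pipeline, which separates a packaging/optional-dependency problem from a catalog or pipeline problem.

from kedro_datasets.spark import SparkDataSet  # raises if the import path or extras changed

ds = SparkDataSet(filepath="data/01_raw/example.parquet", file_format="parquet")
print(type(ds))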
Erwin
08/01/2023, 7:42 PM
I get an error on kedro run, since _resolve_credentials fails [I don't have any credentials in my project]:
AttributeError: 'str' object has no attribute 'items'
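A guess rather than a confirmed cause: DataCatalog.from_config walks every catalog entry with .items() while resolving the credentials key, so a catalog entry whose value is a plain string (instead of a mapping) reproduces exactly this error even with an empty credentials.yml.

from kedro.io import DataCatalog

# a dataset entry that is a string, not a mapping, triggers the same AttributeError
DataCatalog.from_config({"my_dataset": "data/01_raw/file.csv"})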
meharji arumilli
08/02/2023, 9:12 AM
import os
from datetime import datetime
from kedro.config import TemplatedConfigLoader

class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        # generate a fresh model_version and expose env vars as template globals
        os.environ["model_version"] = datetime.now().strftime("%Y%m%d-%H%M%S")
        self.params = os.environ
        super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)

CONFIG_LOADER_CLASS = MyTemplatedConfigLoader
This generates a unique model_version when the project is run with kedro. This model_version is used in the filepaths in the catalog to save the outputs from the different nodes.
However, when this kedro project is packaged and run in Airflow, each node generates a new model_version, which causes the subsequent nodes to fail because they expect the output (a file path containing the model_version) of the previous node as input.
Can anyone working with kedro and Airflow offer a hack to keep the model_version consistent across all nodes or tasks in Airflow?
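One hedged workaround (a sketch, not the only option): generate the value only when it is not already set, so a single model_version provided from outside (for example derived once per Airflow DAG run from the run's logical date and exported to every task) is reused by all nodes, while a plain local kedro run still gets a fresh timestamp.

import os
from datetime import datetime
from kedro.config import TemplatedConfigLoader

class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, conf_source, env, runtime_params):
        # reuse an externally provided model_version if present; only generate
        # a new one when nothing was set (keeps local runs working as before)
        os.environ.setdefault("model_version", datetime.now().strftime("%Y%m%d-%H%M%S"))
        self.params = os.environ
        super().__init__(conf_source=conf_source, env=env, runtime_params=runtime_params, globals_dict=self.params)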
Fazil B. Topal
08/02/2023, 1:14 PM
• Will it be picked up by kedro run, or do I have to explicitly use this Python object and load the data on my own?
• Is it possible to define some sections in the yaml file and other parts in Python? I know I can do something in the hooks, but I wanted to check if there is a way where this catalog variable would be accessible by the user?
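On the second point, a minimal sketch (assuming a project-level hook; names are made up): the after_catalog_created hook receives the DataCatalog built from YAML, so extra entries can be added from Python while the rest stays in catalog.yml.

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog, MemoryDataSet

class ExtraCatalogEntriesHook:
    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        # "my_python_defined_dataset" is a made-up name for illustration
        catalog.add("my_python_defined_dataset", MemoryDataSet({"a": [1, 2]}))

# registered in settings.py via: HOOKS = (ExtraCatalogEntriesHook(),)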
Thanks in advance! 🙂

Trevor
08/02/2023, 5:15 PM

Trevor
08/02/2023, 5:35 PM
If I have a function date() that simply gets the datetime date, can I assign that date to a parameter?
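One hedged way to do it, assuming the project uses the OmegaConfigLoader (not the default loader in every 0.18.x setup): register an OmegaConf resolver in settings.py and reference it from parameters.yml, e.g. run_date: ${today:}. The resolver name "today" is made up.

from datetime import date
from omegaconf import OmegaConf

if not OmegaConf.has_resolver("today"):
    OmegaConf.register_new_resolver("today", lambda: date.today().isoformat())

With TemplatedConfigLoader, a globals_dict entry computed the same way plays a similar role.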
Fazil B. Topal
08/03/2023, 4:08 PM

Ankit Kansal
08/03/2023, 4:42 PM

Ankit Kansal
08/03/2023, 4:43 PM

Daniel Kirel
08/03/2023, 8:25 PM
kedro-mlflow?
2. Is there a good way to save input datasets without needing to create separate MLFlow artifact datasets and a node to read and save datasets?
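For point 2, one hedged pattern (not a kedro-mlflow feature as such): log selected inputs from a hook, so no extra catalog entries or nodes are needed. It assumes an active MLflow run (e.g. started by kedro-mlflow) and pandas-like data; the dataset names are placeholders.

import tempfile
from pathlib import Path

import mlflow
from kedro.framework.hooks import hook_impl

DATASETS_TO_LOG = {"model_input_table"}  # hypothetical dataset names

class LogInputsToMlflowHook:
    @hook_impl
    def after_dataset_loaded(self, dataset_name, data):
        if dataset_name in DATASETS_TO_LOG and mlflow.active_run():
            with tempfile.TemporaryDirectory() as tmp:
                path = Path(tmp) / f"{dataset_name}.csv"
                data.to_csv(path, index=False)  # assumes a pandas DataFrame
                mlflow.log_artifact(str(path), artifact_path="inputs")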
Appreciate any help/guidance on this 🙏

Sid Shetty
08/04/2023, 3:29 PM
cpa_llm.blocking_output@partitions:
  type: PartitionedDataSet
  path: data/cpa_llm/blocking_output
  overwrite: True
  filename_suffix: ".parquet"
  dataset:
    type: spark.SparkDataSet
    file_format: parquet
    save_args:
      mode: overwrite
When I read the same data as a Spark dataset I get the error AnalysisException: Unable to infer schema for Parquet. It must be specified manually., but when I read from one of the particular partitions it infers the schema. Was wondering if there is maybe a step I am missing here, or if you recommend some other data type over parquet to store the files.
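A guess at what is happening rather than a confirmed diagnosis: each partition is written by spark.SparkDataSet as its own directory (named <key>.parquet/), so the top-level folder contains nested Spark outputs rather than flat parquet files, and a plain read on the parent directory cannot infer a schema. Reading recursively (Spark >= 3.0) is one way to test that theory:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = (
    spark.read.option("recursiveFileLookup", "true")  # pick up parquet files in nested dirs
    .parquet("data/cpa_llm/blocking_output")
)
df.printSchema()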
Appreciate any help here 😄

Emilio Gagliardi
08/04/2023, 5:12 PM

Emilio Gagliardi
08/06/2023, 2:52 AM
kedro.contrib.io.azure.JSONBlobDataSet, which I can't find in the documentation for 0.18.12, but which is there for 0.15.6. Did something change in the way kedro organizes contrib.io? GPT-4 also said that the built-in kedro JSON dataset doesn't work on Azure. Any guidance is appreciated. Thanks kindly,
my_partitioned_dataset:
  type: kedro.io.PartitionedDataSet
  path: <your_blob_folder_path>
  credentials: azure_blob_storage
  dataset:
    type: kedro.contrib.io.azure.JSONBlobDataSet  # <- is this valid?
    container_name: <your_container_name>
    credentials: azure_blob_storage
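A hedged sketch of the modern equivalent: kedro.contrib.io was removed long before 0.18.x; datasets are now fsspec-based, so a JSON file on Azure Blob Storage can be addressed with an abfs:// path plus storage credentials (placeholders below), and the same entry can be used as the dataset inside a PartitionedDataSet.

from kedro_datasets.json import JSONDataSet  # kedro.extras.datasets.json on older setups

ds = JSONDataSet(
    filepath="abfs://my-container/path/to/file.json",   # placeholder path
    credentials={"account_name": "<storage_account>", "account_key": "<key>"},
)
data = ds.load()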
Jackson
08/07/2023, 3:17 AM
I have a dataset folder which stores my defined PyTorch Dataset class and another module called model; I will need to import the dataset and model classes into my kedro nodes. What are the best practices for storing these modules?
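Not an official rule, just a common convention: keep such modules inside the project's package under src/ (for example src/<package>/datasets/ and src/<package>/models/) so nodes import them like any other module of the installed package. The class below is a toy illustration of what might live in src/<package>/datasets/torch_dataset.py.

from torch.utils.data import Dataset

class MyTorchDataset(Dataset):
    """Toy custom dataset kept inside the project package."""

    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]

# a node module can then simply do:
#   from <package>.datasets.torch_dataset import MyTorchDataset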
Jackson
08/07/2023, 3:34 AM

Fazil B. Topal
08/07/2023, 9:45 AM
load method, I define outputs as None in the node. The question is: how can I create an ordered Pipeline in kedro? I'm willing to hack the Pipeline class a bit, but there is too much going on there, so I'm seeking some help here.
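A minimal sketch of the usual workaround (not a special Kedro feature): Kedro orders nodes purely by dataset dependencies, so instead of returning None each step can return a small placeholder output that the next step declares as an input; the names below are made up.

from kedro.pipeline import Pipeline, node

def step_one():
    ...            # side-effecting work
    return "done"  # placeholder output, used only for ordering

def step_two(_step_one_done):
    ...            # runs strictly after step_one because it consumes its output
    return "done"

ordered_pipeline = Pipeline(
    [
        node(step_one, inputs=None, outputs="step_one_done"),
        node(step_two, inputs="step_one_done", outputs="step_two_done"),
    ]
)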
thanks in advance! 🙂

Debanjan Banerjee
08/07/2023, 10:40 AM
versioned always points to a new version once writing the data, right? Can we ensure there is a prod version created that the rest of the datasets always read from in production, and that we can change it in params or somewhere when we want to?
For e.g., we can do this manually like so:
parameters.yml
run_date: &run_date 20230101
version: *run_date  # this can also be prod/dev/uat etc.
catalog.yml
weather:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather/${version}/file.csv
  file_format: csv
but this won't utilise the versioned: True feature. Is there any way we can achieve the above functionality from versioned? That would be much cleaner imo.
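On the read side specifically, a hedged alternative: versioned: true always writes a new timestamped version, but the version that is read can be pinned per dataset at run time, which gets close to a "prod pointer" without templating the filepath; the version string below is a placeholder.

# CLI form (flag is --load-version or --load-versions depending on the Kedro version):
#   kedro run --load-versions "weather:2023-01-01T00.00.00.000Z"
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())
with KedroSession.create() as session:
    session.run(load_versions={"weather": "2023-01-01T00.00.00.000Z"})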
Thomas Gölles
08/08/2023, 9:40 AM

Rosana EL-JURDI
08/08/2023, 9:50 AM

Rosana EL-JURDI
08/08/2023, 9:50 AM

Rosana EL-JURDI
08/08/2023, 9:51 AM

Rosana EL-JURDI
08/08/2023, 9:52 AM

Rosana EL-JURDI
08/08/2023, 9:53 AM

Rosana EL-JURDI
08/08/2023, 9:53 AM

Nok Lam Chan
08/08/2023, 10:40 AM
Hello everyone, I have a question regarding the usage of environments in combination with the OmegaConfigLoader.
I have a file called catalog_globals.yml in my base/ config folder, and also in my prod/ config folder. When I execute kedro run --env=prod, the settings from the file in base/ are still used.
cc @Gerrit Schoettler
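A small check rather than a confirmed answer (assuming the question is why values from base/ still appear): with environments, conf/<env>/ does not replace conf/base/ wholesale; both are loaded, keys defined in the run environment win, and keys that exist only in base/ are still used. catalog_globals.yml is picked up because it matches the default catalog* pattern. The merged result can be inspected directly:

from kedro.config import OmegaConfigLoader

loader = OmegaConfigLoader(conf_source="conf", env="prod")
print(loader["catalog"])  # merged view: base first, prod overrides on top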