Mark Pinches
12/21/2022, 2:46 PM
Mark Pinches
12/21/2022, 2:46 PM
Jordan
12/21/2022, 9:38 PM
Was kedro build-reqs changed? It used to build a requirements.txt file from a requirements.in file, and now it builds a requirements.lock file from a requirements.txt file.
Vladimir Filimonov
12/22/2022, 8:23 AM
$(plugin) is never defined, nor did I find any instructions in the repo on defining it prior to running make.
Slackbot
12/22/2022, 9:00 AM
Eugene P
12/22/2022, 2:52 PM
pandas.SQLQueryDataSet. One for each query.
3. I have a generic node function to call the SQL query, returning an empty df like this:
def run_sql_script_node(sql_query_dataset: pd.DataFrame,
                        blank_df_for_nodes_order: pd.DataFrame) -> pd.DataFrame:
    # Loading sql_query_dataset executes the query; the blank df input
    # exists only to enforce node ordering.
    return pd.DataFrame()
4. I define the required nodes, controlling the execution order by chaining consecutive empty-df outputs/inputs:
node(
    func=run_sql_script_node,
    inputs=["create_rropen_cadcost_schema_and_tables_dataset", "empty_cadcost_df0"],
    outputs="empty_cadcost_df1",
    name="create_rropen_cadcost_schema_and_tables_node",
),
node(
    func=run_sql_script_node,
    inputs=["create_rropen_cadcost_staging_table_dataset", "empty_cadcost_df1"],
    outputs="empty_cadcost_df2",
    name="create_rropen_cadcost_staging_table_dataset_node",
),
I do understand that Kedro may not be the 100% appropriate tool to control SQL workflows, but for the sake of total DS pipeline integrity, and for my Kedro learning, I would like to stick with it (it is amazing, btw!).
This workaround works correctly, but I was thinking this approach could be simplified further. Maybe there is a way to execute SQL queries in a particular order without creating catalog entries for the datasets, for example?
Thanks in advance for critique and suggestions!
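One possible simplification, offered as a sketch rather than a tested answer: the chaining dataframes are plain MemoryDataSets, so they need no catalog entries at all (only the SQLQueryDataSet entries do), and the chain itself can be generated in a loop instead of hand-writing one node per query. The dataset names come from the snippet above; the helper name is made up.
from kedro.pipeline import node, pipeline

def make_sql_chain(sql_datasets):
    # Chain each SQL dataset to the next through dummy empty-df outputs,
    # so the queries run strictly in list order.
    nodes = []
    for i, dataset in enumerate(sql_datasets):
        nodes.append(
            node(
                func=run_sql_script_node,
                inputs=[dataset, f"empty_cadcost_df{i}"],
                outputs=f"empty_cadcost_df{i + 1}",
                name=f"{dataset}_node",
            )
        )
    return pipeline(nodes)

sql_pipeline = make_sql_chain([
    "create_rropen_cadcost_schema_and_tables_dataset",
    "create_rropen_cadcost_staging_table_dataset",
])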
Olivier Ho
12/22/2022, 3:33 PM
Mohammed Samir
12/22/2022, 3:36 PM
train_model_sagemaker?
Brandon Meek
12/22/2022, 10:20 PM
features:
  numeric:
    x: "x"
  categorical:
    y:
      col: "y"
      dropna: True
    z:
      col: "z"
      dropna: True
    i:
      col: "i"
      dropna: False
    j:
      col: "j"
      dropna: False
but when I try to freeze the parameter:
ingestion_pipeline = pipeline(
    pipe=ingestion_pipe,
    inputs={"a", "b", "c", "d"},
    parameters="features",
    namespace="ingestion",
)
I get: Failed to map datasets and/or parameters: params:features
When I namespace features it works. Am I doing something wrong? I'm using kedro 0.18.3 with Spark.
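One thing that might be worth checking, as a sketch rather than a confirmed fix: the modular-pipeline docs pass parameters as an explicit mapping, so spelling the mapping out (the params: prefix in the keys is optional) may behave differently from the bare string form:
ingestion_pipeline = pipeline(
    pipe=ingestion_pipe,
    inputs={"a", "b", "c", "d"},
    # Explicitly map the outer params:features to the name the inner
    # pipeline consumes, instead of using the string shorthand.
    parameters={"params:features": "params:features"},
    namespace="ingestion",
)
If the inner pipeline only ever sees the namespaced name, that would also explain why namespacing features works.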
Suryansh Soni
12/23/2022, 4:14 PM
Rob
12/26/2022, 4:21 PM
pyspark-iris starter. So I already set up Spark 3.0 on my Windows machine and it's working, but I'm getting this `DataSetError`:
DataSetError: Failed while saving data to data set
SparkDataSet(file_format=parquet,
filepath=C:/Users/rober/PycharmProjects/pyspark-test/data/02_intermediate/X_train.parquet, load_args={'header': True, 'inferSchema': True},
save_args={'header': True, 'mode': overwrite}).
An error occurred while calling o60.save.
So I already checked the copy_mode of the MemoryDataSet conf inside the catalog.yml, and it's set to assign; since there are no actions executed in the previous node, I guess that's the only saving mode. It's probably something simple, but if someone can help me, I'd appreciate it.
Elior Cohen
12/27/2022, 7:28 AM
A, which does some work and then, depending on how much data it produced, can create multiple parallel executions of B, where each B_i executes the same logic on a subset of the data produced by A. Then, if any data point in B has errors, it goes to C, but the data points that are good go to D.
meharji arumilli
12/27/2022, 1:42 PM
For non-Spark objects I used to save/read from the catalog as:
lightgbm_model:
  type: pickle.PickleDataSet
  filepath: s3://bucket/data/lightgbm_model.pkl
  backend: pickle
How can I save the model if 'lightgbm_model' comes from a Spark pipeline?
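A fitted Spark ML model can't be pickled like the LightGBM one, but it ships its own writer, so one option is a small custom dataset. A minimal sketch, assuming a pyspark.ml PipelineModel; the class name and module path are made up, not an existing kedro-datasets API:
from kedro.io import AbstractDataSet
from pyspark.ml import PipelineModel

class SparkMLModelDataSet(AbstractDataSet):
    """Saves/loads a fitted pyspark.ml PipelineModel via its native writer."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self) -> PipelineModel:
        return PipelineModel.load(self._filepath)

    def _save(self, model: PipelineModel) -> None:
        # Spark writes the model as a directory of metadata/parquet files,
        # so the path must be one the cluster can reach (e.g. s3a://...).
        model.write().overwrite().save(self._filepath)

    def _describe(self) -> dict:
        return {"filepath": self._filepath}
The catalog entry would then set type to wherever the class lives in your project, e.g. my_project.extras.datasets.SparkMLModelDataSet (hypothetical path).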
Manilson António Lussati
12/27/2022, 6:48 PM
Pawel Granat
12/28/2022, 4:47 PM
[2022-12-18 14:08:46,846] {ssh.py:476} INFO - 2022-12-18 14:08:46,845 - kedro.pipeline.node - INFO - Running node: test_node_1: <lambda>([test_1.fake_name.test_data_predictions,params:test_1.predictive_modeling.fake_name.target]) -> [test_1.fake_name.labels,test_1.fake_name.score]
[2022-12-18 14:08:46,846] {ssh.py:476} INFO - 2022-12-18 14:08:46,845 - multi_runner.safeguards - ERROR - Node test_node_1, in the "test_1" run failed with the exception:
'AttributeError' object is not subscriptable
Traceback (most recent call last):
[..]
And further on in the same log:
[2022-12-18 14:08:47,520] {ssh.py:476} INFO - 2022-12-18 14:08:47,510 - multi_runner.safeguards - WARNING - Node fake_name_post_modelling_analysis, in the "test_1"run is skipped due to an upstream error
[2022-12-18 14:08:47,672] {ssh.py:476} INFO - 2022-12-18 14:08:47,671 - kedro.runner.sequential_runner - INFO - Completed 48 out of 48 tasks
[2022-12-18 14:08:47,673] {ssh.py:476} INFO - 2022-12-18 14:08:47,671 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
[2022-12-18 14:08:47,674] {ssh.py:476} INFO - 2022-12-18 14:08:47,672 - proj.hooks.project_hooks - INFO - fake_name pipeline execution completed successfully.
[2022-12-18 14:08:50,263] {taskinstance.py:859} DEBUG - Refreshing TaskInstance <TaskInstance: test_dag.fake_name manual__2022-12-17T16:06:07.524573+00:00 [running]> from DB
[2022-12-18 14:08:50,282] {base_job.py:226} DEBUG - [heartbeat]
[2022-12-18 14:08:51,673] {channel.py:1212} DEBUG - [chan 0] EOF received (0)
[2022-12-18 14:08:51,711] {__init__.py:107} DEBUG - Lineage called with inlets: [], outlets: []
[2022-12-18 14:08:51,711] {taskinstance.py:859} DEBUG - Refreshing TaskInstance <TaskInstance: test_dag.fake_name manual__2022-12-17T16:06:07.524573+00:00 [running]> from DB
[2022-12-18 14:08:51,734] {taskinstance.py:1406} DEBUG - Clearing next_method and next_kwargs.
[2022-12-18 14:08:51,734] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=fake_name, execution_date=20221217T160607, start_date=20221218T140430, end_date=20221218T140851
[2022-12-18 14:08:51,735] {taskinstance.py:2336} DEBUG - Task Duration set to 261.096866
[2022-12-18 14:08:51,751] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2022-12-18 14:08:51,822] {local_task_job.py:156} INFO - Task exited with return code 0
As you can see:
fake_name pipeline execution completed successfully.
Run command:
kedro run --pipeline fake_name
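If the goal is for the Airflow task to fail when a node errors: the log suggests multi_runner.safeguards catches the exception and merely skips downstream nodes, so the runner still reports success. A sketch of one way to surface it, assuming the failure is reported through Kedro's hook mechanism rather than swallowed inside the node function itself (registered via HOOKS in settings.py):
from kedro.framework.hooks import hook_impl

class FailFastHook:
    """Collects node errors and re-raises after the run, so a partially
    failed run is never reported as successful."""

    def __init__(self):
        self._errors = []

    @hook_impl
    def on_node_error(self, error, node):
        # Called by Kedro whenever a node raises; record instead of ignore.
        self._errors.append((node.name, error))

    @hook_impl
    def after_pipeline_run(self):
        if self._errors:
            failed = ", ".join(name for name, _ in self._errors)
            raise RuntimeError(f"Nodes failed: {failed}")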
Great hearing from you and all the best,
Pawel
meharji arumilli
12/28/2022, 7:17 PM
meharji arumilli
12/28/2022, 7:17 PM
preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
meharji arumilli
12/28/2022, 7:18 PM
raise DataSetError(message) from exc
kedro.io.core.DataSetError: Failed while saving data to data set SparkDataSet(file_format=parquet, filepath=/Users/data/rre/05_model_input/df_preprocessed.parquet, load_args={}, save_args={}).
An error occurred while calling o727.save.
meharji arumilli
12/28/2022, 7:19 PM
Sebastian Cardona Lozano
12/29/2022, 2:20 PM
Sebastian Cardona Lozano
12/29/2022, 2:26 PM
1. When I run kedro info in the CLI, the following warning appears:
[12/29/22 14:22:07] WARNING /opt/conda/lib/python3.7/site-packages/plotly/graph_objects/__init__.py:288: warnings.py:110
DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
if LooseVersion(ipywidgets.__version__) >= LooseVersion("7.0.0"):
2. Nor can I use kedro ipython in the CLI:
[12/29/22 14:24:12] INFO Resolved project path as: /home/jupyter/bm-598-onboarding. __init__.py:135
To set a different path, run '%reload_kedro <project_root>'
[TerminalIPythonApp] WARNING | Error in loading extension: kedro.ipython
Check your config files in /home/jupyter/.ipython/profile_default
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/IPython/core/shellapp.py", line 301, in init_extensions
self.shell.extension_manager.load_extension(ext)
File "/opt/conda/lib/python3.7/site-packages/IPython/core/extensions.py", line 87, in load_extension
if self._call_load_ipython_extension(mod):
File "/opt/conda/lib/python3.7/site-packages/IPython/core/extensions.py", line 134, in _call_load_ipython_extension
mod.load_ipython_extension(self.shell)
File "/opt/conda/lib/python3.7/site-packages/kedro/ipython/__init__.py", line 40, in load_ipython_extension
reload_kedro()
File "/opt/conda/lib/python3.7/site-packages/kedro/ipython/__init__.py", line 89, in reload_kedro
context = session.load_context()
File "/opt/conda/lib/python3.7/site-packages/kedro/framework/session/session.py", line 259, in load_context
context=context
File "/opt/conda/lib/python3.7/site-packages/pluggy/_hooks.py", line 265, in __call__
return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
File "/opt/conda/lib/python3.7/site-packages/pluggy/_manager.py", line 80, in _hookexec
return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
File "/opt/conda/lib/python3.7/site-packages/pluggy/_callers.py", line 60, in _multicall
return outcome.get_result()
File "/opt/conda/lib/python3.7/site-packages/pluggy/_result.py", line 60, in get_result
raise ex[1].with_traceback(ex[2])
File "/opt/conda/lib/python3.7/site-packages/pluggy/_callers.py", line 39, in _multicall
res = hook_impl.function(*args)
File "/opt/conda/lib/python3.7/site-packages/kedro_telemetry/plugin.py", line 120, in after_context_created
catalog = context.catalog
File "/opt/conda/lib/python3.7/site-packages/kedro/framework/context/context.py", line 232, in catalog
return self._get_catalog()
File "/opt/conda/lib/python3.7/site-packages/kedro/framework/context/context.py", line 287, in _get_catalog
save_version=save_version,
File "/opt/conda/lib/python3.7/site-packages/kedro/io/data_catalog.py", line 272, in from_config
ds_layer = ds_config.pop("layer", None)
AttributeError: 'str' object has no attribute 'pop'
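The final AttributeError means DataCatalog.from_config received a catalog entry whose value parsed as a plain string instead of a mapping, so calling .pop() on it fails. A sketch with hypothetical entry names showing the difference:
# Parses as a string -> 'str' object has no attribute 'pop':
bad_dataset: pandas.CSVDataSet

# Parses as the mapping the loader expects:
good_dataset:
  type: pandas.CSVDataSet
  filepath: data/01_raw/example.csv
Scanning conf/*/catalog.yml for an entry collapsed into the first form may be a good starting point.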
Sebastian Cardona Lozano
12/29/2022, 2:29 PM
meharji arumilli
12/29/2022, 11:21 PM
meharji arumilli
12/29/2022, 11:23 PM
feature_engineering:
  type: MemoryDataSet
  copy_mode: assign
preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
And this raises the error:
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o742.save.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
Can someone hint at how to fix this issue?
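A possible direction, as a sketch rather than a confirmed fix: PathOutputCommitProtocol lives in Spark's optional spark-hadoop-cloud module, so this ClassNotFoundException usually means a committer setting references it without that module on the classpath. Assuming the session is configured through the starter's conf/base/spark.yml, either remove the committer-related settings or add the artifact; the Scala/Spark versions below are placeholders that must match your installation:
spark.jars.packages: org.apache.spark:spark-hadoop-cloud_2.12:3.3.0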
Sebastian Cardona Lozano
12/30/2022, 2:37 PM
user
01/03/2023, 10:48 AM
https://i.stack.imgur.com/ImXYi.png
Jo Stichbury
01/03/2023, 2:55 PM
def compare_passenger_capacity_go(preprocessed_shuttles: pd.DataFrame):
    data_frame = preprocessed_shuttles.groupby(["shuttle_type"]).mean().reset_index()
    fig = go.Figure(
        [
            go.Bar(
                x=data_frame["shuttle_type"],
                y=data_frame["passenger_capacity"],
            )
        ]
    )
    return fig
However, the code for Plotly Express isn't working in a kedro run.
def compare_passenger_capacity_exp(preprocessed_shuttles: pd.DataFrame):
    fig = px.bar(
        data_frame=preprocessed_shuttles.groupby(["shuttle_type"]).mean().reset_index(),
        x="shuttle_type",
        y="passenger_capacity",
    )
    return fig
The error returned is:
PlotlyDataSet(filepath=/Users/jo_stichbury/Documents/GitHub/stichbury/kedro-projects/kedro-tutorial/data/08_reporting/shuttle_passenger_capacity_plot_exp.json, load_args={},
plotly_args={'fig': {'orientation': h, 'x': shuttle_type, 'y': passenger_capacity}, 'layout': {'title': Shuttle Passenger capacity, 'xaxis_title': Shuttles, 'yaxis_title': Average
passenger capacity}, 'type': bar}, protocol=file, save_args={}, version=Version(load=None, save='2023-01-03T14.43.36.537Z')).
Value of 'x' is not the name of a column in 'data_frame'. Expected one of [0] but received: shuttle_type
Before the holiday, I did a fair amount of trial and error to rewrite the function based on various Stack Overflow searches, but I couldn't find a way to fix it.
🚨 Please could I get some help from anyone who knows this code (maybe @Rashida Kanchwala?) or anyone who is familiar with Plotly to get the compare_passenger_capacity_exp method working? 🚨
My example is here, so I hope it's just a matter of taking it and revising the method in the nodes.py file for the reporting pipeline. I should point out that it doesn't currently work on 0.18.4 (see this issue), so it's necessary to test against 0.18.3 (using the 'old' dataset notation) for now. Everything in my example is working apart from this node.
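A hunch about the failing node, based on how the two Plotly datasets differ: plotly.PlotlyDataSet rebuilds the Express figure itself from the plotly_args in the catalog entry, so it expects the node to return the DataFrame, not a Figure (returning a ready-made figure is what plotly.JSONDataSet is for). A sketch of the node under that assumption:
def compare_passenger_capacity_exp(preprocessed_shuttles: pd.DataFrame) -> pd.DataFrame:
    # PlotlyDataSet applies px.bar(x="shuttle_type", y="passenger_capacity")
    # from plotly_args at save time, so only the grouped data is returned.
    return preprocessed_shuttles.groupby(["shuttle_type"]).mean().reset_index()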
Sasha Collin
01/03/2023, 9:17 PM
node(func=func, inputs="partitioned_dataset_name:dataset_name", ...)
thanks!
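As far as I know there's no inputs syntax for addressing a single partition, so the usual pattern (a sketch; the partition id is a made-up example) is to take the whole PartitionedDataSet, which Kedro passes to the node as a dict of partition ids to load callables, and pick inside the node:
def func(partitions: dict):
    # PartitionedDataSet loads as {partition_id: load_function}; calling
    # one function materialises just that partition.
    load_partition = partitions["dataset_name"]
    return load_partition()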
tingting wan
01/04/2023, 4:25 PM
user
01/04/2023, 5:28 PM