Ruben-S
11/29/2023, 2:00 PM
code2 = """
df = spark.read.option("header",True) \
.parquet("/data/sandbox/my_file.parquet")
df.printSchema()
df.show(10)
"""
with LivySession.create(LIVY_URL, kind=SessionKind.PYSPARK, auth=auth) as session:
    msg = session.run(code2)
    print(msg)
my hooks.py:
from kedro.framework.hooks import hook_impl
from kedro.config import ConfigLoader
from kedro.framework.project import settings
from kedro.framework.context import KedroContext
import os
from pathlib import Path
from livy import LivySession, SessionKind
import requests
from requests.auth import HTTPBasicAuth
LIVY_URL = 'https://knox.xxxx:8443/gateway/cdp-proxy-api/livy_for_spark3'
ca_bundle: Path = Path("/etc/pki/tls/cert.pem")
os.environ["REQUESTS_CA_BUNDLE"] = str(ca_bundle)
class LivyHooks:
    def _get_credentials(self, key):
        conf_path = f"{self.project_path}/{settings.CONF_SOURCE}"
        conf_loader = ConfigLoader(conf_source=conf_path, env="local")
        return conf_loader.get("credentials*")[key]

    def _get_basic_auth(self):
        credentials = self._get_credentials("dev_azure")
        auth = HTTPBasicAuth(credentials['username'], credentials['password'])
        return auth

    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a Livy session using the config
        defined in the project's conf folder.
        """
        # Load the spark configuration in spark.yaml using the config loader
        # parameters = context.config_loader.get("spark*", "spark*/**")
        # spark_conf = SparkConf().setAll(parameters.items())
        auth = self._get_basic_auth()
        # Initialise the Livy session
        session = LivySession.create(LIVY_URL, kind=SessionKind.PYSPARK, auth=auth)
        session.wait()
        id = session.session_id
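Not part of the original message, but a possible follow-up sketched under assumptions: the session created above could be kept on the hook instance and shut down once the run finishes. This assumes pylivy's LivySession.close() (what its context manager calls on exit) and Kedro's after_pipeline_run hook spec; the _livy_session attribute name is made up.
```python
from kedro.framework.hooks import hook_impl
from livy import LivySession, SessionKind


class LivyHooks:
    # _get_credentials / _get_basic_auth as defined above

    @hook_impl
    def after_context_created(self, context) -> None:
        auth = self._get_basic_auth()
        # Keep a handle on the session so later hooks (or nodes) can reuse it.
        self._livy_session = LivySession.create(LIVY_URL, kind=SessionKind.PYSPARK, auth=auth)
        self._livy_session.wait()

    @hook_impl
    def after_pipeline_run(self) -> None:
        # Close the Livy session once the Kedro run is over.
        if getattr(self, "_livy_session", None) is not None:
            self._livy_session.close()
```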
Giuseppe Ughi
11/29/2023, 3:46 PM
after_context_created hook where I import the KedroContext. I'm modifying the KedroContext parameters property, but the changes are not reflected in the parameters. I've seen in the documentation that the "params" property is read-only; however, the config_loader property should also be read-only, yet the changes that I implement in the hook are propagated through the run. Any help is highly appreciated!
Thanks!!!
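For illustration only (the hook and parameter names here are hypothetical, and this is a sketch of one reading of the behaviour, not an official statement): KedroContext.params rebuilds its dictionary on every access, so in-place edits to it are lost, whereas context.config_loader returns the live config loader object, which would explain why changes to it do propagate.
```python
from kedro.framework.hooks import hook_impl


class MutateParamsHook:
    @hook_impl
    def after_context_created(self, context) -> None:
        # Edits a copy returned by the `params` property...
        context.params["my_param"] = 42
        # ...so the next access rebuilds the dict and the key is gone again.
        print("my_param" in context.params)  # False
```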
Sarthak Bhatt
11/29/2023, 4:19 PM
Elaine Resende
11/29/2023, 9:40 PM
OmegaConfigLoader and passing parameters through extra_params inside KedroSession.create gives me an error:
InterpolationKeyError: Interpolation key 'test_ids' not found
full_key: query_test_recommender_to_filter.sql
object_type=dict
The given extra_params are a set of strings, which I use in the catalog.yml to parameterize a SQL query.
Does anyone know how to help me? Thanks in advance
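For context, a minimal sketch of the setup being described; the parameter value and project path are hypothetical, and the catalog entry that interpolates test_ids isn't shown here:
```python
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# Hypothetical runtime parameters that the catalog's SQL query is meant to pick up.
extra_params = {"test_ids": "('id_1', 'id_2')"}

bootstrap_project(Path.cwd())
with KedroSession.create(project_path=Path.cwd(), extra_params=extra_params) as session:
    session.run()
```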
Lanchen ss
11/29/2023, 11:46 PM
Justin Mayer
11/30/2023, 6:05 AM
"{table}_ds":
  type: my_custom_ds
  site: "${globals:site}"
  file_id: "${globals:file_id[${table}]}"
and in globals.yml
file_id:
  users: ""
  items: ""
site: ""
Thanks!
tom kurian
11/30/2023, 11:08 AM
```
weather_cleaned:
  type: spark.SparkDataSet
  filepath: data/02_intermediate/data.parquet
  file_format: parquet
```
Is it possible to pass multiple filepaths in the filepath argument?
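Not a multiple-filepath feature as far as I know, but a common workaround sketch (the path below is hypothetical): Spark's parquet reader accepts a directory, so a single filepath pointing at a folder of part files effectively loads many files at once.
```python
from kedro_datasets.spark import SparkDataSet

# Equivalent to a catalog entry whose filepath is a directory rather than a single file.
weather_cleaned = SparkDataSet(
    filepath="data/02_intermediate/weather/",  # folder containing many *.parquet parts
    file_format="parquet",
)
df = weather_cleaned.load()
```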
Chandan Malla
11/30/2023, 11:11 AM
"{NAMESPACE}.ballbyball_final":
  type: pandas.SQLTableDataSet
  table_name: MODEL_{NAMESPACE}_BALLBYBALL_V2
  credentials: db2
  save_args:
    if_exists: replace
    chunksize: 10000
pipelines.py:
node(
    func=total_balls_done,
    inputs=["ballbyball_final_1", "params:MIN_TOTAL_BALLS_MATCH"],
    outputs="ballbyball_final",  # ---> Data is saved to DB over here
    name="total_balls_done",
    tags="ballbyball_preprocessing",
),
node(
    func=lambda x: x,
    inputs="ballbyball_final",  # ----> Data is loaded from SQLTableDataSet instead of MemoryDataSet
    outputs="cache_ballbyball_final",
    name="cache_ballbyball_final",
    tags="ballbyball_preprocessing",
),
This is how it looks when the pipeline is running:
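A guess at the mechanism, sketched under the assumption (not shown in the snippet) that the nodes above live in a namespaced modular pipeline: the namespace prefixes the free dataset names, so "ballbyball_final" becomes "<namespace>.ballbyball_final", matches the "{NAMESPACE}.ballbyball_final" factory pattern, and is therefore written to and re-loaded from the SQL table instead of staying a MemoryDataSet.
```python
from kedro.pipeline import node, pipeline


def total_balls_done(df, min_total_balls):  # stand-in for the real node function
    return df


base = pipeline(
    [
        node(total_balls_done, ["ballbyball_final_1", "params:MIN_TOTAL_BALLS_MATCH"],
             "ballbyball_final", name="total_balls_done"),
        node(lambda x: x, "ballbyball_final", "cache_ballbyball_final",
             name="cache_ballbyball_final"),
    ]
)

# Hypothetical namespace value: once wrapped like this, the free dataset names resolve to
# "ipl.ballbyball_final" / "ipl.cache_ballbyball_final", and the first one is caught by the
# "{NAMESPACE}.ballbyball_final" catalog factory, hence persisted via SQLTableDataSet.
namespaced = pipeline(base, namespace="ipl")
```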
Chandan Malla
11/30/2023, 11:12 AM
puneet makhija
11/30/2023, 1:03 PM
Ana Paula Rojas
11/30/2023, 4:44 PM
Ana Paula Rojas
11/30/2023, 7:29 PM
IceAsher Chew
12/01/2023, 5:55 AM
FlorianGD
12/01/2023, 10:28 AM
memory_profiler, and there is a huge memory usage before the first node runs. This is critical for me because I am running it on kubernetes pods, and if I use too much memory, the pod gets killed. Attached is the profile of a dummy pipeline that I created that prints a parameter and does nothing else. We can see that it uses nearly 200MiB of memory.
I am not sure what I should look for to improve this memory usage. Is it the catalog? If so, how can I reduce it? Is it something else from kedro? Is it outside of kedro?
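One way to see how much of that footprint comes from Python allocations during the run itself, as opposed to import-time overhead, is to wrap a programmatic run with the stdlib tracemalloc; a minimal sketch (nothing Kedro-specific assumed beyond the default pipeline):
```python
import tracemalloc
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# Start tracing after imports, so only allocations made from here on are counted.
tracemalloc.start()

bootstrap_project(Path.cwd())
with KedroSession.create(project_path=Path.cwd()) as session:
    session.run()  # default pipeline assumed

current, peak = tracemalloc.get_traced_memory()
print(f"peak traced Python allocations: {peak / 1024 ** 2:.1f} MiB")
```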
Juan Luis
12/01/2023, 5:56 PM
dataset:
  filepath: ${custom_kedro_resolver_magic:project_root}/data/01_raw/iris.csv
? has anybody come up with a custom resolver that can do this dynamically, rather than having to, say, hardcode it in globals.yml?
Takieddine Kadiri
12/01/2023, 7:32 PM
package_name by getting its path with something like
import package_name
os.path.dirname(os.path.dirname(package_name.__file__))
Since package_name is included in the pythonpath when kedro run is executed, this will dynamically return the package (or project) path location
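Putting those two ideas together, a sketch of what the resolver registration could look like in settings.py; this assumes a Kedro version whose OmegaConfigLoader accepts custom_resolvers through CONFIG_LOADER_ARGS, and package_name stands in for the real project package:
```python
# settings.py (sketch)
from pathlib import Path

from kedro.config import OmegaConfigLoader

import package_name  # the project package, importable because src/ is on the path


def _project_root() -> str:
    # src/package_name/__init__.py -> parents[2] is the project root (one level above src/)
    return str(Path(package_name.__file__).resolve().parents[2])


CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        # Matches the ${custom_kedro_resolver_magic:project_root} usage shown above.
        "custom_kedro_resolver_magic": lambda key: {"project_root": _project_root()}[key],
    }
}
```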
Piotr Grabowski
12/04/2023, 12:09 PM
Hadeel Mustafa
12/04/2023, 7:23 PM
Mouna Balghouthi
12/05/2023, 9:32 AM
src
│ ...
└───<package_name>
│ └───pipelines
│ └───data_engineering
| └───nodes
│ │ node_X.py
└───tests
│ └───pipelines
│ └───data_engineering
| └───nodes
│ │ test_node_X.py
In my test file, I have to add the first 2 lines, otherwise I get an import error:
# Uncommenting the following 2 lines solves the error
# import sys
# sys.path.append("path_to/src")
import pytest
from <package_name>.pipelines.data_engineering.nodes.node_X import (
    func_X,
)
Is there a cleaner way than having to append the path for each test file?
ERROR Message:
==================================== ERRORS ======================================================================================
________________________________________________ ERROR collecting src/tests/pipelines/data_engineering/nodes/test_node_X.py ________________________________________________
ImportError while importing test module 'path_to/src/tests/pipelines/data_engineering/nodes/test_node_X.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/home/ec2-user/miniconda3/envs/python310/lib/python3.10/importlib/__init__.py:126: in import_module
return _bootstrap._gcd_import(name[level:], package, level)
src/tests/pipelines/data_engineering/nodes/test_node_X.py:7: in <module>
from <package_name>.pipelines.data_engineering.nodes.node_X import (
E ModuleNotFoundError: No module named '<package_name>'
Thank you.
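One common way to avoid repeating the sys.path tweak is to do it once in a conftest.py at the top of the tests tree (a sketch; pytest >= 7 also offers a pythonpath ini option in pyproject.toml/pytest.ini that achieves the same thing):
```python
# src/tests/conftest.py (sketch)
import sys
from pathlib import Path

SRC_DIR = Path(__file__).resolve().parents[1]  # .../src
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))
```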
Pedro Sousa Silva
12/05/2023, 11:32 AM
Lukas Innig
12/05/2023, 3:58 PM
Namit Ohri
12/06/2023, 11:17 AM
tom kurian
12/06/2023, 2:07 PM
VersionNotFoundError: Did not find any versions for SparkDataset(file_format=parquet, filepath=s3://bucket/folder/file_name,
load_args={'header': True}, save_args={'mode': overwrite}, version=Version(load=None, save='2023-12-06T13.06.41.920Z'))
config file:
_pq: &_pq
  type: spark.SparkDataSet
  file_format: parquet
  versioned: True
  load_args:
    header: True
  save_args:
    mode: overwrite

model_input.narrow_master.narrow_master:
  filepath: ${base}/model_input/master
  <<: *_pq
Kedro versions:
kedro 0.18.14
kedro-datasets 1.8.0
What am I doing wrong here?
Gleydson Silva
12/06/2023, 8:37 PM
Jose Luis Lavado Sánchez
12/07/2023, 4:53 PM
_query (pattern "{name}_query") to be saved as text with text.TextDataSet at data/{name}_query.txt; the code on the catalog is:
"{name}_query":
type: text.TextDataSet
filepath: data/{name}_query.txt
and on the pipeline
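For completeness, a sketch of what the pipeline side could look like (the function, parameter and dataset names are hypothetical, since the original pipeline snippet is cut off): any node output ending in _query matches the "{name}_query" factory above and is written to data/<name>_query.txt.
```python
from kedro.pipeline import node, pipeline


def build_customers_query(table_name: str) -> str:
    # The returned string is what gets saved to data/customers_query.txt
    return f"SELECT * FROM {table_name}"


query_pipeline = pipeline(
    [
        node(build_customers_query, inputs="params:customers_table", outputs="customers_query"),
    ]
)
```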
Ana Paula Rojas
12/07/2023, 8:50 PM
Galen Seilis
12/07/2023, 9:38 PM
import subprocess
PYPY_PATH = ...
SCRIPT_PATH = ...

def run_pypy_script(input_data):
    write_to_temp(input_data)
    pypy_cmd = [PYPY_PATH, SCRIPT_PATH]
    subprocess.run(pypy_cmd, check=True)
    return load_results()
Any thoughts on this approach, or a better approach?
https://docs.python.org/3/library/subprocess.html#subprocess.run
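An alternative sketch that skips the temp files by piping data through stdin/stdout; it assumes the PyPy script reads JSON from stdin and writes JSON to stdout, and the two paths are placeholders just like above:
```python
import json
import subprocess

PYPY_PATH = "pypy3"        # placeholder, as above
SCRIPT_PATH = "script.py"  # placeholder, as above


def run_pypy_script(input_data):
    # Pass the input as JSON on stdin and read the results back from stdout.
    result = subprocess.run(
        [PYPY_PATH, SCRIPT_PATH],
        input=json.dumps(input_data),
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(result.stdout)
```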
Lukas Innig
12/08/2023, 2:46 PM
meharji arumilli
12/08/2023, 3:25 PM
version: 1
disable_existing_loggers: False

formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout

  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: info.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  error_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: simple
    filename: errors.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  rich:
    class: kedro.logging.RichHandler
    rich_tracebacks: True
    # Advanced options for customisation.
    # See https://docs.kedro.org/en/stable/logging/logging.html#project-side-logging-configuration
    # tracebacks_show_locals: False

loggers:
  kedro:
    level: INFO
  epc_fi:
    level: INFO

root:
  handlers: [console, info_file_handler, error_file_handler]
Can someone help to make the traceback clickable?
Ana Paula Rojas
12/10/2023, 2:56 AM