Ruben-S
# 11/29/2023, 2:00 PM
# PySpark snippet sent verbatim to the remote Livy session; it relies on the
# remote PySpark session providing a `spark` SparkSession.
# The former `.option("header", True)` was dropped: "header" is a CSV reader
# option and has no effect on the Parquet source. The former trailing `\` was
# also removed: inside a non-raw triple-quoted string Python consumes it as a
# line continuation before the code is ever sent.
code2 = """
df = spark.read.parquet("/data/sandbox/my_file.parquet")
df.printSchema()
df.show(10)
"""
# Assumes LIVY_URL and `auth` (e.g. an HTTPBasicAuth) are defined earlier.
# The `with` block hands cleanup to LivySession's context-manager exit
# (expected to close the remote session; confirm for the pylivy version used).
# The two statements below must be indented under `with`; the original paste
# had lost that indentation, which is an IndentationError.
with LivySession.create(LIVY_URL, kind=SessionKind.PYSPARK, auth=auth) as session:
    msg = session.run(code2)
    print(msg)
My `hooks.py`:
from kedro.framework.hooks import hook_impl
from kedro.config import ConfigLoader
from kedro.framework.project import settings
from kedro.framework.context import KedroContext
import os
from pathlib import Path
from livy import LivySession, SessionKind
import requests
from requests.auth import HTTPBasicAuth
# Livy endpoint exposed through the Knox gateway. The value previously kept the
# chat client's link markup ('<...>'), which is not part of the URL; requests
# cannot find a connection adapter for a URL starting with '<'.
LIVY_URL = 'https://knox.xxxx:8443/gateway/cdp-proxy-api/livy_for_spark3'
# CA bundle used to verify the gateway's TLS certificate.
ca_bundle: Path = Path("/etc/pki/tls/cert.pem")
# Point requests at that bundle through the environment. Note this mutates the
# process environment as a side effect of importing this module.
os.environ["REQUESTS_CA_BUNDLE"] = str(ca_bundle)
class LivyHooks:
def _get_credentials(self, key):
    """Return the credentials entry ``key`` from the project's config.

    Loads the ``credentials*`` files found under
    ``<self.project_path>/<settings.CONF_SOURCE>`` with Kedro's
    ``ConfigLoader`` in the ``local`` environment.

    Args:
        key: Top-level entry to read from the merged credentials config
            (for example ``"dev_azure"``).

    Returns:
        The value stored under ``key``.

    Raises:
        KeyError: If the loaded credentials have no entry named ``key``. The
            message names the key and the configuration folder searched.
    """
    # NOTE(review): self.project_path is not assigned anywhere in the visible
    # part of this class. Confirm it is set before this is called; otherwise
    # this line raises AttributeError.
    conf_path = str(Path(self.project_path) / settings.CONF_SOURCE)
    conf_loader = ConfigLoader(conf_source=conf_path, env="local")
    credentials = conf_loader.get("credentials*")
    try:
        return credentials[key]
    except KeyError:
        # Keep the KeyError type for existing callers, but make the message
        # say which entry is missing and where it was looked for.
        raise KeyError(
            f"No credentials entry {key!r} found in config under {conf_path!r}"
        ) from None
def _get_basic_auth(self):
    """Build HTTP basic-auth for the Livy endpoint.

    Reads the ``dev_azure`` entry through ``self._get_credentials`` and wraps
    its ``username`` and ``password`` fields.

    Returns:
        A ``requests.auth.HTTPBasicAuth`` for those credentials.
    """
    azure_creds = self._get_credentials("dev_azure")
    user, secret = azure_creds['username'], azure_creds['password']
    return HTTPBasicAuth(user, secret)
@hook_impl
def after_context_created(self, context) -> None:
"""Initialises a Livy Session using the config
defined in project's conf folder.
"""
# Load the spark configuration in spark.yaml using the config loader
# parameters = context.config_loader.get("spark*", "spark*/**")
# spark_conf = SparkConf().setAll(parameters.items())
auth = self._get_basic_auth()
# Initialise the livy session
session = LivySession.create(LIVY_URL, kind=SessionKind.PYSPARK, auth=auth)
session.wait()
id = session.session_id