#questions

Ruben-S

11/29/2023, 2:00 PM
Hi, I'm trying to find a way to use a Cloudera Spark cluster from a Kedro project. The limitation is that the Spark cluster can only be accessed via Livy (https://livy.apache.org/), and its HDFS file system is exposed via WebHDFS. The Spark context is therefore long-lived, does not need to be set up, and is not directly accessible from the outside world; that is, I cannot call SparkSession.builder.getOrCreate() from inside the Kedro project. See the diagram I attached.

So from the Kedro project I need to open a Livy session and then, from the nodes, send commands over that session. The idea is that the data already lives on the cluster (in HDFS) and never needs to leave it until the very end, when the final result can be "downloaded" over WebHDFS.

I think I need to create this Livy session in hooks.py (analogous to the Spark example); my code is below. However, I don't see how I can then access this Livy session from the nodes. I also don't (yet) see how the catalog can contain webhdfs:// references if that data is never actually read inside the Kedro project.

Any help is much appreciated.
br, Ruben

Example Livy code (which works):
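from livy import LivySession, SessionKind

# PySpark code that is sent to Livy and executed on the cluster
code2 = """
df = (spark.read.option("header", True)
      .parquet("/data/sandbox/my_file.parquet"))
df.printSchema()
df.show(10)
"""

# LIVY_URL and auth are defined as in hooks.py below
with LivySession.create(LIVY_URL, kind=SessionKind.PYSPARK, auth=auth) as session:
    msg = session.run(code2)  # output of the statement run in the remote session
    print(msg)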
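One possible way to make the session reachable from nodes is to keep it in a process-wide holder that the hook fills in and the nodes read from. This is a minimal sketch under stated assumptions: the session is any pylivy-style object with a `run(code)` method, `LivySessionHolder`, `build_copy_snippet` and `copy_on_cluster` are hypothetical names, and the HDFS locations are passed as plain path strings (for example from parameters.yml) rather than as catalog datasets, so only paths flow through Kedro while the data stays on the cluster.

```python
from typing import Any, Optional, Protocol


class RemoteSession(Protocol):
    """Structural type for a pylivy-style session (assumption): anything with run(code)."""

    def run(self, code: str) -> Any:
        ...


class LivySessionHolder:
    """Hypothetical process-wide holder for one long-lived Livy session.

    A hook registers the session once; nodes fetch it with get().
    """

    _session: Optional[RemoteSession] = None

    @classmethod
    def register(cls, session: RemoteSession) -> None:
        cls._session = session

    @classmethod
    def get(cls) -> RemoteSession:
        if cls._session is None:
            raise RuntimeError("No Livy session registered; did the hook run?")
        return cls._session

    @classmethod
    def clear(cls) -> None:
        cls._session = None


def build_copy_snippet(src_path: str, dst_path: str) -> str:
    """Return PySpark source that reads and writes Parquet entirely on the cluster."""
    return (
        f"df = spark.read.parquet({src_path!r})\n"
        f"df.write.mode('overwrite').parquet({dst_path!r})\n"
    )


def copy_on_cluster(src_path: str, dst_path: str) -> str:
    """Hypothetical node function: only HDFS path strings pass through Kedro.

    Returning dst_path lets a downstream node continue from where this one wrote.
    """
    session = LivySessionHolder.get()
    session.run(build_copy_snippet(src_path, dst_path))
    return dst_path
```

In the hook this would mean calling `LivySessionHolder.register(session)` right after `session.wait()`, and the node could be wired as `node(copy_on_cluster, inputs=["params:src_path", "params:dst_path"], outputs="copied_path")`. A process-wide holder is only visible inside one Python process, so this pattern would not carry over to Kedro's `ParallelRunner`, which runs nodes in separate processes; the session also still needs to be closed, for example with `session.close()` from an `after_pipeline_run` hook.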
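For the final step, downloading the end result over WebHDFS, a sketch using only the standard library could look like this. It assumes the file is reached through the same Knox gateway as Livy, under the usual Knox layout `https://<host>:8443/gateway/<topology>/webhdfs/v1/<path>`, and uses the WebHDFS `op=OPEN` operation with HTTP basic auth; `webhdfs_open_url`, `download_from_webhdfs` and the `cafile` parameter are hypothetical names, not part of any library.

```python
import base64
import shutil
import ssl
import urllib.request
from typing import Optional
from urllib.parse import quote, urlencode


def webhdfs_open_url(gateway_base: str, hdfs_path: str) -> str:
    """Build a WebHDFS OPEN URL for an absolute HDFS path behind a Knox gateway.

    gateway_base is assumed to look like https://<knox-host>:8443/gateway/<topology>.
    """
    if not hdfs_path.startswith("/"):
        raise ValueError("WebHDFS expects an absolute HDFS path")
    query = urlencode({"op": "OPEN"})
    return f"{gateway_base.rstrip('/')}/webhdfs/v1{quote(hdfs_path)}?{query}"


def download_from_webhdfs(
    url: str,
    username: str,
    password: str,
    local_path: str,
    cafile: Optional[str] = None,
) -> None:
    """Stream one file from WebHDFS to local disk using HTTP basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    request = urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})
    # urllib does not read REQUESTS_CA_BUNDLE, so the CA bundle is passed explicitly
    context = ssl.create_default_context(cafile=cafile)
    with urllib.request.urlopen(request, context=context) as response:
        with open(local_path, "wb") as fh:
            shutil.copyfileobj(response, fh)
```

Spark usually writes a Parquet "file" as a directory of part files, so the path given to `op=OPEN` has to point at an individual part file (or the result has to be coalesced into one file first); finding the part files would need the WebHDFS `LISTSTATUS` operation. The CA bundle from hooks.py could be reused as `cafile="/etc/pki/tls/cert.pem"`.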
My hooks.py:
from kedro.framework.hooks import hook_impl
from kedro.config import ConfigLoader
from kedro.framework.project import settings
from kedro.framework.context import KedroContext
import os
from pathlib import Path
from livy import LivySession, SessionKind
import requests
from requests.auth import HTTPBasicAuth

LIVY_URL = 'https://knox.xxxx:8443/gateway/cdp-proxy-api/livy_for_spark3'

ca_bundle: Path = Path("/etc/pki/tls/cert.pem")
os.environ["REQUESTS_CA_BUNDLE"] = str(ca_bundle)


class LivyHooks:
    def _get_credentials(self, key):
        conf_path = f"{self.project_path}/{settings.CONF_SOURCE}"
        conf_loader = ConfigLoader(conf_source=conf_path, env="local")
        return conf_loader.get("credentials*")[key]

    def _get_basic_auth(self):

        credentials = self._get_credentials("dev_azure")

        auth = HTTPBasicAuth(credentials['username'], credentials['password'])

        return auth

    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a Livy session using the credentials
        defined in the project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        # parameters = context.config_loader.get("spark*", "spark*/**")
        # spark_conf = SparkConf().setAll(parameters.items())

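        # _get_credentials() reads self.project_path, which is not set anywhere else
        self.project_path = context.project_path
        auth = self._get_basic_auth()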

        # Initialise the livy session
        session = LivySession.create(LIVY_URL, kind=SessionKind.PYSPARK, auth=auth)

        session.wait()
        id = session.session_id