# questions
s
Hi all. I'm trying to use Kedro to develop an ML pipeline with Spark using a Dataproc cluster in GCP. I'd like to load a table from BigQuery into a Spark dataset; how could I define that in the catalog? I know that I can use "plain" PySpark to read the table, but I'd like to use the catalog. Thanks!
d
I haven't done this personally, but if you use https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example, then you should just be able to specify `file_format: bigquery` for your `spark.SparkDataSet`.
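Something like this in your `catalog.yml`, I'd guess (untested on my side; the project/dataset/table names below are placeholders):
```yaml
my_bq_table:
  type: spark.SparkDataSet
  filepath: my_gcp_project.my_bq_dataset.my_table   # placeholder BigQuery table reference
  file_format: bigquery
```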
b
@Sebastian Cardona Lozano Hi Sebastian... I am facing errors while running Kedro in Dataproc. Can you please share the spark.yml configuration to be used for Dataproc? The documentation says "You should modify this code to adapt it to your cluster's setup, e.g. setting master to yarn"... any help on this is much appreciated.
m
Maybe this repo will help you: https://github.com/getindata/kedro-pyspark-dataproc-demo. It's a demo of the Iris Kedro pipeline running on Dataproc Batches / Serverless Spark.
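Not sure what your cluster looks like, but as a rough idea, a `spark.yml` for a YARN-backed Dataproc cluster might look something like this (the `spark.master: yarn` part is what the docs mention; the jar version and memory values are only illustrative):
```yaml
# conf/base/spark.yml -- illustrative values only, adapt to your cluster
spark.master: yarn
spark.jars: gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
spark.driver.memory: 4g
spark.executor.memory: 8g
```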
๐Ÿ‘ 1
s
Hi all. I'm trying to develop the model pipeline using JupyterLab on the Dataproc cluster (it is a small cluster). I followed the instructions in the documentation and created these new files: 1. `spark.yml`, specifying the .jar with the spark-bigquery connector:
```yaml
spark.jars: gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
```
2. In `src/<package_name>/`, a `hooks.py`:
```python
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in the project's conf folder.
        """

        # Load the spark configuration in spark.yml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder
            .appName(context._package_name)
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
```
3. Updated the `HOOKS` variable in `src/<package_name>/settings.py` as follows:
```python
from <package_name>.hooks import SparkHooks

HOOKS = (SparkHooks(),)
```
4. In the `catalog.yml` I specified the BigQuery table as follows:
```yaml
master_table:
    type: spark.SparkDataSet
    filepath: gcp_project_name.bigquery_dataset.table_name_in_big_query
    file_format: bigquery
```
When I load the table from the catalog in a Jupyter notebook, I get this:
```
df = catalog.load("master_table")

[02/25/23 03:09:47] INFO  Loading data from 'master_table' (SparkDataSet)...  data_catalog.py:343

Traceback (most recent call last):
  /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/kedro/io/core.py:186 in load
      return self._load()
  /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dataset.py:392 in _load
      return read_obj.load(load_path, self._file_format, **self._load_args)
  /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/pyspark/sql/readwriter.py:177 in load
      return self._df(self._jreader.load(path))
  /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/py4j/java_gateway.py:1321 in __call__
      return_value = get_return_value(
          answer, self.gateway_client, self.target_id, self.name)
  /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/pyspark/sql/utils.py:190 in deco
      return f(*a, **kw)
  /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/py4j/protocol.py:326 in get_return_value
      raise Py4JJavaError(
          "An error occurred while calling {0}{1}{2}.\n".
          format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o243.load.
: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Malformed project
resource name: projects//home/bbog-gd-599-profundizacion-ml/bdb-gcp-pr-ac-ba; Expected: projects/<project_id>
```
Nevertheless, if I use the manual method to initialise the Spark session and load the data, it works:
```python
# Initialize the SparkSession.
VER = "0.26.0"
FILE_NAME = f"spark-bigquery-with-dependencies_2.12-{VER}.jar"
connector = f"gs://spark-lib/bigquery/{FILE_NAME}"

spark = (
    SparkSession.builder.appName("599-produndizacion")
    .config("spark.jars", connector)
    .getOrCreate()
)

# Load data
df = (
    spark.read.format("bigquery")
    .option("table", "gcp_project_name.bigquery_dataset.table_name_in_big_query")
    .load()
)
```
Thanks for your help and sorry for the long comment 🙂