# questions

Sebastian Cardona Lozano

02/25/2023, 1:21 AM
Hi all. I'm trying to use Kedro to develop a ML pipeline with Spark using a Dataproc cluster in GCP. I'd like to load a table from Big Query in a Spark dataset, how could I define that in the catalog? I know that I can use "plain" PySpark to read the table but I'd like to use the catalog. Thanks!

Deepyaman Datta

02/25/2023, 1:11 PM
I haven't done this personally, but if you use, then you should just be able to specify
file_format: bigquery
for your

Balachandran Ponnusamy

02/25/2023, 4:39 PM
@Sebastian Cardona Lozano Hi Sebastian...I am facing errors while running Kedro in Dataproc, can you pls share the spark.yml configuration to be used for dataproc...the documentation says this "You should modify this code to adapt it to your clusterโ€™s setup, e.g. setting master to yarn" ....any help on this is much appreciated


02/25/2023, 9:13 PM
Maybe this repo will help you It's a demo of Iris Kedro Pipeline running on Dataproc Batches / Serverless Spark
๐Ÿ‘ 1

Sebastian Cardona Lozano

02/27/2023, 4:12 PM
Hi all. I'm trying to develop the pipeline of the model using Jupyter Lab IDE in the Dataproc cluster (it is a small cluster). I followed the instructions of the documentation and I created these new files: 1. `spark.yml`: Specifying the .jar with spark-bigquery connector
Copy code
spark.jars: '<gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar>'
2. In
, a
Copy code
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession

class SparkHooks:
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
        _spark_session = spark_session_conf.getOrCreate()
3. Updated the
variable in
as follows:
Copy code
from <package_name>.hooks import SparkHooks

HOOKS = (SparkHooks(),)
4. In the
I specified the Big Query table as follows:
Copy code
    type: spark.SparkDataSet
    filepath: gcp_project_name.bigquery_dataset.table_name_in_big_query
    file_format: bigquery
When I load the table from the catalog in a Jupyter Notebook and using Python I get this:
Copy code
df = catalog.load("master_table")

[02/25/23 03:09:47] INFO     Loading data from 'master_table' (SparkDataSet)...       
โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ Traceback (most recent call last) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ
โ”‚ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/kedro/io/ in load     โ”‚
โ”‚                                                                                                  โ”‚
โ”‚   183 โ”‚   โ”‚   self._logger.debug("Loading %s", str(self))                                        โ”‚
โ”‚   184 โ”‚   โ”‚                                                                                      โ”‚
โ”‚   185 โ”‚   โ”‚   try:                                                                               โ”‚
โ”‚ โฑ 186 โ”‚   โ”‚   โ”‚   return self._load()                                                            โ”‚
โ”‚   187 โ”‚   โ”‚   except DataSetError:                                                               โ”‚
โ”‚   188 โ”‚   โ”‚   โ”‚   raise                                                                          โ”‚
โ”‚   189 โ”‚   โ”‚   except Exception as exc:                                                           โ”‚
โ”‚                                                                                                  โ”‚
โ”‚ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/kedro/extras/datasets/spark/spar โ”‚
โ”‚ in _load                                                                        โ”‚
โ”‚                                                                                                  โ”‚
โ”‚   389 โ”‚   โ”‚   if self._schema:                                                                   โ”‚
โ”‚   390 โ”‚   โ”‚   โ”‚   read_obj = read_obj.schema(self._schema)                                       โ”‚
โ”‚   391 โ”‚   โ”‚                                                                                      โ”‚
โ”‚ โฑ 392 โ”‚   โ”‚   return read_obj.load(load_path, self._file_format, **self._load_args)              โ”‚
โ”‚   393 โ”‚                                                                                          โ”‚
โ”‚   394 โ”‚   def _save(self, data: DataFrame) -> None:                                              โ”‚
โ”‚   395 โ”‚   โ”‚   save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_save_path()))       โ”‚
โ”‚                                                                                                  โ”‚
โ”‚ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/pyspark/sql/ in โ”‚
โ”‚ load                                                                                             โ”‚
โ”‚                                                                                                  โ”‚
โ”‚    174 โ”‚   โ”‚   โ”‚   self.schema(schema)                                                           โ”‚
โ”‚    175 โ”‚   โ”‚   self.options(**options)                                                           โ”‚
โ”‚    176 โ”‚   โ”‚   if isinstance(path, str):                                                         โ”‚
โ”‚ โฑ  177 โ”‚   โ”‚   โ”‚   return self._df(self._jreader.load(path))                                     โ”‚
โ”‚    178 โ”‚   โ”‚   elif path is not None:                                                            โ”‚
โ”‚    179 โ”‚   โ”‚   โ”‚   if type(path) != list:                                                        โ”‚
โ”‚    180 โ”‚   โ”‚   โ”‚   โ”‚   path = [path]  # type: ignore[list-item]                                  โ”‚
โ”‚                                                                                                  โ”‚
โ”‚ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/py4j/ in     โ”‚
โ”‚ __call__                                                                                         โ”‚
โ”‚                                                                                                  โ”‚
โ”‚   1318 โ”‚   โ”‚   โ”‚   proto.END_COMMAND_PART                                                        โ”‚
โ”‚   1319 โ”‚   โ”‚                                                                                     โ”‚
โ”‚   1320 โ”‚   โ”‚   answer = self.gateway_client.send_command(command)                                โ”‚
โ”‚ โฑ 1321 โ”‚   โ”‚   return_value = get_return_value(                                                  โ”‚
โ”‚   1322 โ”‚   โ”‚   โ”‚   answer, self.gateway_client, self.target_id,                       โ”‚
โ”‚   1323 โ”‚   โ”‚                                                                                     โ”‚
โ”‚   1324 โ”‚   โ”‚   for temp_arg in temp_args:                                                        โ”‚
โ”‚                                                                                                  โ”‚
โ”‚ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/pyspark/sql/ in deco โ”‚
โ”‚                                                                                                  โ”‚
โ”‚   187 def capture_sql_exception(f: Callable[..., Any]) -> Callable[..., Any]:                    โ”‚
โ”‚   188 โ”‚   def deco(*a: Any, **kw: Any) -> Any:                                                   โ”‚
โ”‚   189 โ”‚   โ”‚   try:                                                                               โ”‚
โ”‚ โฑ 190 โ”‚   โ”‚   โ”‚   return f(*a, **kw)                                                             โ”‚
โ”‚   191 โ”‚   โ”‚   except Py4JJavaError as e:                                                         โ”‚
โ”‚   192 โ”‚   โ”‚   โ”‚   converted = convert_exception(e.java_exception)                                โ”‚
โ”‚   193 โ”‚   โ”‚   โ”‚   if not isinstance(converted, UnknownException):                                โ”‚
โ”‚                                                                                                  โ”‚
โ”‚ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/py4j/ in          โ”‚
โ”‚ get_return_value                                                                                 โ”‚
โ”‚                                                                                                  โ”‚
โ”‚   323 โ”‚   โ”‚   โ”‚   type = answer[1]                                                               โ”‚
โ”‚   324 โ”‚   โ”‚   โ”‚   value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)                     โ”‚
โ”‚   325 โ”‚   โ”‚   โ”‚   if answer[1] == REFERENCE_TYPE:                                                โ”‚
โ”‚ โฑ 326 โ”‚   โ”‚   โ”‚   โ”‚   raise Py4JJavaError(                                                       โ”‚
โ”‚   327 โ”‚   โ”‚   โ”‚   โ”‚   โ”‚   "An error occurred while calling {0}{1}{2}.\n".                        โ”‚
โ”‚   328 โ”‚   โ”‚   โ”‚   โ”‚   โ”‚   format(target_id, ".", name), value)                                   โ”‚
โ”‚   329 โ”‚   โ”‚   โ”‚   else:                                                                          โ”‚
Py4JJavaError: An error occurred while calling o243.load.
: Malformed project 
resource name: projects//home/bbog-gd-599-profundizacion-ml/bdb-gcp-pr-ac-ba; Expected: projects/<project_id>
Nevertheless, if I use the manually method to initialize the Spark Session and load the data, It works:
Copy code
# Initialize the SparkSession.
VER = "0.26.0"
FILE_NAME = f"spark-bigquery-with-dependencies_2.12-{VER}.jar"
connector = f"<gs://spark-lib/bigquery/{FILE_NAME}>"

spark = (
    .config("spark.jars", connector)

# Load data
df ="bigquery")\
    .option("table", "gcp_project_name.bigquery_dataset.table_name_in_big_query")\
Thanks for your help and sorry for the large comment ๐Ÿ™‚