Sebastian Cardona Lozano
02/25/2023, 1:21 AMDeepyaman Datta
02/25/2023, 1:11 PMfile_format: bigquery
for your spark.SparkDataSet
.Balachandran Ponnusamy
02/25/2023, 4:39 PMmarrrcin
02/25/2023, 9:13 PMSebastian Cardona Lozano
02/27/2023, 4:12 PMspark.jars: '<gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar>'
2. In src/<package_name>/
, a hooks.py
:
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession
class SparkHooks:
@hook_impl
def after_context_created(self, context) -> None:
"""Initialises a SparkSession using the config
defined in project's conf folder.
"""
# Load the spark configuration in spark.yaml using the config loader
parameters = context.config_loader.get("spark*", "spark*/**")
spark_conf = SparkConf().setAll(parameters.items())
# Initialise the spark session
spark_session_conf = (
SparkSession.builder\
.appName(context._package_name)\
.config(conf=spark_conf)
)
_spark_session = spark_session_conf.getOrCreate()
_spark_session.sparkContext.setLogLevel("WARN")
3. Updated the HOOKS
variable in src/<package_name>/settings.py
as follows:
from <package_name>.hooks import SparkHooks
HOOKS = (SparkHooks(),)
4. In the catalog.yml
I specified the Big Query table as follows:
master_table:
type: spark.SparkDataSet
filepath: gcp_project_name.bigquery_dataset.table_name_in_big_query
file_format: bigquery
When I load the table from the catalog in a Jupyter Notebook and using Python I get this:
df = catalog.load("master_table")
[02/25/23 03:09:47] INFO Loading data from 'master_table' (SparkDataSet)... data_catalog.py:343
โญโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ Traceback (most recent call last) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโฎ
โ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/kedro/io/core.py:186 in load โ
โ โ
โ 183 โ โ self._logger.debug("Loading %s", str(self)) โ
โ 184 โ โ โ
โ 185 โ โ try: โ
โ โฑ 186 โ โ โ return self._load() โ
โ 187 โ โ except DataSetError: โ
โ 188 โ โ โ raise โ
โ 189 โ โ except Exception as exc: โ
โ โ
โ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/kedro/extras/datasets/spark/spar โ
โ k_dataset.py:392 in _load โ
โ โ
โ 389 โ โ if self._schema: โ
โ 390 โ โ โ read_obj = read_obj.schema(self._schema) โ
โ 391 โ โ โ
โ โฑ 392 โ โ return read_obj.load(load_path, self._file_format, **self._load_args) โ
โ 393 โ โ
โ 394 โ def _save(self, data: DataFrame) -> None: โ
โ 395 โ โ save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_save_path())) โ
โ โ
โ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/pyspark/sql/readwriter.py:177 in โ
โ load โ
โ โ
โ 174 โ โ โ self.schema(schema) โ
โ 175 โ โ self.options(**options) โ
โ 176 โ โ if isinstance(path, str): โ
โ โฑ 177 โ โ โ return self._df(self._jreader.load(path)) โ
โ 178 โ โ elif path is not None: โ
โ 179 โ โ โ if type(path) != list: โ
โ 180 โ โ โ โ path = [path] # type: ignore[list-item] โ
โ โ
โ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/py4j/java_gateway.py:1321 in โ
โ __call__ โ
โ โ
โ 1318 โ โ โ proto.END_COMMAND_PART โ
โ 1319 โ โ โ
โ 1320 โ โ answer = self.gateway_client.send_command(command) โ
โ โฑ 1321 โ โ return_value = get_return_value( โ
โ 1322 โ โ โ answer, self.gateway_client, self.target_id, self.name) โ
โ 1323 โ โ โ
โ 1324 โ โ for temp_arg in temp_args: โ
โ โ
โ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/pyspark/sql/utils.py:190 in deco โ
โ โ
โ 187 def capture_sql_exception(f: Callable[..., Any]) -> Callable[..., Any]: โ
โ 188 โ def deco(*a: Any, **kw: Any) -> Any: โ
โ 189 โ โ try: โ
โ โฑ 190 โ โ โ return f(*a, **kw) โ
โ 191 โ โ except Py4JJavaError as e: โ
โ 192 โ โ โ converted = convert_exception(e.java_exception) โ
โ 193 โ โ โ if not isinstance(converted, UnknownException): โ
โ โ
โ /opt/conda/miniconda3/envs/py39_599/lib/python3.9/site-packages/py4j/protocol.py:326 in โ
โ get_return_value โ
โ โ
โ 323 โ โ โ type = answer[1] โ
โ 324 โ โ โ value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) โ
โ 325 โ โ โ if answer[1] == REFERENCE_TYPE: โ
โ โฑ 326 โ โ โ โ raise Py4JJavaError( โ
โ 327 โ โ โ โ โ "An error occurred while calling {0}{1}{2}.\n". โ
โ 328 โ โ โ โ โ format(target_id, ".", name), value) โ
โ 329 โ โ โ else: โ
โฐโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโฏ
Py4JJavaError: An error occurred while calling o243.load.
: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Malformed project
resource name: projects//home/bbog-gd-599-profundizacion-ml/bdb-gcp-pr-ac-ba; Expected: projects/<project_id>
Nevertheless, if I use the manually method to initialize the Spark Session and load the data, It works:
# Initialize the SparkSession.
VER = "0.26.0"
FILE_NAME = f"spark-bigquery-with-dependencies_2.12-{VER}.jar"
connector = f"<gs://spark-lib/bigquery/{FILE_NAME}>"
spark = (
SparkSession.builder.appName("599-produndizacion")
.config("spark.jars", connector)
.getOrCreate()
)
# Load data
df = spark.read.format("bigquery")\
.option("table", "gcp_project_name.bigquery_dataset.table_name_in_big_query")\
.load()