Abhishek Bhatia
07/12/2024, 11:32 AM
[07/12/24 11:22:08] INFO Loading data from prm_store_sp (SparkDataset)... data_catalog.py:508
24/07/12 11:22:08 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: gs://aa-dev-crm-primary/sample/prm_store.
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:840)
Just to make sure my credentials were correctly set, I made 4 datasets:
prm_store_sp:
  type: spark.SparkDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store
  file_format: parquet
  credentials: gcs_credentials

prm_store_sc:
  type: spark.SparkDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store.csv
  file_format: csv
  credentials: gcs_credentials

prm_store_pp:
  type: pandas.ParquetDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store
  credentials: gcs_credentials

prm_store_pc:
  type: pandas.CSVDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store.csv
  credentials: gcs_credentials
1. prm_store_sp ❌
2. prm_store_sc ❌
3. prm_store_pp ✅
4. prm_store_pc ✅
So credentials are fine.
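(For context, the gcs_credentials entry those datasets point at lives in conf/local/credentials.yml. For the pandas datasets Kedro hands it to fsspec/gcsfs, so a plausible shape - the key file path is a placeholder and this is only a sketch of what such an entry might look like - is:
# conf/local/credentials.yml - sketch; gcsfs accepts a service-account key path via "token"
gcs_credentials:
  token: /path/to/service-account-key.json
)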
My spark.yml has the following:
# GCS settings
spark.hadoop.fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.fs.AbstractFileSystem.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
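Worth noting: those two fs.gs.impl settings only tell Spark which classes to use; the GCS connector jar itself still has to be on the classpath, which is what the UnsupportedFileSystemException above is complaining about. A sketch of one way to do that from spark.yml (the Maven coordinates and version here are my assumption - match them to your Hadoop 3 build):
spark.jars.packages: com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5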
I am using the following SparkHooks
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    """This class contains hooks for initializing a SparkSession using the configuration
    defined in the project's `conf` folder. The `after_context_created` method is
    implemented as a hook that gets triggered after the Kedro context is created.
    """

    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in the project's conf folder.
        """
        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader.get("spark")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
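(This assumes the hook is registered in settings.py in the usual way; the package and module names below are placeholders for wherever SparkHooks actually lives:)
# src/my_project/settings.py - registration sketch
from my_project.hooks import SparkHooks

HOOKS = (SparkHooks(),)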
The problem seems to be with the --jars that are needed to be able to read parquet with spark.
So I did the following: created a simple spark app script, scripts/spark_app.py:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("GCS Example")
    .config(
        "spark.hadoop.fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    )
    .config(
        "spark.hadoop.fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    )
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config(
        "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
        "/path/to/service-account-key.json",
    )
    .getOrCreate()
)

df = spark.read.format("parquet").load("gs://aa-dev-crm-primary/sample/prm_store")
df.show()  # show() already prints the dataframe; wrapping it in print() just prints None
Then submitted the spark job as follows:
spark-submit --jars gcs-connector-hadoop3-latest.jar scripts/spark_app.py
This works if I pass the local path of the jar, downloaded as follows:
gsutil cp gs://hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar .
But the following 2 commands fail:
With the jar on gs://:
spark-submit --jars gs://hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar scripts/spark_app.py
Without --jars:
spark-submit scripts/spark_app.py
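That the gs:// variant fails is expected: Spark would need the GCS connector to fetch a jar from a gs:// path, and that connector is exactly the jar that is missing. A hedged alternative to downloading the jar by hand is to let spark-submit resolve it from Maven Central (coordinates and version are my assumption - match them to your Hadoop build):
spark-submit --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5 scripts/spark_app.py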
datajoely
07/12/2024, 11:37 AM
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("example") \
.config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.5") \
.getOrCreate()
spark._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.enable", "true")
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", "path_to_your_service_account_key.json")
Abhishek Bhatia
07/14/2024, 8:11 AM
Something is creating a SparkSession before my SparkHooks is invoked. And it's near impossible to figure out where.
• It's not the catalog entries initialising it - I removed all of them
• It's probably not any other auto-registered hook, since I checked the registered hooks in the hook_manager.
But something is creating a kedro session before anything else.
Abhishek Bhatia
07/15/2024, 9:57 AM
On where this SparkSession is being created: the issue remains unsolved (although it is closed):
https://github.com/kedro-org/kedro/issues/3545
To overcome this I have to stop the existing session and create a new one:
# Stop the existing spark session created elsewhere (TODO: if it exists)
SparkSession.builder.getOrCreate().stop() # pyspark-shell
# Create a SparkConf object
spark_conf = SparkConf().setAll(parameters.items())
# Initialise the spark session
spark_session_conf = (
SparkSession.builder.appName(context.project_path.name)
.enableHiveSupport()
.config(conf=spark_conf)
)
_spark_session: SparkSession = spark_session_conf.getOrCreate()
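For the TODO above, a small sketch of the check (assuming PySpark 3.x, where SparkSession.getActiveSession() is available and returns None if nothing has created a session on this thread yet):
from pyspark.sql import SparkSession

# Stop a stray session only if one already exists
existing = SparkSession.getActiveSession()
if existing is not None:
    existing.stop()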
Any idea how to even find out where this existing SparkSession is being created by kedro?
datajoely
07/15/2024, 9:59 AM
Abhishek Bhatia
07/15/2024, 10:02 AM
__init__ must be creating the SparkSession, since the order of execution is:
1. OmegaConfigLoader -> parses the params and also initializes all catalog datasets
2. KedroContext
3. after_context_created hook
So, I removed all datasets from existence and still a SparkSession is being created (by just renaming conf/base to _bak).
Abhishek Bhatia
07/15/2024, 10:02 AM
I renamed the conf folder as well though, but same thing.
Abhishek Bhatia
07/15/2024, 10:08 AM
Also: if I have kedro-viz running (which will spin up its own 2 SparkSessions - one I stop, one I create), then in another pipeline the spark operations just exit for no reason.
datajoely
07/15/2024, 10:27 AM
Abhishek Bhatia
07/15/2024, 10:50 AM
Found it: a few modules in the project had a SparkSession being defined (and hence created) at module level, since probably kedro loads all modules before starting the pipeline (or a kedro ipython session).
Commenting out these "free" SparkSession definitions solved the problem of a SparkSession being created out of nowhere. So maybe a lesson and a best practice for me (or anybody): do not define a SparkSession outside of a function (or a class).
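To make that concrete, a minimal before/after sketch (module and function names are made up for illustration):
# bad_module.py - hypothetical example of the problem:
# this runs the moment kedro imports the module, so a session appears "out of nowhere"
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # created as an import side effect


# good_module.py - hypothetical fix: create/fetch the session lazily, only when needed
from pyspark.sql import SparkSession

def get_spark() -> SparkSession:
    """Return the active SparkSession, creating one only when a node actually needs it."""
    return SparkSession.builder.getOrCreate()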