# questions
a
Hi kedro experts! :kedro: First of all, any help regarding this is much appreciated! 🙏 I am trying to read a parquet file (exported from BigQuery) from GCS in Kedro, but I am facing the following error:
[07/12/24 11:22:08] INFO     Loading data from prm_store_sp (SparkDataset)...                                           data_catalog.py:508
24/07/12 11:22:08 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: gs://aa-dev-crm-primary/sample/prm_store.
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:840)
Just to make sure my credentials were correctly set, I made 4 datasets:
prm_store_sp:
  type: spark.SparkDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store
  file_format: parquet
  credentials: gcs_credentials

prm_store_sc:
  type: spark.SparkDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store.csv
  file_format: csv
  credentials: gcs_credentials

prm_store_pp:
  type: pandas.ParquetDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store
  credentials: gcs_credentials

prm_store_pc:
  type: pandas.CSVDataset
  filepath: gs://aa-dev-crm-primary/sample/prm_store.csv
  credentials: gcs_credentials
1. prm_store_sp ❌
2. prm_store_sc ❌
3. prm_store_pp ✅
4. prm_store_pc ✅
So credentials are fine. My spark.yml has the following:
# GCS settings
spark.hadoop.fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.fs.AbstractFileSystem.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
I am using the following SparkHooks:
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    """This class contains hooks for initializing a SparkSession using the configuration
    defined in the project's `conf` folder. The `after_context_created` method is
    implemented as a hook that gets triggered after the Kedro context is created.
    """

    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """
        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader.get("spark")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
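For completeness: the thread doesn't show it, so treat the file below as an assumption about the project layout, but a hook like this is normally registered via the HOOKS tuple in settings.py:

# src/my_project/settings.py  ("my_project" is a placeholder package name)
from my_project.hooks import SparkHooks

HOOKS = (SparkHooks(),)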
The problem seems to be with the --jars (the GCS connector jar) that Spark needs in order to read from gs:// paths. So I did the following: I created a simple Spark app script as follows:
scripts/spark_app.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("GCS Example")
    .config(
        "spark.hadoop.fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    )
    .config(
        "spark.hadoop.fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    )
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config(
        "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
        "/path/to/service-account-key.json",
    )
    .getOrCreate()
)

df = spark.read.format("parquet").load("gs://aa-dev-crm-primary/sample/prm_store")
df.show()
Then I submitted the Spark job as follows:
spark-submit --jars gcs-connector-hadoop3-latest.jar scripts/spark_app.py
This works if I pass the local path of the jar, downloaded as follows:
gsutil cp gs://hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar .
But the following 2 commands fail. With the jar on gs://:
spark-submit --jars gs://hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar scripts/spark_app.py
Without jars:
spark-submit scripts/spark_app.py
d
have you tried some of these?
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("example") \
    .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.5") \
    .getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.enable", "true")
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", "path_to_your_service_account_key.json")
a
Let me try right now!
Thanks @datajoely as always!
So I was able to partly solve this by stopping the existing Spark session and creating another one (which has its own problems), but the problem is a bit deeper. Somehow, somewhere, Kedro is initializing its own SparkSession before my SparkHooks is invoked, and it's near impossible to figure out where.
• It's not the catalog entries initialising it - I removed all of them.
• It's probably not any other auto-registered hook, since I checked the registered hooks in the hook_manager.
But something is creating a SparkSession before anything else.
So, this is exactly the issue where, even without any hook, a SparkSession is being created. The issue remains unsolved (although it is closed): https://github.com/kedro-org/kedro/issues/3545 To overcome this I have to stop the existing session and create a new one (inside the after_context_created hook):
# Stop the existing spark session created elsewhere (TODO: if it exists)
SparkSession.builder.getOrCreate().stop()  # pyspark-shell

# Create a SparkConf object
spark_conf = SparkConf().setAll(parameters.items())

# Initialise the spark session
spark_session_conf = (
    SparkSession.builder.appName(context.project_path.name)
    .enableHiveSupport()
    .config(conf=spark_conf)
)
_spark_session: SparkSession = spark_session_conf.getOrCreate()
Any idea how to even find out where this existing SparkSession is being created by Kedro?
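One way to track this down (a debugging sketch, not something from the thread) is to monkey-patch SparkSession.Builder.getOrCreate as early as possible, e.g. at the top of the project's settings.py, so that every session creation prints the stack of its caller:

import traceback

from pyspark.sql import SparkSession

_original_get_or_create = SparkSession.Builder.getOrCreate


def _traced_get_or_create(builder):
    # Show who is asking for a SparkSession, then delegate to the real method.
    print(">>> SparkSession.builder.getOrCreate() called from:")
    traceback.print_stack()
    return _original_get_or_create(builder)


SparkSession.Builder.getOrCreate = _traced_get_or_create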
d
This is weird
are you using just standard datasets?
a
I am using a lot of datasets (some nonstandard) and I thought so too, that some dataset's __init__ must be creating the SparkSession, since the order of execution is:
1. OmegaConfigLoader -> parses the params and also initializes all catalog datasets
2. KedroContext
3. after_context_created hook
So, I removed all datasets from existence (by just renaming conf/base to _bak) and still a SparkSession is being created.
Maybe I should totally delete all datasets and test this? I did remove all dataset definitions outside the conf folder as well, though, and it's the same thing.
This also means that if I have kedro-viz running (which will spin up its own 2 SparkSessions: one I stop, one I create), then in another pipeline, the Spark operations just exit for no reason.
d
I think the source is likely to be in the custom datasets; this issue isn't reported by other users.
a
I think I solved it. So, I deleted all datasets, and then found out that in at least 10 modules there was a "free" SparkSession being defined at module level (and hence created), since Kedro probably imports all modules before starting the pipeline (or a kedro ipython session). Commenting out these "free" SparkSessions solved the problem of a SparkSession being created out of nowhere. So maybe a lesson and a best practice for me (or anybody): do not define a SparkSession outside of a function (or a class) 🙂
💪 1
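To make the lesson concrete, here is a minimal sketch (module and function names are made up, not from the thread) of the pattern that caused the problem and a lazier alternative that plays well with SparkHooks:

from pyspark.sql import SparkSession

# Anti-pattern: a "free", module-level session. It is created as soon as the module
# is imported, i.e. before after_context_created can configure Spark.
# spark = SparkSession.builder.getOrCreate()


def get_spark() -> SparkSession:
    """Return the current session lazily; reuses the one configured by SparkHooks."""
    return SparkSession.builder.getOrCreate()


def my_node(rows):
    # Hypothetical node: the session is only touched when the node actually runs.
    spark = get_spark()
    return spark.createDataFrame(rows, ["id", "value"])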