# questions
m
```
feature_engineering:
  type: MemoryDataSet
  copy_mode: assign

preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
```
And this raises the error:
```
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o742.save.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
```
Can someone give me a hint on how to fix this issue?
Any help please?
w
Are you trying to save to S3? Seems like you might be missing some jars, according to a similar issue: https://stackoverflow.com/questions/53913660/enabling-the-directory-committer-in-spark
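(For reference, a rough sketch of how those jars could be pulled in if the cloud committer settings are kept. This assumes the Spark settings live in a conf/base/spark.yml that the context loads; the spark-hadoop-cloud coordinates and versions below are assumptions and must match the installed Spark and Hadoop build.)

```
# conf/base/spark.yml -- sketch only, versions are examples
# PathOutputCommitProtocol is shipped in the optional spark-hadoop-cloud module,
# which is not bundled with a plain pip-installed PySpark.
spark.jars.packages: org.apache.hadoop:hadoop-aws:3.3.4,org.apache.spark:spark-hadoop-cloud_2.12:3.3.0
```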
m
not to S3, saving to local mac… To give more context on what is referred to in the catalog: `preprocessed_data` is a Spark DataFrame produced by the `feature_engineering` pipeline. `preprocessed_data` can be read in a different pipeline or node, but it cannot be saved to local storage.
This is how my Spark configuration in `context.py` looks:
```
# context.py (method of the project context class)
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession


def init_spark_session(self) -> None:
    """Initialises a SparkSession using the config
    defined in the project's conf folder.
    """
    # Load everything matching spark*.yml from the project's conf folders
    parameters = self.config_loader.get("spark*", "spark*/**")
    spark_conf = SparkConf()
    spark_conf.setAll(parameters.items())

    # S3 / S3A related settings
    spark_conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4')
    spark_conf.set('spark.hadoop.fs.s3.aws.credentials.provider',
                   'org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider')
    spark_conf.set('spark.hadoop.fs.s3.access.key', os.environ.get('AWS_ACCESS_KEY_ID'))
    spark_conf.set('spark.hadoop.fs.s3.secret.key', os.environ.get('AWS_SECRET_ACCESS_KEY'))
    spark_conf.set('spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled', 'true')
    spark_conf.set("com.amazonaws.services.s3.enableV4", "true")
    spark_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark_conf.set("fs.s3a.aws.credentials.provider",
                   "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    spark_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
    spark_conf.set("hadoop.fs.s3a.path.style.access", "true")
    spark_conf.set("hadoop.fs.s3a.fast.upload", "true")
    spark_conf.set("hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
    spark_conf.set("fs.s3a.path.style.access", "true")
    spark_conf.set("fs.s3a.multipart.size", "128M")
    spark_conf.set("fs.s3a.fast.upload.active.blocks", "4")
    spark_conf.set("fs.s3a.committer.name", "partitioned")

    # Cloud committer settings: the classes referenced here come from the optional
    # spark-hadoop-cloud module and must be on the classpath
    spark_conf.set("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    spark_conf.set("spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

    # Initialise the spark session
    spark_session_conf = (
        SparkSession.builder.appName(self._package_name)
            .enableHiveSupport()
            .config(conf=spark_conf)
    )
    _spark_session = spark_session_conf.getOrCreate()

    _spark_session.sparkContext.setLogLevel("WARN")
```
w
The error is related to this line:
```
spark_conf.set("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
```
It seems you don't have the relevant jars on your classpath. If you don't need to interact with S3, try removing all the S3-related conf to see if it works locally. Also, it's cleaner to put all the Spark-related conf in `spark.yml` under `conf/base` instead.
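(For illustration, a minimal sketch of what that spark.yml could look like; the keys simply mirror some of the SparkConf calls above and would be picked up by the config_loader.get("spark*", "spark*/**") call in context.py. Note that Hadoop filesystem options are usually given the spark.hadoop. prefix when set through Spark configuration.)

```
# conf/base/spark.yml -- illustrative sketch, mirroring the SparkConf above
spark.jars.packages: org.apache.hadoop:hadoop-aws:3.3.4
spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
spark.hadoop.fs.s3a.path.style.access: "true"
spark.hadoop.fs.s3a.fast.upload: "true"
spark.hadoop.fs.s3a.committer.name: partitioned
# Keep these two only if the matching spark-hadoop-cloud jar is on the classpath:
# spark.sql.sources.commitProtocolClass: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
# spark.sql.parquet.output.committer.class: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
```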
m
In fact, I added that line from the docs https://spark.apache.org/docs/3.0.0-preview/cloud-integration.html#recommended-settings-for-writing-to-object-stores to see if it would fix the error, but I still get the same one. In other words, having that line or not does not change the stack trace.
Also, the issue only appears when writing files to local storage or S3; reading from local storage or S3 works fine.
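(That is consistent with the committer only being involved on the write path: reads never go through spark.sql.sources.commitProtocolClass, so the missing class is only hit on save. Following the earlier suggestion, one way to confirm is a stripped-down spark.yml with no committer or S3A overrides at all, so writes fall back to Spark's default commit protocol; the values below are purely illustrative.)

```
# conf/base/spark.yml -- stripped-down sketch for testing local writes (illustrative values)
# With no spark.jars.packages, fs.s3a.* settings, or committer overrides, Spark uses its
# default commit protocol, which does not require the spark-hadoop-cloud classes.
spark.driver.memory: 2g
spark.sql.shuffle.partitions: 8
```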