meharji arumilli
12/29/2022, 11:23 PM
feature_engineering:
  type: MemoryDataSet
  copy_mode: assign
preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
And this raises the error:
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o742.save.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
Can someone hint at how to fix this issue?
William Caicedo
12/30/2022, 7:48 PM
meharji arumilli
12/30/2022, 8:08 PM
preprocessed_data is a Spark DataFrame from the feature_engineering pipeline. preprocessed_data can be read in a different pipeline or node, but it cannot be saved to local storage.
context.py looks like:
def init_spark_session(self) -> None:
    """Initialises a SparkSession using the config
    defined in the project's conf folder.
    """
    parameters = self.config_loader.get("spark*", "spark*/**")
    spark_conf = SparkConf()
    spark_conf.setAll(parameters.items())
    spark_conf.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    spark_conf.set("spark.hadoop.fs.s3.aws.credentials.provider",
                   "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
    spark_conf.set("spark.hadoop.fs.s3.access.key", os.environ.get("AWS_ACCESS_KEY_ID"))
    spark_conf.set("spark.hadoop.fs.s3.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY"))
    spark_conf.set("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")
    spark_conf.set("com.amazonaws.services.s3.enableV4", "true")
    spark_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark_conf.set("fs.s3a.aws.credentials.provider",
                   "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    spark_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
    spark_conf.set("hadoop.fs.s3a.path.style.access", "true")
    spark_conf.set("hadoop.fs.s3a.fast.upload", "true")
    spark_conf.set("hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
    spark_conf.set("fs.s3a.path.style.access", "true")
    spark_conf.set("fs.s3a.multipart.size", "128M")
    spark_conf.set("fs.s3a.fast.upload.active.blocks", "4")
    spark_conf.set("fs.s3a.committer.name", "partitioned")
    spark_conf.set("spark.sql.sources.commitProtocolClass",
                   "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    spark_conf.set("spark.sql.parquet.output.committer.class",
                   "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

    # Initialise the spark session
    spark_session_conf = (
        SparkSession.builder.appName(self._package_name)
        .enableHiveSupport()
        .config(conf=spark_conf)
    )
    _spark_session = spark_session_conf.getOrCreate()
    _spark_session.sparkContext.setLogLevel("WARN")
William Caicedo
12/30/2022, 9:30 PM
spark_conf.set("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
It seems you don't have the relevant jars in your classpath.
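For reference, a minimal sketch of one way to get those classes onto the classpath, assuming they are meant to come from Spark's optional hadoop-cloud module (the exact artifact name and versions below are assumptions and should be matched to your Scala/Spark build):

# Sketch only: pull in Spark's hadoop-cloud module next to hadoop-aws so that
# org.apache.spark.internal.io.cloud.PathOutputCommitProtocol can be resolved.
# This would replace the earlier spark.jars.packages line, keeping both packages
# in one comma-separated value; the coordinate/version is an assumption.
spark_conf.set(
    "spark.jars.packages",
    "org.apache.hadoop:hadoop-aws:3.3.4,"
    "org.apache.spark:spark-hadoop-cloud_2.12:3.3.1",
)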
If you don't need to interact with S3, try removing all the S3-related conf to see if it works locally. Also, it's cleaner to put all the Spark-related conf in spark.yml under conf/base instead.
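As a rough illustration of that suggestion, a conf/base/spark.yml could carry the same settings (sketch only; the keys mirror the spark_conf.set calls above, and the hadoop-cloud coordinate/version is an assumption to match against your Spark build):

# conf/base/spark.yml -- sketch; picked up by config_loader.get("spark*", "spark*/**")
# and applied via spark_conf.setAll(parameters.items())
spark.jars.packages: org.apache.hadoop:hadoop-aws:3.3.4,org.apache.spark:spark-hadoop-cloud_2.12:3.3.1
spark.hadoop.fs.s3a.path.style.access: "true"
spark.hadoop.fs.s3a.fast.upload: "true"
spark.sql.sources.commitProtocolClass: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter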
meharji arumilli
12/30/2022, 9:41 PM