# questions
m
Any clue what could have been the issue? I could be missing some minor thing here!! @Rob is it similar to your issue? Could u help?
r
Hey @meharji arumilli, as I understand it, since Spark has lazy execution, if you haven't performed an action on your processed data yet, you should set it as a `MemoryDataSet`. So in your catalog, place something like this:

```yaml
your_pipeline:
  type: MemoryDataSet
  copy_mode: assign
```

Notice the `copy_mode`, and for more reference check this doc: Use MemoryDataSet with `copy_mode="assign"` for non-DataFrame Spark objects

Note: if you haven't performed an action and you set `copy_mode="deepcopy"` (the default setting, I guess?), it will probably fail since your data is still part of Spark's logical plan
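To see why `deepcopy` can choke on a Spark-backed object, here's a plain-Python analogy (not Spark itself; the `LazyDataset` class is hypothetical, purely for illustration): an object wrapping an OS-level handle can't be deep-copied, much like a DataFrame wrapping a JVM reference, while `assign` just passes the same reference along:

```python
import copy
import threading

class LazyDataset:
    """Hypothetical stand-in for a Spark DataFrame: it wraps a handle
    (here a thread lock) that cannot be deep-copied, much like the
    JVM-backed logical plan inside a real DataFrame."""
    def __init__(self):
        self._handle = threading.Lock()  # non-copyable OS resource

data = LazyDataset()

# copy_mode="deepcopy" (the MemoryDataSet default) effectively tries this:
try:
    copy.deepcopy(data)
    deepcopy_ok = True
except TypeError:
    deepcopy_ok = False  # fails: the wrapped handle can't be copied

# copy_mode="assign" just hands the same reference onward, no copy at all:
assigned = data
assert deepcopy_ok is False
assert assigned is data
```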
m
afaik,

```yaml
your_pipeline:
  type: MemoryDataSet
  copy_mode: assign
```

should be used when "Sometimes, you might want to use Spark objects that aren't `DataFrame` as inputs and outputs in your pipeline," as per the docs: https://kedro.readthedocs.io/en/0.17.3/11_tools_integration/01_pyspark.html#use-kedro-s-built-in-spark-datasets-to-load-and-save-raw-data
here my object `preprocessed_data` is a Spark DataFrame, and here I'm trying to save it to local storage:

```yaml
preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
```

should I still place this in the catalog?

```yaml
preprocessed_data:
  type: MemoryDataSet
  copy_mode: assign
```
r
Try doing that if you haven't performed a Spark action; this works on DataFrames that have only had Spark transformations applied. Otherwise, be sure to execute the action on your DataFrame. But you can try both
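On the transformation-vs-action point, a small plain-Python sketch (generators standing in for Spark's lazy plan; nothing here is Spark or Kedro API) of how nothing executes until an action forces it:

```python
# Plain-Python analogy for Spark's laziness: a generator expression, like a
# Spark transformation, only builds a plan; list(), like a Spark action
# (count(), collect(), a write), is what actually executes it.
calls = []

def expensive_transform(x):
    calls.append(x)  # record that real work happened
    return x * 2

rows = [1, 2, 3]
plan = (expensive_transform(x) for x in rows)  # "transformation": lazy, no work yet
assert calls == []                             # nothing has executed so far

result = list(plan)                            # "action": forces execution
assert calls == [1, 2, 3]
assert result == [2, 4, 6]
```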
m
```yaml
preprocessed_data:
  type: MemoryDataSet
  copy_mode: assign

preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
```

but this already has an issue as we use `preprocessed_data` twice in the catalog
r
`preprocessed_data` is the name of your output, I think. Place it as:

```yaml
<Pipeline Name>:
  type: MemoryDataSet
  copy_mode: assign

preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
```

Output*, sorry
m
```yaml
feature_engineering:
  type: MemoryDataSet
  copy_mode: assign

preprocessed_data:
  type: spark.SparkDataSet
  filepath: data/${project}/05_model_input/df_preprocessed.parquet
  file_format: parquet
```

i have added it now as u suggested, and the error code is different now:
```
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o742.save.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
```
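For what it's worth, `PathOutputCommitProtocol` lives in Spark's optional `spark-hadoop-cloud` module, so this `ClassNotFoundException` typically means something enables the cloud committers without that module on the classpath. This is an assumption about your setup, but if your Spark config (e.g. a `spark.yml`) contains lines like the hypothetical fragment below, try removing them for local parquet writes, or add the matching `spark-hadoop-cloud` jar:

```yaml
# Hypothetical spark config fragment; both of these cloud-committer settings
# require the optional spark-hadoop-cloud module to be on the classpath.
spark.sql.sources.commitProtocolClass: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
```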
r
Mmm, yes, now that's a Java issue
Be sure to have Java JDK 8 and Spark correctly set up
m
```
$ java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode)
```
r
I did this on Windows, but what worked for me was setting the hadoop.dll file and winutils.exe according to your Spark version (plus setting your system variables). If you are using macOS or Linux, I'd recommend you ask the community
m
hmm i did install hadoop 3.3.4 on mac but probably need to redo this to make sure things are ok!!
but could the java error be related to this?
r
Being honest, it's better to ask
m
ok, thanks!! BTW do u happen to save and load model/pipeline files in pyspark + kedro framework? would be helpful if u can give a hint
r
I haven't worked with models yet. And you're welcome!
m
🙂
thanks!!