Abhishek Bhatia
10/15/2024, 5:38 PM
gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
--project my-project \
--region=europe-central2 \
--container-image=europe-central2-docker.pkg.dev/my-project/kedro-dataproc-demo/kedro-dataproc-iris:latest \
--service-account dataproc-worker@my-project.iam.gserviceaccount.com \
--properties spark.app.name="kedro-pyspark-iris",spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 \
-- \
run
The entry point script contains the following:
import os
from kedro.framework import cli

# Kedro expects to run from the project root baked into the image
os.chdir("/home/kedro")
# cli.main() parses sys.argv, so the trailing "run" passed after -- in the
# gcloud command becomes the equivalent of "kedro run"
cli.main()
I am getting the following error:
[10/15/24 17:30:21] INFO     Loading data from 'example_iris_data'           data_catalog.py:343
                             (SparkDataSet)...
[10/15/24 17:30:22] WARNING  There are 3 nodes that have not run. You can    runner.py:178
                             resume the pipeline run by adding the following
                             argument to your previous command:
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/lib/python3.9/site-packages/kedro/io/core.py:186 in load │
│ │
│ 183 │ │ self._logger.debug("Loading %s", str(self)) │
│ 184 │ │ │
│ 185 │ │ try: │
│ ❱ 186 │ │ │ return self._load() │
│ 187 │ │ except DataSetError: │
│ 188 │ │ │ raise │
│ 189 │ │ except Exception as exc: │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ message = 'Failed while loading data from data set │ │
│ │ SparkDataSet(file_format=csv, filepath=g'+2319 │ │
│ │ self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet object │ │
│ │ at 0x7f4163077730> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.9/site-packages/kedro/extras/datasets/spark/spark_dat │
│ aset.py:380 in _load │
│ │
│ 377 │ │
│ 378 │ def _load(self) -> DataFrame: │
│ 379 │ │ load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get │
│ ❱ 380 │ │ read_obj = self._get_spark().read │
│ 381 │ │ │
│ 382 │ │ # Pass schema if defined │
│ 383 │ │ if self._schema: │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ load_path = 'gs://aa-dev-crm-users/abhishek/misc/iris.csv'             │ │
│ │ self = <kedro.extras.datasets.spark.spark_dataset.SparkDataSet │ │
│ │ object at 0x7f4163077730> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/lib/spark/python/pyspark/sql/session.py:1706 in read │
│ │
│ 1703 │ │ |100|Hyukjin Kwon| │
│ 1704 │ │ +---+------------+ │
│ 1705 │ │ """ │
│ ❱ 1706 │ │ return DataFrameReader(self) │
│ 1707 │ │
│ 1708 │ @property │
│ 1709 │ def readStream(self) -> DataStreamReader: │
│ │
│ ╭────────────────────────────── locals ──────────────────────────────╮ │
│ │ self = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40> │ │
│ ╰────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/lib/spark/python/pyspark/sql/readwriter.py:70 in __init__ │
│ │
│ 67 │ """ │
│ 68 │ │
│ 69 │ def __init__(self, spark: "SparkSession"): │
│ ❱ 70 │ │ self._jreader = spark._jsparkSession.read() │
│ 71 │ │ self._spark = spark │
│ 72 │ │
│ 73 │ def _df(self, jdf: JavaObject) -> "DataFrame": │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ self = <pyspark.sql.readwriter.DataFrameReader object at │ │
│ │ 0x7f41631fa700> │ │
│ │ spark = <pyspark.sql.session.SparkSession object at 0x7f4174ebcf40> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322 in │
│ __call__ │
│ │
│ [Errno 20] Not a directory: │
│ '/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py' │
│ │
│ /usr/lib/spark/python/pyspark/errors/exceptions/captured.py:185 in deco │
│ │
│ 182 │ │ │ if not isinstance(converted, UnknownException): │
│ 183 │ │ │ │ # Hide where the exception came from that shows a non- │
│ 184 │ │ │ │ # JVM exception message. │
│ ❱ 185 │ │ │ │ raise converted from None │
│ 186 │ │ │ else: │
│ 187 │ │ │ │ raise │
│ 188 │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ a = ( │ │
│ │ │ 'xro91', │ │
│ │ │ <py4j.clientserver.JavaClient object at 0x7f417cb199d0>, │ │
│ │ │ 'o88', │ │
│ │ │ 'read' │ │
│ │ ) │ │
│ │ converted = IllegalArgumentException() │ │
│ │ f = <function get_return_value at 0x7f417b8c0310> │ │
│ │ kw = {} │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────╯
IllegalArgumentException: The value of property spark.app.name must not be null
I am almost 100% sure this error is not due to any mis-specification in my Dockerfile
or requirements, because it works perfectly if I change the entry point script to the following:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = spark.read.csv("<gs://aa-dev-crm-users/abhishek/misc/iris.csv>", inferSchema=True, header=True)
print(df.show())
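For reference, the SparkSession in the Kedro project is created by the standard SparkHooks from the pyspark starter. A sketch of that hook, assuming the project still matches the starter's hooks.py (names and config access are from the template, not verified against my repo):

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        # Read spark.* options from conf/base/spark.yml via the config loader
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        # appName() is set explicitly here, so spark.app.name should not be
        # null by the time the session is built from this config
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")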
Ravi Kumar Pilla
10/15/2024, 8:18 PM
The error points at spark.app.name. Could you try without passing that? I do not see the git repo passing the same:
gcloud dataproc batches submit pyspark file:///home/kedro/src/entrypoint.py \
--project gid-ml-ops-sandbox \
--region=europe-west1 \
--container-image=gcr.io/gid-ml-ops-sandbox/pyspark-tutorial-mb:20220920105 \
--service-account kedro-pyspark-dataproc@gid-ml-ops-sandbox.iam.gserviceaccount.com \
--properties spark.dynamicAllocation.minExecutors=2,spark.dynamicAllocation.maxExecutors=2 -- \
run
Abhishek Bhatia
10/16/2024, 2:01 AM
I tried without spark.app.name but got the same error.
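One quick check I could add at the top of entrypoint.py (a throwaway, hypothetical snippet, not something already in the repo) to see what the Dataproc batch runtime actually injects before Kedro takes over:

from pyspark.sql import SparkSession

# getOrCreate() attaches to the session the Dataproc batch already started
spark = SparkSession.builder.getOrCreate()
print("spark.app.name =", spark.conf.get("spark.app.name", "<not set>"))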
Ravi Kumar Pilla
10/16/2024, 3:24 PM
Artur Dobrogowski
10/16/2024, 3:29 PM
Abhishek Bhatia
10/17/2024, 6:27 AM