Hi! Anyone using pyspark + OmegaConfigLoader? I ...
# questions
e
Hi! Is anyone using PySpark together with OmegaConfigLoader? I have an issue: I cannot even do a `kedro run`, because `_resolve_credentials` fails (I don't have any credentials in my project):
AttributeError: 'str' object has no attribute 'items'
1
Traceback:
Copy code
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/01 14:39:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/Erwin_Paillacan/miniconda3/envs/de/bin/kedro:8 in <module>                          │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/kedro/framework/cli │
│ /cli.py:211 in main                                                                              │
│                                                                                                  │
│   208 │   """                                                                                    │
│   209 │   _init_plugins()                                                                        │
│   210 │   cli_collection = KedroCLI(project_path=Path.cwd())                                     │
│ ❱ 211 │   cli_collection()                                                                       │
│   212                                                                                            │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/click/core.py:1157  │
│ in __call__                                                                                      │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/kedro/framework/cli │
│ /cli.py:139 in main                                                                              │
│                                                                                                  │
│   136 │   │   )                                                                                  │
│   137 │   │                                                                                      │
│   138 │   │   try:                                                                               │
│ ❱ 139 │   │   │   super().main(                                                                  │
│   140 │   │   │   │   args=args,                                                                 │
│   141 │   │   │   │   prog_name=prog_name,                                                       │
│   142 │   │   │   │   complete_var=complete_var,                                                 │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/click/core.py:1078  │
│ in main                                                                                          │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/click/core.py:1688  │
│ in invoke                                                                                        │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/click/core.py:1434  │
│ in invoke                                                                                        │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/click/core.py:783   │
│ in invoke                                                                                        │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/kedro/framework/cli │
│ /project.py:453 in run                                                                           │
│                                                                                                  │
│   450 │   with KedroSession.create(                                                              │
│   451 │   │   env=env, conf_source=conf_source, extra_params=params                              │
│   452 │   ) as session:                                                                          │
│ ❱ 453 │   │   session.run(                                                                       │
│   454 │   │   │   tags=tag,                                                                      │
│   455 │   │   │   runner=runner(is_async=is_async),                                              │
│   456 │   │   │   node_names=node_names,                                                         │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/kedro/framework/ses │
│ sion/session.py:417 in run                                                                       │
│                                                                                                  │
│   414 │   │   │   "runner": getattr(runner, "__name__", str(runner)),                            │
│   415 │   │   }                                                                                  │
│   416 │   │                                                                                      │
│ ❱ 417 │   │   catalog = context._get_catalog(  # noqa: protected-access                          │
│   418 │   │   │   save_version=save_version,                                                     │
│   419 │   │   │   load_versions=load_versions,                                                   │
│   420 │   │   )                                                                                  │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/kedro/framework/con │
│ text/context.py:277 in _get_catalog                                                              │
│                                                                                                  │
│   274 │   │   )                                                                                  │
│   275 │   │   conf_creds = self._get_config_credentials()                                        │
│   276 │   │                                                                                      │
│ ❱ 277 │   │   catalog = settings.DATA_CATALOG_CLASS.from_config(                                 │
│   278 │   │   │   catalog=conf_catalog,                                                          │
│   279 │   │   │   credentials=conf_creds,                                                        │
│   280 │   │   │   load_versions=load_versions,                                                   │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/kedro/io/data_catal │
│ og.py:288 in from_config                                                                         │
│                                                                                                  │
│   285 │   │   layers: dict[str, set[str]] = defaultdict(set)                                     │
│   286 │   │                                                                                      │
│   287 │   │   for ds_name, ds_config in catalog.items():                                         │
│ ❱ 288 │   │   │   ds_config = _resolve_credentials(  # noqa: redefined-loop-name                 │
│   289 │   │   │   │   ds_config, credentials                                                     │
│   290 │   │   │   )                                                                              │
│   291 │   │   │   if cls._is_pattern(ds_name):                                                   │
│                                                                                                  │
│ /Users/Erwin_Paillacan/miniconda3/envs/de/lib/python3.10/site-packages/kedro/io/data_catal │
│ og.py:86 in _resolve_credentials                                                                 │
│                                                                                                  │
│    83 │   │   │   return {k: _map_value(k, v) for k, v in value.items()}                         │
│    84 │   │   return value                                                                       │
│    85 │                                                                                          │
│ ❱  86 │   return {k: _map_value(k, v) for k, v in config.items()}                                │
│    87                                                                                            │
│    88                                                                                            │
│    89 def _sub_nonword_chars(data_set_name: str) -> str:                                         │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'str' object has no attribute 'items'
hooks.py
Copy code
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    """Kedro hooks that start a SparkSession once the project context exists."""

    @hook_impl
    def after_context_created(self, context) -> None:
        """Create (or reuse) the SparkSession for this project.

        Reads the ``"spark"`` entry from ``context.config_loader`` and applies
        its key/value pairs as the Spark configuration. The Spark application
        is named after ``context.project_path.name``, Hive support is enabled,
        and the Spark log level is set to ``WARN``.
        """
        # Spark settings come from the project's configuration via the loader.
        spark_options = context.config_loader.get("spark")
        conf = SparkConf().setAll(spark_options.items())

        # Assemble the session builder one step at a time.
        builder = SparkSession.builder.appName(context.project_path.name)
        builder = builder.enableHiveSupport()
        builder = builder.config(conf=conf)

        session = builder.getOrCreate()
        session.sparkContext.setLogLevel("WARN")
settings.py
Copy code
"""Project settings. There is no need to edit this file unless you want to change values
from the Kedro defaults. For further information, including these default values, see
<https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html>."""

# Instantiated project hooks: a single SparkHooks instance, imported from the
# project's own hooks module.
from ml_minsur_de_lingo.hooks import SparkHooks

HOOKS = (SparkHooks(),)

# Installed plugins for which to disable hook auto-registration.
# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",)

# Class that manages storing KedroSession data.
# from kedro.framework.session.store import BaseSessionStore
# SESSION_STORE_CLASS = BaseSessionStore
# Keyword arguments to pass to the `SESSION_STORE_CLASS` constructor.
# SESSION_STORE_ARGS = {
#     "path": "./sessions"
# }

# Directory that holds configuration.
# CONF_SOURCE = "conf"

# Class that manages how configuration is loaded.
# OmegaConfigLoader is set explicitly here rather than leaving this setting
# commented out like the other optional settings in this file.
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
CONFIG_LOADER_ARGS = {
    "config_patterns": {
        # "spark*" picks up files such as spark.yml directly inside an
        # environment folder; "spark*/**" picks up files nested in a spark*/
        # sub-folder. A pattern ending in a slash (e.g. "spark*/") reads as
        # directory-only, so both forms are spelled out explicitly.
        "spark": ["spark*", "spark*/**"],
        "parameters": ["parameters*", "parameters*/**", "**/parameters*"],
    }
}


# Class that manages Kedro's library components.
# from kedro.framework.context import KedroContext
# CONTEXT_CLASS = KedroContext

# Class that manages the Data Catalog.
# from kedro.io import DataCatalog
# DATA_CATALOG_CLASS = DataCatalog
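Update — solved: everything was due to the name of my global variable, which needs to start with an underscore. Because catalog_globals.yml matches the catalog* file pattern, OmegaConfigLoader treats every top-level key in it as a dataset entry unless the key starts with `_`; `env: dev` was therefore read as a dataset whose config is the string `dev`, which is what raised `'str' object has no attribute 'items'`. Before: catalog_globals.yml ---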
env: dev
something_catalog.yml ----
filepath: /dbfs/mnt/${env}/sandbox/debugging/source/dim/Shift.parquet
After: catalog_globals.yml ---
_env: dev
something_catalog.yml ----
filepath: /dbfs/mnt/${_env}/sandbox/debugging/source/dim/Shift.parquet
👍 1