#plugins-integrations

Mark Druffel

02/10/2024, 12:05 AM
Does anyone know if kedro-viz should work with data factories using layers? I was working off of the kedro-viz documentation, but I'm hitting an error. With this data catalog, `kedro viz run` completes, but doesn't graph my pipeline w/ layers:
"{layer}.{dataset_name}":
  type: plsure.datasets.ibis.TableDataset
  filepath: data/bronze/{layer}/{dataset_name}/*.parquet
  file_format: parquet
  table_name: "{layer}_{dataset_name}"
  connection: ${_duckdb}
  save_args:
    materialized: table
When I add the layers to metadata, `kedro viz run` fails with the error below:
"{layer}.{dataset_name}":
  type: plsure.datasets.ibis.TableDataset
  filepath: data/bronze/{layer}/{dataset_name}/*.parquet
  file_format: parquet
  table_name: "{layer}_{dataset_name}"
  connection: ${_duckdb}
  save_args:
    materialized: table
  metadata:
    kedro-viz:
      layer: {layer}
Error:
...
  File "~/kedro_viz/models/flowchart.py", line 238, in create_data_node
    return DataNode(
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for DataNode
layer
  str type expected (type=type_error.str)
Traceback (most recent call last):
  File "~/kedro_viz/launchers/cli.py", line 187, in run
    _wait_for(func=_check_viz_up, host=host, port=port)
  File "~/kedro_viz/launchers/utils.py", line 58, in _wait_for
    raise WaitForException(
kedro_viz.launchers.utils.WaitForException: func: <function _check_viz_up at 0x106898e50>, didn't return True within specified timeout: 60
If I don't use data factory at all, it works:
"raw.media_meas_campaign_info":
  type: plsure.datasets.ibis.TableDataset
  filepath: data/bronze/raw/media_meas_campaign_info/*.parquet
  file_format: parquet
  table_name: "raw_media_meas_campaign_info"
  connection: ${_duckdb}
  save_args:
    materialized: table
  metadata:
    kedro-viz:
      layer: raw
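
A possible cause of the `str type expected` error above, offered as an assumption rather than a diagnosis confirmed in this thread: in YAML, an unquoted `{layer}` is a flow mapping, so `layer: {layer}` parses to a dictionary instead of a string, which would fail validation on a string-typed `layer` field. The minimal sketch below uses PyYAML (assumed to be installed) to show the difference between the unquoted and quoted forms. Whether kedro-viz then resolves a quoted `"{layer}"` placeholder inside `metadata` is not established here.

```python
import yaml  # PyYAML; assumed to be installed

# The metadata fragment as written in the failing catalog entry.
unquoted = yaml.safe_load("""
metadata:
  kedro-viz:
    layer: {layer}
""")

# The same fragment with the placeholder quoted.
quoted = yaml.safe_load("""
metadata:
  kedro-viz:
    layer: "{layer}"
""")

unquoted_layer = unquoted["metadata"]["kedro-viz"]["layer"]
quoted_layer = quoted["metadata"]["kedro-viz"]["layer"]

print(type(unquoted_layer).__name__, unquoted_layer)  # dict {'layer': None}
print(type(quoted_layer).__name__, quoted_layer)      # str {layer}
```

This matches the rest of the entry: the pattern key and `table_name` start with `{` and are quoted, while `filepath` does not start with `{` and so parses as a plain string without quotes.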

Nok Lam Chan

02/10/2024, 1:32 PM
Just curious: I see you are using ibis and duckdb. How has the experience been, and what are you using it for? It should work, but I think there were some issues so it got rolled back; I am not 100% sure if viz supports factories already. Does the dataset metadata look fine otherwise, i.e. is only the layer missing?

Mark Druffel

02/10/2024, 6:54 PM
@Nok Lam Chan so far so good with ibis. We're refactoring an old, very disorganized data pipeline at work. I'm testing a number of frameworks including kedro and ibis. Loving both so far, but still pretty early in the process. My main reasoning for ibis is flexibility and consistency. We will probably have to connect to a few different SQL backends, which will likely change over time as these things tend to do in large companies. Anything we can do to minimize the impact of those changes is really helpful. That said, 90+ percent of our data just comes from the lake via pyspark. Our current pipeline is a hodgepodge of pyspark, Spark SQL, and pandas depending on who wrote the module. It's not necessarily a problem, but it does make reading the code more mental work. Want to minimize that if possible. We use databricks w/ unity catalog, so I will have to use ibis to connect to ManagedTableDataset, which I haven't done yet. Using Polars and 🦆 locally for development.
On kedro viz, I did see a note after I posted that experiments don't work with layers right now, so I'm guessing that issue is connected, but I'm not using experiments, just vanilla `kedro viz run`. It does not run at all when I include layers; it fails with the pydantic validation error I included above.
🔥 2
@Nok Lam Chan We're moving very slowly trying to figure the architecture out, learn kedro, and wait on a bunch of internal (big corporate) stuff. All to say, it's been a minute since you asked, but I did want to share that in about 30 mins of work I was able to get @Deepyaman Datta's awesome TableDataset working w/ vanilla spark. Maybe it already did out of the box or should have, but I couldn't get it to, so I made a couple-line tweak and it works great so far. I'm still trying to figure out how to implement credentials and wrap (or just fork) databricks.ManagedTableDataset since that's what we'll use in prod, but I now have catalog entries that use vanilla pyspark in my databricks environment and polars on my laptop, which is pretty slick 🔥 Just thought I'd share since I've seen your team has been mentioning ibis a bit.
from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractDataset, DatasetError

if TYPE_CHECKING:
    from ibis import BaseBackend


class TableDataset(AbstractDataset[ir.Table, ir.Table]):
    DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {}
    DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {
        "materialized": "view",
        "overwrite": True,
    }

    _connections: ClassVar[dict[tuple[tuple[str, str]], BaseBackend]] = {}

    def __init__(
        self,
        *,
        filepath: str | None = None,
        file_format: str | None = None,
        table_name: str | None = None,
        connection: dict[str, Any] | None = None,
        load_args: dict[str, Any] | None = None,
        save_args: dict[str, Any] | None = None,
        #credentials: dict[str, Any] = None,
    ) -> None:
        if filepath is None and table_name is None:
            raise DatasetError(
                "Must provide at least one of `filepath` or `table_name`."
            )

        self._filepath = filepath
        self._file_format = file_format
        self._table_name = table_name
        self._connection_config = connection

        # Set load and save arguments, overwriting defaults if provided.
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)

        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        self._materialized = self._save_args.pop("materialized")
        #self._credentials = deepcopy(credentials) or {}

    @property
    def connection(self) -> BaseBackend:
        cls = type(self)
        key = tuple(sorted(self._connection_config.items()))
        if key not in cls._connections:
            import ibis

            config = deepcopy(self._connection_config)
            backend = getattr(ibis, config.pop("backend"))
            
            # my janky spark connector: reuse the active Spark session when the
            # connection config sets `session: _get_spark`.
            # `pop` takes a default so configs without a `session` key
            # (e.g. duckdb) don't raise KeyError.
            if config.pop("session", None) == "_get_spark":
                from kedro_datasets.spark.spark_dataset import _get_spark

                _session = _get_spark()
                cls._connections[key] = backend.connect(session=_session)
            else:
                cls._connections[key] = backend.connect(**config)

        return cls._connections[key]

    def _load(self) -> ir.Table:
        if self._filepath is not None:
            if self._file_format is None:
                raise NotImplementedError

            reader = getattr(self.connection, f"read_{self._file_format}")
            return reader(self._filepath, self._table_name, **self._load_args)
        else:
            return self.connection.table(self._table_name)

    def _save(self, data: ir.Table) -> None:
        if self._table_name is None:
            raise DatasetError("Must provide `table_name` for materialization.")

        writer = getattr(self.connection, f"create_{self._materialized}")
        writer(self._table_name, data, **self._save_args)

    def _describe(self) -> dict[str, Any]:
        return {
            "filepath": self._filepath,
            "file_format": self._file_format,
            "table_name": self._table_name,
            "connection_config": self._connection_config,
            "load_args": self._load_args,
            "save_args": self._save_args,
            "materialized": self._materialized,
        }

    def _exists(self) -> bool:
        return (
            self._table_name is not None and self._table_name in self.connection.tables
        )

Deepyaman Datta

03/20/2024, 6:30 AM
That's super cool to hear that Ibis is enabling you to use Polars locally and Spark on Databricks! Always great when things can be used as intended. 😀 Re the Spark hack, it's true that you need to pass `session` in the dataset config. I'm pretty sure this should be achievable using a custom resolver, rather than modifying the dataset: https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-use-resolvers-in-the-omegaconfigloader That said, there might be value in finding the session by default for Spark (and maybe some other backends like Flink) in the dataset implementation, to make users' lives simpler.
💡 1
(Another option could be to connect using URL format, which does create a session; see https://ibis-project.org/backends/pyspark#ibis.connect-url-format)
💯 1
Not very familiar with `ManagedTableDataset`, but it mostly seems to be a wrapper specifically for reading/writing Spark to Delta? I think this should be feasible to support. I'll try to look a bit more tomorrow; pretty sure I'm half asleep and may be saying incorrect things now.
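
The custom-resolver route suggested above might look roughly like the following in a project's `settings.py`. This is an untested sketch: the resolver name `spark_session` and the helper `get_spark_session` are illustrative choices, not part of Kedro or Ibis, and it assumes the project uses `OmegaConfigLoader` and that the dataset passes the resolved `session` object straight through to `backend.connect(...)`.

```python
# src/<package_name>/settings.py (sketch)


def get_spark_session():
    """Return the active SparkSession, creating one if needed."""
    # Imported lazily so this module still imports where pyspark is absent.
    from pyspark.sql import SparkSession

    return SparkSession.builder.getOrCreate()


# OmegaConfigLoader accepts custom resolvers through CONFIG_LOADER_ARGS.
# The resolver name "spark_session" is a hypothetical, illustrative choice.
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {"spark_session": get_spark_session},
}

# A catalog connection could then reference it (untested assumption):
#   connection:
#     backend: pyspark
#     session: "${spark_session:}"
```

With something like this in place, the dataset itself would not need the special-cased `_get_spark` branch, since the session would arrive already resolved in the connection config.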

Nok Lam Chan

03/20/2024, 8:12 AM
ManagedTableDataset uses the Unity Catalog (a Hive-style metastore) that Databricks provides; not sure if it interacts nicely with ibis.
👀 1

Deepyaman Datta

03/20/2024, 2:38 PM
Use of catalog should be fine; it's basically just an additional SQL statement to make sure you `USE CATALOG`, or you always namespace the database table by catalog. Happy to help figure it out.
💡 1