Mark Druffel
02/10/2024, 12:05 AM
kedro viz run completes, but doesn't graph my pipeline w/ layers:
"{layer}.{dataset_name}":
type: plsure.datasets.ibis.TableDataset
filepath: data/bronze/{layer}/{dataset_name}/*.parquet
file_format: parquet
table_name: "{layer}_{dataset_name}"
connection: ${_duckdb}
save_args:
materialized: table
When I add the layers to metadata, kedro viz run fails with the error below:
"{layer}.{dataset_name}":
type: plsure.datasets.ibis.TableDataset
filepath: data/bronze/{layer}/{dataset_name}/*.parquet
file_format: parquet
table_name: "{layer}_{dataset_name}"
connection: ${_duckdb}
save_args:
materialized: table
metadata:
kedro-viz:
layer: {layer}
Error:
...
  File "~/kedro_viz/models/flowchart.py", line 238, in create_data_node
    return DataNode(
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for DataNode
layer
  str type expected (type=type_error.str)

Traceback (most recent call last):
  File "~/kedro_viz/launchers/cli.py", line 187, in run
    _wait_for(func=_check_viz_up, host=host, port=port)
  File "~/kedro_viz/launchers/utils.py", line 58, in _wait_for
    raise WaitForException(
kedro_viz.launchers.utils.WaitForException: func: <function _check_viz_up at 0x106898e50>, didn't return True within specified timeout: 60
If I don't use a dataset factory at all, it works:
"raw.media_meas_campaign_info":
type: plsure.datasets.ibis.TableDataset
filepath: data/bronze/raw/media_meas_campaign_info/*.parquet
file_format: parquet
table_name: "raw_media_meas_campaign_info"
connection: ${_duckdb}
save_args:
materialized: table
metadata:
kedro-viz:
layer: raw
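(For context, a factory pattern like "{layer}.{dataset_name}" above is resolved from the dataset names the pipeline actually uses. A minimal sketch; the node and function names are illustrative, not from this project:)

from kedro.pipeline import node, pipeline

def passthrough(table):
    return table

def create_pipeline(**kwargs):
    return pipeline(
        [
            # "raw.media_meas_campaign_info" matches "{layer}.{dataset_name}"
            # with layer="raw" and dataset_name="media_meas_campaign_info".
            node(
                func=passthrough,
                inputs="raw.media_meas_campaign_info",
                outputs="intermediate.media_meas_campaign_info",
                name="promote_campaign_info",
            ),
        ]
    )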
Nok Lam Chan
02/10/2024, 1:32 PM

Mark Druffel
02/10/2024, 6:54 PM
kedro viz run
It does not run at all when I include layers; it fails with the pydantic validation error I included above.

from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractDataset, DatasetError

if TYPE_CHECKING:
    from ibis import BaseBackend


class TableDataset(AbstractDataset[ir.Table, ir.Table]):
    DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {}
    DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {
        "materialized": "view",
        "overwrite": True,
    }

    _connections: ClassVar[dict[tuple[tuple[str, str]], BaseBackend]] = {}

    def __init__(
        self,
        *,
        filepath: str | None = None,
        file_format: str | None = None,
        table_name: str | None = None,
        connection: dict[str, Any] | None = None,
        load_args: dict[str, Any] | None = None,
        save_args: dict[str, Any] | None = None,
        # credentials: dict[str, Any] = None,
    ) -> None:
        if filepath is None and table_name is None:
            raise DatasetError(
                "Must provide at least one of `filepath` or `table_name`."
            )

        self._filepath = filepath
        self._file_format = file_format
        self._table_name = table_name
        self._connection_config = connection

        # Set load and save arguments, overwriting defaults if provided.
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)

        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        self._materialized = self._save_args.pop("materialized")
        # self._credentials = deepcopy(credentials) or {}

    @property
    def connection(self) -> BaseBackend:
        cls = type(self)
        key = tuple(sorted(self._connection_config.items()))
        if key not in cls._connections:
            import ibis

            config = deepcopy(self._connection_config)
            backend = getattr(ibis, config.pop("backend"))
            # my janky spark connector #################################################
            # (pop with a default so backends whose config has no "session" key don't raise KeyError)
            if config.pop("session", None) == "_get_spark":
                from kedro_datasets.spark.spark_dataset import _get_spark

                _session = _get_spark()
                cls._connections[key] = backend.connect(session=_session)
            else:
                cls._connections[key] = backend.connect(**config)

        return cls._connections[key]

    def _load(self) -> ir.Table:
        if self._filepath is not None:
            if self._file_format is None:
                raise NotImplementedError
            reader = getattr(self.connection, f"read_{self._file_format}")
            return reader(self._filepath, self._table_name, **self._load_args)
        else:
            return self.connection.table(self._table_name)

    def _save(self, data: ir.Table) -> None:
        if self._table_name is None:
            raise DatasetError("Must provide `table_name` for materialization.")
        writer = getattr(self.connection, f"create_{self._materialized}")
        writer(self._table_name, data, **self._save_args)

    def _describe(self) -> dict[str, Any]:
        return {
            "filepath": self._filepath,
            "file_format": self._file_format,
            "table_name": self._table_name,
            "connection_config": self._connection_config,
            "load_args": self._load_args,
            "save_args": self._save_args,
            "materialized": self._materialized,
        }

    def _exists(self) -> bool:
        return (
            self._table_name is not None
            and self._table_name in self.connection.tables
        )
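(A rough usage sketch of the dataset above, outside the catalog; the DuckDB connection values here are illustrative assumptions, not the thread's actual ${_duckdb} config:)

dataset = TableDataset(
    filepath="data/bronze/raw/media_meas_campaign_info/*.parquet",
    file_format="parquet",
    table_name="raw_media_meas_campaign_info",
    connection={"backend": "duckdb", "database": "data/warehouse.duckdb"},
    save_args={"materialized": "table"},
)
table = dataset.load()  # read_parquet(...) on a cached DuckDB backend; returns an ibis Table
dataset.save(table)     # create_table("raw_media_meas_campaign_info", table, overwrite=True)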
Deepyaman Datta
03/20/2024, 6:30 AM
session in the dataset config. I'm pretty sure this should be achievable using a custom resolver, rather than modifying the dataset: https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-use-resolvers-in-the-omegaconfigloader
That said, there might be value in finding the session by default for Spark (and maybe some other backends like Flink) in the dataset implementation, to make users' lives simpler.
ManagedTableDataset, but it mostly seems to be a wrapper specifically for reading/writing Spark to Delta? I think this should be feasible to support.
I'll try to look a bit more tomorrow; pretty sure half asleep and may be saying incorrect things now.
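(A minimal sketch of the custom-resolver approach suggested above, placed in the project's settings.py; the resolver name "spark_session" is an illustrative assumption, not something from the thread:)

from kedro.config import OmegaConfigLoader

def _spark_session():
    # Lazily fetch (or create) the active SparkSession when the config value is resolved.
    from pyspark.sql import SparkSession

    return SparkSession.builder.getOrCreate()

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        "spark_session": _spark_session,
    },
}

(A catalog entry's connection block could then reference something like session: ${spark_session:} instead of the "_get_spark" sentinel handled inside the dataset.)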
Nok Lam Chan
03/20/2024, 8:12 AM

Deepyaman Datta
03/20/2024, 2:38 PM
USE CATALOG or always namespace the database table by catalog. Happy to help figure it out.