Jannik Wiedenhaupt
11/01/2024, 1:59 AM

datajoely
11/01/2024, 2:23 AM

Laurens Vijnck
11/01/2024, 7:42 AM
Do you mean dataset in this case as a Kedro or a BigQuery concept? If you'd like to use a default BigQuery dataset project-wide, you can set that via a global.
# globals.yml
bq_dataset: dataset_name_here

# catalog.yml
dataset:
  type: YourBigQueryTableDatasetHere
  dataset: ${globals:bq_dataset}
  table: nodes
  filepath: ${globals:paths.int}/rtx_kg2/nodes
  save_args:
    mode: overwrite
    labels:
      kg: rtx_kg2
      git_sha: ${globals:git_sha}
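(Note that the catalog entry above also resolves ${globals:paths.int} and ${globals:git_sha}, so the globals file needs those keys as well. A sketch with placeholder values, not the author's actual configuration:

# globals.yml (sketch; all values are hypothetical placeholders)
bq_dataset: dataset_name_here
paths:
  int: gs://my-bucket/02_intermediate   # hypothetical bucket
git_sha: abc1234                        # typically injected per run, e.g. by CI
)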
We have a custom implementation of a BigQuery dataset that uses Spark under the hood, but registers the table in BigQuery as an external table for interactive analysis. Happy to share if that helps you.

Jannik Wiedenhaupt
11/01/2024, 2:10 PM

Laurens Vijnck
11/01/2024, 2:16 PM
import re
from typing import Any

from google.cloud import bigquery, exceptions
from kedro.io.core import Version
from kedro_datasets.spark import SparkDataset
from pyspark.sql import DataFrame

from my_project.hooks import SparkHooks  # project-specific hook that creates the Spark session


class BigQueryTableDataset(SparkDataset):
    """Implementation of a BigQueryTableDataset.

    The class delegates dataset save and load invocations to the native SparkDataset
    and registers the dataset in BigQuery through an External Data Configuration.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        filepath: str,
        project_id: str,
        dataset: str,
        table: str,
        identifier: str,
        file_format: str,
        load_args: dict[str, Any] = None,
        save_args: dict[str, Any] = None,
        version: Version = None,
        credentials: dict[str, Any] = None,
        metadata: dict[str, Any] = None,
        **kwargs,
    ) -> None:
        """Creates a new instance of ``BigQueryTableDataset``.

        Args:
            filepath: Path to the files backing the table.
            project_id: Project identifier.
            dataset: Name of the BigQuery dataset.
            table: Name of the table.
            identifier: Unique identifier of the table.
            file_format: File format to use.
            load_args: Arguments to pass to the load method.
            save_args: Arguments to pass to the save method.
            version: Version of the dataset.
            credentials: Credentials to pass to the underlying SparkDataset.
            metadata: Arbitrary metadata attached to the dataset.
            kwargs: Keyword args passed to the parent class.
        """
        self._project_id = project_id
        self._path = filepath
        self._format = file_format
        save_args = save_args or {}  # guard against None before popping the labels
        self._labels = save_args.pop("labels", {})
        self._table = self._sanitize_name(f"{table}_{identifier}")
        self._dataset_id = f"{self._project_id}.{self._sanitize_name(dataset)}"
        self._client = bigquery.Client(project=self._project_id)
        super().__init__(
            filepath=filepath,
            file_format=file_format,
            save_args=save_args,
            load_args=load_args,
            credentials=credentials,
            version=version,
            metadata=metadata,
            **kwargs,
        )

    def _load(self) -> Any:
        # Ensure the Spark session exists before delegating to SparkDataset
        SparkHooks._initialize_spark()
        return super()._load()

    def _save(self, data: DataFrame) -> None:
        # Invoke saving of the underlying Spark dataset
        super()._save(data)

        # Ensure the BigQuery dataset exists
        self._create_dataset()

        # Point an external data configuration at the files written by Spark
        external_config = bigquery.ExternalConfig(self._format.upper())
        external_config.source_uris = [f"{self._path}/*.{self._format}"]

        # Register the external table
        table = bigquery.Table(f"{self._dataset_id}.{self._table}")
        table.labels = self._labels
        table.external_data_configuration = external_config
        table = self._client.create_table(table, exists_ok=False)

    def _create_dataset(self) -> None:
        try:
            self._client.get_dataset(self._dataset_id)
            print(f"Dataset {self._dataset_id} already exists")
        except exceptions.NotFound:
            print(f"Dataset {self._dataset_id} is not found, will attempt creating it now.")

            # Dataset doesn't exist, so create it
            dataset = bigquery.Dataset(self._dataset_id)
            # dataset.location = "US"  # Specify the location, e.g., "US" or "EU"
            dataset = self._client.create_dataset(dataset, timeout=30)
            print(f"Created dataset {self._project_id}.{dataset.dataset_id}")

    @staticmethod
    def _sanitize_name(name: str) -> str:
        """Sanitize BigQuery table or dataset identifiers.

        Args:
            name: Name to sanitize.

        Returns:
            The sanitized name.
        """
        return re.sub(r"[^a-zA-Z0-9_]", "_", str(name))
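For context, a catalog entry wired to this class might look as follows; the module path, bucket, and project name are assumptions for illustration, not the author's actual layout:

# catalog.yml (sketch; my_project.datasets, the bucket, and the project are hypothetical)
nodes:
  type: my_project.datasets.BigQueryTableDataset
  filepath: gs://my-bucket/int/rtx_kg2/nodes
  project_id: my-gcp-project
  dataset: ${globals:bq_dataset}
  table: nodes
  identifier: ${globals:git_sha}   # makes the registered table name unique per run
  file_format: parquet
  save_args:
    mode: overwrite
    labels:
      kg: rtx_kg2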
Laurens Vijnck
11/01/2024, 2:17 PM

Jannik Wiedenhaupt
11/01/2024, 2:17 PM

Jannik Wiedenhaupt
11/01/2024, 2:19 PM

datajoely
11/01/2024, 2:32 PM

datajoely
11/01/2024, 2:33 PM

Laurens Vijnck
11/01/2024, 2:34 PM
We have an env that uses Spark datasets all over the place, and then a cloud env that uses the dataset above to move some stuff to BigQuery (so we don't really use BQ as a query engine).
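A hedged sketch of that two-environment setup (paths and names are assumptions): the base catalog stays pure Spark, and the cloud environment overrides the same entry with the BigQuery-registering dataset, selected via kedro run --env=cloud.

# conf/base/catalog.yml (sketch)
nodes:
  type: spark.SparkDataset
  filepath: data/02_intermediate/rtx_kg2/nodes
  file_format: parquet

# conf/cloud/catalog.yml (sketch; overrides the entry above)
nodes:
  type: my_project.datasets.BigQueryTableDataset   # hypothetical module path
  filepath: gs://my-bucket/int/rtx_kg2/nodes       # hypothetical bucket
  project_id: my-gcp-project                       # hypothetical project
  dataset: ${globals:bq_dataset}
  table: nodes
  identifier: ${globals:git_sha}
  file_format: parquet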
Jannik Wiedenhaupt
11/01/2024, 4:25 PM
On a similar note, how do you guys provide parameters in SQL queries? It seems like the GBQQueryDataset doesn't have the option compared to pandas.SQLQueryDataset.

datajoely
11/01/2024, 4:32 PM
This is another argument for Ibis, you'll have to write a custom dataset to do that.
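For reference, pandas.SQLQueryDataset forwards load_args to pd.read_sql_query, which accepts a params argument, so a parametrised query can be expressed directly in the catalog. A sketch; the credentials key is a placeholder and the placeholder syntax (%(name)s vs :name) depends on the DB driver:

# catalog.yml (sketch; db_credentials and the table are hypothetical)
kg_nodes_query:
  type: pandas.SQLQueryDataset
  sql: SELECT * FROM nodes WHERE kg = %(kg)s
  credentials: db_credentials
  load_args:
    params:
      kg: rtx_kg2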