# questions
j
When using BigQuery datasets, how do you define a default dataset project-wide?
d
So you can define defaults this way: https://docs.kedro.org/en/stable/data/kedro_dataset_factories.html. But I would encourage Ibis these days over any of the pandas datasets when working with SQL.
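For illustration, a minimal dataset factory sketch of what such defaults could look like (the pattern, project and dataset names below are placeholders, not a definitive setup): every catalog entry whose name matches the pattern picks up the same BigQuery defaults.

```yaml
# Hypothetical factory pattern: any dataset named "<something>_bq" resolves to these defaults
"{name}_bq":
  type: pandas.GBQTableDataset
  project: my-gcp-project          # placeholder GCP project
  dataset: my_default_bq_dataset   # the project-wide default BigQuery dataset
  table_name: "{name}"
```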
l
@Jannik Wiedenhaupt Are you referring to `dataset` in this case as a Kedro or a BigQuery concept? If you'd like to use a default BigQuery dataset project-wide, you can set that via a global:
```yaml
# globals.yml
bq_dataset: dataset_name_here
```

```yaml
# catalog.yml
dataset:
  type: YourBigQueryTableDatasetHere  # i.e. the custom BigQueryTableDataset shared below
  dataset: ${globals:bq_dataset}
  table: nodes
  filepath: ${globals:paths.int}/rtx_kg2/nodes
  save_args:
    mode: overwrite
    labels:
      kg: rtx_kg2
      git_sha: ${globals:git_sha}
```
We have a custom implementation of a BigQuery dataset that uses Spark under the hood, but registers the table in BigQuery as an external table for interactive analysis. Happy to share if that helps you.
j
Thank you Laurens, that's what I was looking for!
l
```python
import re
from typing import Any

from google.cloud import bigquery, exceptions
from kedro.io.core import Version
from kedro_datasets.spark import SparkDataset
from pyspark.sql import DataFrame


class BigQueryTableDataset(SparkDataset):
    """Implementation of a BigQueryTableDataset.

    The class delegates dataset save and load invocations to the native SparkDataset
    and registers the data in BigQuery through an External Data Configuration.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        filepath: str,
        project_id: str,
        dataset: str,
        table: str,
        identifier: str,
        file_format: str,
        load_args: dict[str, Any] | None = None,
        save_args: dict[str, Any] | None = None,
        version: Version | None = None,
        credentials: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        """Creates a new instance of ``BigQueryTableDataset``.

        Args:
            filepath: Path to the underlying data files (passed to the parent SparkDataset).
            project_id: Project identifier.
            dataset: Name of the BigQuery dataset.
            table: Name of the table.
            identifier: Unique identifier of the table.
            file_format: File format to use.
            load_args: Arguments to pass to the load method.
            save_args: Arguments to pass to the save method.
            version: Version of the dataset.
            credentials: Credentials passed to the underlying SparkDataset.
            metadata: Arbitrary metadata attached to the dataset.
            kwargs: Keyword args passed to the parent class.
        """
        self._project_id = project_id
        self._path = filepath
        self._format = file_format
        save_args = save_args or {}
        self._labels = save_args.pop("labels", {})

        self._table = self._sanitize_name(f"{table}_{identifier}")
        self._dataset_id = f"{self._project_id}.{self._sanitize_name(dataset)}"

        self._client = bigquery.Client(project=self._project_id)

        super().__init__(
            filepath=filepath,
            file_format=file_format,
            save_args=save_args,
            load_args=load_args,
            credentials=credentials,
            version=version,
            metadata=metadata,
            **kwargs,
        )

    def _load(self) -> Any:
        # SparkHooks is a project-specific hook that lazily initialises the SparkSession
        # before delegating the actual load to the parent SparkDataset.
        SparkHooks._initialize_spark()
        return super()._load()

    def _save(self, data: DataFrame) -> None:
        # Invoke saving of the underlying spark dataset
        super()._save(data)

        # Ensure dataset exists
        self._create_dataset()

        # Create external table
        external_config = bigquery.ExternalConfig(self._format.upper())
        external_config.source_uris = [f"{self._path}/*.{self._format}"]

        # Register the external table
        table = bigquery.Table(f"{self._dataset_id}.{self._table}")
        table.labels = self._labels
        table.external_data_configuration = external_config
        table = self._client.create_table(table, exists_ok=False)

    def _create_dataset(self) -> None:
        try:
            self._client.get_dataset(self._dataset_id)
            print(f"Dataset {self._dataset_id} already exists")
        except exceptions.NotFound:
            print(f"Dataset {self._dataset_id} is not found, will attempt creating it now.")

            # Dataset doesn't exist, so let's create it
            dataset = bigquery.Dataset(self._dataset_id)
            # dataset.location = "US"  # Specify the location, e.g., "US" or "EU"

            dataset = self._client.create_dataset(dataset, timeout=30)
            print(f"Created dataset {self._project_id}.{dataset.dataset_id}")

    @staticmethod
    def _sanitize_name(name: str) -> str:
        """Function to sanitise BigQuery table or dataset identifiers.

        Args:
            name: str
        Returns:
            Sanitized name
        """
        return re.sub(r"[^a-zA-Z0-9_]", "_", str(name))
```
This implementation loads and saves the data as a plain Spark dataset, but registers it in BigQuery as an external table for exploratory analysis (which, depending on your data access patterns, is more cost-effective than storing it in BQ natively).
j
Thank you!
@datajoely What is the advantage of Ibis? Since there are no BigQuery-specific Ibis datasets in Kedro, I am assuming you just configure ibis.TableDataset?
d
Yeah, Ibis generates SQL behind the scenes, so there is minimal infrastructure overhead, and the same code works with 20+ alternative backends.
I also like the idea of using BigQuery in prod, but DuckDB in dev.
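To make that concrete, here is a small hedged sketch (the table and column names are made up) of why the same Ibis code can target DuckDB in dev and BigQuery in prod: the expression is backend-agnostic and Ibis compiles it to the target dialect.

```python
import ibis

# Toy in-memory table purely for illustration
nodes = ibis.memtable({"kg": ["rtx_kg2", "rtx_kg2", "other"], "degree": [3, 5, 1]})

# Backend-agnostic expression; Ibis compiles it to whichever engine it runs against
summary = (
    nodes.filter(nodes.kg == "rtx_kg2")
    .group_by("kg")
    .aggregate(n=nodes.count(), avg_degree=nodes.degree.mean())
)

print(ibis.to_sql(summary, dialect="duckdb"))    # SQL you'd run locally in dev
print(ibis.to_sql(summary, dialect="bigquery"))  # SQL you'd run in prod
print(summary.execute())                         # executes on the default (DuckDB) backend
```

On the Kedro side, the catalog part of this is typically just ibis.TableDataset with a different connection configured per environment.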
l
Yeah, I think that's quite cool, and it kind of depends on your pipeline setup. We have a base `env` that uses Spark datasets all over the place, and then a `cloud` env that uses the dataset above to move some stuff to BigQuery (so we don't really use BQ as a query engine).
👍 1
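For context, a rough sketch of that environment split (the module path, bucket and project names below are illustrative, not the actual config): the same catalog entry is defined in the base env and overridden in the cloud env.

```yaml
# conf/base/catalog.yml -- dev: plain Spark dataset on local disk
nodes:
  type: spark.SparkDataset
  filepath: data/02_intermediate/rtx_kg2/nodes
  file_format: parquet

# conf/cloud/catalog.yml -- `kedro run --env cloud` swaps in the BigQuery-backed dataset
nodes:
  type: my_project.datasets.BigQueryTableDataset  # the custom class shared above
  filepath: gs://my-bucket/rtx_kg2/nodes
  file_format: parquet
  project_id: my-gcp-project
  dataset: ${globals:bq_dataset}
  table: nodes
  identifier: ${globals:git_sha}
```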
j
On a similar note, how do you guys provide parameters in SQL queries? It seems like the GBQQueryDataset doesn't have that option, compared to pandas.SQLQueryDataset.
d
On a similar note, how do you guys provide parameters in SQL queries? It seems like the GBQQueryDataset doesn't have that option, compared to pandas.SQLQueryDataset.
This is another argument for Ibis; otherwise you'll have to write a custom dataset to do that.
👍 1
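If you do go the custom-dataset route rather than Ibis, a minimal sketch could look like the following (the class name, parameter names, and the plain str.format substitution are illustrative assumptions, not an existing Kedro dataset):

```python
from typing import Any

import pandas as pd
import pandas_gbq
from kedro.io import AbstractDataset


class ParametrisedGBQQueryDataset(AbstractDataset[None, pd.DataFrame]):
    """Read-only dataset that renders parameters into a SQL query before loading."""

    def __init__(
        self,
        sql: str,
        project: str,
        query_params: dict[str, Any] | None = None,
    ) -> None:
        self._sql = sql
        self._project = project
        self._query_params = query_params or {}

    def _load(self) -> pd.DataFrame:
        # Simple str.format substitution; for untrusted values prefer BigQuery's native
        # query parameters via the google-cloud-bigquery client instead.
        query = self._sql.format(**self._query_params)
        return pandas_gbq.read_gbq(query, project_id=self._project)

    def _save(self, data: pd.DataFrame) -> None:
        raise NotImplementedError("This dataset is read-only.")

    def _describe(self) -> dict[str, Any]:
        return {"sql": self._sql, "project": self._project, "query_params": self._query_params}
```

In the catalog you would then pass query_params alongside the sql string; with Ibis you would instead build the filter in Python and skip string templating entirely.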