# questions
a
Hi kedroids :kedro: We have a use case in which we are scheduling BigQuery queries to run in a specific order using a Kedro pipeline. We use the BigQuery client simply to trigger the SQL query on BigQuery as follows:
from google.cloud import bigquery

def trigger_query_on_bigquery(query: str) -> bool:
    # Fire the query on BigQuery and block until it finishes; the rows are
    # never loaded here, the node only signals completion.
    client = bigquery.Client()
    client.query_and_wait(query)

    return True
The Kedro DAG that schedules multiple queries in order looks as follows:
from kedro.pipeline import Pipeline, node, pipeline

def create_retail_data_primary_pipeline() -> Pipeline:
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]

    return pipeline(nodes)
Since the node can't output the dataframe itself, we output a transcoded @status entry (which is just True), and then use the actual BigQuery spark.SparkDataset transcoded versions of these datasets in the downstream pipeline to enforce the order. So I will use the prm_product_hierarchy@bigquery dataset in a downstream node, just so that Kedro runs the node that triggers the BigQuery query first. Is there a better way to do this?
d
Yeah, so I don't typically like this pattern since it's an 'out of the Kedro DAG' flow; you're essentially relying on side effects outside of Kedro. I think these days I'd use our Ibis dataset, which lets you keep dataframes in your flow but, importantly, delegates execution to BQ.
It also means you don't need to use Spark.
I'm also super interested in seeing where this library goes, since you won't even have to change the syntax: https://github.com/eakmanrq/sqlframe
a
Yes, we have implemented ibis for other use cases, and I am 100% behind the decision. The only thing being, we already have 15 SQL queries written, and converting them to ibis isn't possible in the timeframe.
d
So I think you can still do ibis.sql(), which will at least give you objects to pass between nodes: https://ibis-project.org/how-to/extending/sql
šŸ‘ 1
a
Ah, thanks for this. The thing is, we really do not want the result of the query to be returned in memory either, as an ibis.Table or any other object. Let me check if .sql supports just creating a table and returning a "reference" to the table.
d
I think that's what it does, but it may not do the CREATE TABLE, just a temp table.
d
"The thing is, we really do not want the result of the query to be returned in memory either as ibis.Table or any other object."
Ibis will not materialize it in memory; it will just be a reference, unless you call .execute() or something similar.
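(A tiny sketch of that distinction, with placeholder project, dataset, and table names:)
import ibis

con = ibis.bigquery.connect(project_id="my-project", dataset_id="retail_prm")

# Lazy: just a reference to the query result, nothing is materialized locally.
expr = con.sql("SELECT customer_id, segment FROM retail_prm.prm_customer")
ecom_only = expr.filter(expr.segment == "ecom")

# Only an explicit materialization call pulls rows into local memory (as pandas):
# ecom_df = ecom_only.execute()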
a
Got it. So that means I can return an ibis.Table as an output (kedro_datasets.ibis.TableDataset) from my node, and it will create the table in BigQuery, and then I can use the same catalog entry to load from BigQuery for other nodes. Although I want to load it as a Spark dataframe downstream, so I would have to build a custom dataset on top of it, or create my_dataset@ibis and my_dataset@spark entries.
d
• You could hand over as parquet
• You could in fact do more in BQ without Spark because of Ibis
a
Downstream nodes are already heavily dependent on the PySpark API (though Ibis could handle it too), so we're minimizing any additional changes. Also, I don't want to do parquet, since I did a lot of performance scaling tests with different partition strategies, and the Spark BigQuery connector on GCP is a tad faster than parquet on GCS.
d
makes a lot of sense
the other vote of confidence is that BQ built this with Ibis behind the scenes https://cloud.google.com/python/docs/reference/bigframes/latest
⭐ 1
a
Yes, ibis is now the way to go. The only thing being, the legacy code is still half SQL, half PySpark, but if ibis promises to be maintained, then it will become a no-brainer for sure šŸ’Æ
ibis 1