Abhishek Bhatia
10/10/2024, 6:32 AM
```python
from google.cloud import bigquery


def trigger_query_on_bigquery(
    query: str,
) -> bool:
    # Run the query and block until it finishes; we only care that it completed
    client = bigquery.Client()
    client.query_and_wait(query)
    return True
```
The Kedro DAG that schedules multiple queries in order looks as follows:
```python
from kedro.pipeline import Pipeline, node, pipeline


def create_retail_data_primary_pipeline() -> Pipeline:
    nodes = [
        node(
            func=trigger_prm_customer_on_big_query,
            inputs=None,
            outputs="prm_customer@status",
        ),
        node(
            func=trigger_prm_transaction_detail_ecom_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_ecom@status",
        ),
        node(
            func=trigger_prm_transaction_detail_retail_on_big_query,
            inputs=["prm_product_hierarchy@status"],
            outputs="prm_transaction_detail_retail@status",
        ),
        node(
            func=trigger_prm_transaction_detail_on_big_query,
            inputs=[
                "prm_transaction_detail_ecom@status",
                "prm_transaction_detail_retail@status",
                "prm_product_hierarchy@status",
                "prm_customer@status",
            ],
            outputs="prm_transaction_detail@status",
        ),
        node(
            func=trigger_prm_incident_on_big_query,
            inputs=None,
            outputs="prm_incident@status",
        ),
        node(
            func=trigger_prm_product_hierarchy_on_big_query,
            inputs=None,
            outputs="prm_product_hierarchy@status",
        ),
    ]
    return pipeline(nodes)
```
Since the node can't output the dataframe itself, we output a transcoded entry with @status (which is just True), and then use the actual BigQuery spark.SparkDataset transcoded versions of these datasets in the downstream pipeline to enforce the ordering. So I will use the prm_product_hierarchy@bigquery dataset in a downstream node, just so that Kedro runs the node that triggers the BigQuery query first.
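For reference, the catalog behind this looks roughly like so (a sketch; I've made up the exact BigQuery options here):

```yaml
prm_product_hierarchy@status:
  type: MemoryDataset  # holds only the `True` flag returned by the trigger node

prm_product_hierarchy@bigquery:
  type: spark.SparkDataset
  file_format: bigquery
  filepath: my-project.retail.prm_product_hierarchy  # hypothetical table reference
```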
Is there a better way to do this?

datajoely
10/10/2024, 7:34 AM

datajoely
10/10/2024, 7:34 AM

datajoely
10/10/2024, 7:35 AM

Abhishek Bhatia
10/10/2024, 8:11 AM
We are already using ibis for other use cases, and I am 100% with the decision. The only thing being, we already have 15 SQL queries written, and converting them to ibis isn't possible in the timeframe.

datajoely
10/10/2024, 8:13 AM

Abhishek Bhatia
10/10/2024, 9:00 AM
The thing is, we really do not want the result of the query to be returned in memory either, as an ibis.Table or any other object. Let me check if .sql supports just creating a table and returning a "reference" to the table.
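Something like this is what I have in mind (untested sketch, assuming the ibis BigQuery backend; project/dataset IDs are made up):

```python
import ibis

# Hypothetical connection details
con = ibis.bigquery.connect(project_id="my-project", dataset_id="retail")

# .sql() compiles the query into an ibis.Table expression: a lazy
# reference to the result set, not the rows themselves
orders = con.sql(
    "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id"
)

# Materialize it as a table on the BigQuery side, again getting back a reference
prm_orders = con.create_table("prm_orders", orders)
```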
datajoely
10/10/2024, 9:09 AM

Deepyaman Datta
10/10/2024, 12:39 PMThe thing is, we really do not want the result of the query to be returned in momory either asIbis will not materialize it in memory; it will just be a reference, unless youor any other object.ibis.Table
.execute()
or something similarAbhishek Bhatia
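For example (a sketch, continuing from the connection in the earlier snippet):

```python
expr = orders.filter(orders.total > 100)  # still just an expression; nothing runs yet
df = expr.execute()  # only now does BigQuery run the query and pull rows into memory (pandas)
```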
Abhishek Bhatia
10/10/2024, 12:44 PM
Got it, so I can return an ibis.Table as an output (kedro_datasets.ibis.TableDataset) from my node, and it will create the table in BigQuery, and then I can use the same catalog entry to load from BigQuery for other nodes. Although I want to load it as a Spark dataframe downstream, so I would have to write a custom dataset on top of it. Or create my_dataset@ibis, my_dataset@spark entries.
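i.e. catalog entries along these lines (rough sketch; the connection details and the BigQuery table reference for Spark are made up):

```yaml
my_dataset@ibis:
  type: ibis.TableDataset
  table_name: my_dataset
  connection:
    backend: bigquery
    project_id: my-project  # hypothetical
    dataset_id: retail      # hypothetical
  save_args:
    materialized: table     # create a real table, not a view

my_dataset@spark:
  type: spark.SparkDataset
  file_format: bigquery
  filepath: my-project.retail.my_dataset  # hypothetical table reference
```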
datajoely
10/10/2024, 12:44 PM

Abhishek Bhatia
10/10/2024, 12:46 PM

datajoely
10/10/2024, 12:46 PM

datajoely
10/10/2024, 12:47 PM

Abhishek Bhatia
10/10/2024, 12:49 PM
ibis is now the way to go. The only thing being, the legacy code is still half SQL, half PySpark, but if ibis promises to be maintained, then it will become a no-brainer for sure 💯