Hugo Evers
10/07/2024, 1:35 PM
2. How do you handle type-checking and validation when combining Ibis transformations with ORM models? Do you leverage Pydantic/Pandera here? (subclassing/using the ORM defs?)

Nok Lam Chan
10/07/2024, 2:24 PM
SQLModel? if you are using sqlalchemy

Nok Lam Chan
10/07/2024, 2:25 PM
ibis makes sense, but not so sure what you mean by managing schema with ORM. Can you give an example of this?

Hugo Evers
10/07/2024, 3:25 PM
Sure. For example, we have a table Campaigns where each row can be assigned a version to determine which optimizer it will use. We can define this table with SQLAlchemy (or SQLModel, which extends SQLAlchemy) as follows:
from typing import Optional

from sqlmodel import SQLModel, Field

class Campaigns(SQLModel, table=True):
    campaign_id: str = Field(primary_key=True)
    aim_version: Optional[str] = None  # assigned optimizer version; None until assignment
    # other fields...
After defining the table, we can use SQLAlchemy to load data from the database into a pandas dataframe, perform random assignments, and then save the updates back to the database. Here’s an example of a Kedro node function that operates on the loaded data and assigns optimizer versions:
import numpy as np
import pandas as pd

def assign_campaign_versions(campaigns: pd.DataFrame) -> pd.DataFrame:
    # Randomly assign 80% of campaigns to V1 and 20% to V2
    campaigns['aim_version'] = np.random.choice(["V1", "V2"], size=len(campaigns), p=[0.8, 0.2])
    return campaigns
This function assigns a version to each campaign, which can then be saved back to the database. SQLAlchemy helps manage the schema by allowing you to load, modify, and selectively update just the assigned rows. However, if you're looking to implement similar functionality with Ibis, you'll find that Ibis's _save method typically supports only full inserts rather than partial updates. For ETL workflows that require such fine-grained control, SQLAlchemy provides the schema persistence and precise update capabilities needed.
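To make that concrete, here is a minimal sketch of the kind of fine-grained write SQLAlchemy supports on MySQL, assuming the Campaigns model above (upsert_campaigns is an illustrative helper, not something from our codebase):

from sqlalchemy.dialects.mysql import insert
from sqlalchemy.orm import Session

def upsert_campaigns(session: Session, records: list[dict]) -> None:
    # INSERT ... ON DUPLICATE KEY UPDATE: new campaign_ids are inserted,
    # existing rows get their aim_version updated in place
    stmt = insert(Campaigns.__table__).values(records)
    stmt = stmt.on_duplicate_key_update(aim_version=stmt.inserted.aim_version)
    session.execute(stmt)
    session.commit()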
Custom Dataset for Loading and Saving Campaigns Data
To integrate this functionality into Kedro, we can define a custom dataset, CampaignsMySQLDataset, which leverages SQLAlchemy to handle I/O operations efficiently. This dataset loads unassigned campaigns into a DataFrame, allows assignments, and then saves updated rows back to the database:
from kedro.io import AbstractDataset
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from performance_optimisation_engine.database_orm import Campaigns

class CampaignsMySQLDataset(AbstractDataset[pd.DataFrame, pd.DataFrame]):
    def __init__(self, connection_string: str):
        """
        Initializes a new CampaignsMySQLDataset instance.

        Args:
            connection_string: A SQLAlchemy-compatible connection string for MySQL.
        """
        self.engine = create_engine(connection_string)
        self.Session = sessionmaker(bind=self.engine)

    def _load(self) -> pd.DataFrame:
        """
        Loads unassigned campaigns from the database into a pandas DataFrame.

        Returns:
            A pandas DataFrame containing campaigns that have no AIM version assigned.
        """
        with self.Session() as session:
            # Query campaigns without any assigned AIM version
            query = session.query(Campaigns).filter(Campaigns.aim_version.is_(None))
            # Read the query results into a pandas DataFrame
            campaigns_df = pd.read_sql(query.statement, con=session.bind)
            return campaigns_df

    def _save(self, data: pd.DataFrame) -> None:
        """
        Saves the DataFrame back to the database by updating the aim_version column.

        Args:
            data: A pandas DataFrame containing campaign data with updated AIM versions.
        """
        with self.Session() as session:
            # Bulk-update existing rows, matched on primary key, so only the
            # assigned rows change (a plain bulk insert would create duplicates)
            session.bulk_update_mappings(Campaigns, data.to_dict(orient='records'))
            session.commit()

    def _describe(self) -> dict:
        """Describes the dataset for Kedro's logging and catalog tooling."""
        return {"table": Campaigns.__tablename__}
This custom dataset class handles loading data from and saving data back to MySQL, keeping the ETL process straightforward and consistent with Kedro's pipeline structure.
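In a real pipeline the dataset would be registered in the Data Catalog, but calling it directly illustrates the round trip (the connection string below is a placeholder):

dataset = CampaignsMySQLDataset("mysql+pymysql://user:password@localhost/campaigns")  # placeholder credentials
campaigns = dataset.load()  # only campaigns with no aim_version yet
campaigns = assign_campaign_versions(campaigns)
dataset.save(campaigns)  # updates the assigned rows in place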
Ibis vs. SQLAlchemy for ETL
On the surface, Ibis offers a convenient API for ETL tasks, especially for bulk transformations and data processing, with syntax similar to pandas. However, I am not sure whether Ibis supports fine-grained update operations (like upserts) the way SQLAlchemy does. This limitation makes me think that SQLAlchemy is better suited to scenarios where you need to selectively update rows, manage schema migrations, or work with existing data structures directly within a Python-based ORM.
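For comparison, here is roughly what the same bulk assignment might look like in Ibis (connection details are placeholders); note that writing the result back means materialising a table rather than updating selected rows, which is exactly the limitation above:

import ibis

con = ibis.mysql.connect(host="localhost", user="user", password="...", database="campaigns")  # placeholders
campaigns = con.table("campaigns")

# pandas-like bulk transformation, compiled to SQL: roughly an 80/20 random split
assigned = campaigns.mutate(aim_version=(ibis.random() < 0.8).ifelse("V1", "V2"))

# writing back creates or overwrites a whole table; no row-level upsert here
con.create_table("campaigns_assigned", assigned, overwrite=True)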
However, as the number of queries grows, SQLAlchemy becomes very difficult to scale and requires a lot of hard-to-maintain custom datasets. So I am curious whether other folks run into that issue, and whether Ibis solves it for them?