Hi all, I’m working on a project for a large comp...
# questions
h
Hi all, I’m working on a project for a large company and facing challenges with SQLAlchemy in my Kedro pipelines. The inherited setup involves a combination of raw SQL, SQLAlchemy, and pandas transformations, which has resulted in a codebase that’s not DRY and very challenging to maintain. We handle millions of bids for multiple clients, and I want to simplify the ETL flow while improving developer experience. I recently read about using Ibis with Kedro to streamline SQL processing, as discussed in this Kedro blog post. I’m considering adopting Ibis for its dataframe-oriented syntax but still want to retain ORM capabilities for defining tables and managing migrations through Alembic. My goal is to separate table definitions and transformations more cleanly, leveraging Ibis to handle lazy transformations, while using ORM for type hinting and dependency management. Some specific questions: 1. Has anyone combined Ibis with SQLAlchemy ORM for production systems? If so, what was your experience in terms of code maintainability and performance? 2. How do you handle type-checking and validation when combining Ibis transformations with ORM models? Do you leverage Pydantic/Pandera here? (subclassing/using the ORM defs?) 3. I’d like to specify transformations in a pandas-like syntax (Ibis) and manage table schemas with SQLAlchemy ORM. How feasible is this approach in Kedro, especially for complex, large-scale ETL projects? Any insights or experiences would be greatly appreciated!
👀 1
n
2. How do you handle type-checking and validation when combining Ibis transformations with ORM models? Do you leverage Pydantic/Pandera here? (subclassing/using the ORM defs?)
SQLModel? if you are using sqlalchemy
ETL with
ibis
make sense, not so sure what do you mean by managing schema with ORM. Can you give an example of this?
h
Using an ORM like SQLAlchemy to manage schema can be particularly useful in complex ETL scenarios, such as A/B testing with a pipeline that assigns different optimizers to campaigns. For example, suppose we have a table
Campaigns
where each row can be assigned a version to determine which optimizer it will use. We can define this table with SQLAlchemy (or SQLModel, which extends SQLAlchemy) as follows:
Copy code
from sqlmodel import SQLModel, Field

class Campaigns(SQLModel, table=True):
    campaign_id: str = Field(primary_key=True)
    version: Optional[str] = None  # Field for assigned optimizer version
    # other fields...
After defining the table, we can use SQLAlchemy to load data from the database into a pandas dataframe, perform random assignments, and then save the updates back to the database. Here’s an example of a Kedro node function that operates on the loaded data and assigns optimizer versions:
Copy code
import numpy as np
import pandas as pd

def assign_campaign_versions(campaigns: pd.DataFrame) -> pd.DataFrame:
    campaigns['aim_version'] = np.random.choice(["V1", "V2"], size=len(campaigns), p=[0.8, 0.2])
    return campaigns
This function assigns a version to each campaign, which can then be saved back to the database. SQLAlchemy helps manage the schema by allowing you to load, modify, and selectively update just the assigned rows. However, if you’re looking to implement similar functionality with Ibis, you’ll find that Ibis’s
_save
method typically supports only full inserts rather than partial updates. For ETL workflows that require such fine-grained control, SQLAlchemy provides the schema persistence and precise update capabilities needed. Custom Dataset for Loading and Saving Campaigns Data To integrate this functionality into Kedro, we can define a custom dataset,
CampaignsMySQLDataset
, which leverages SQLAlchemy to handle I/O operations efficiently. This dataset will load unassigned campaigns into a dataframe, allow assignments, and then save updated rows back to the database:
Copy code
from <http://kedro.io|kedro.io> import AbstractDataset
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import Session
from performance_optimisation_engine.database_orm import Campaigns

class CampaignsMySQLDataset(AbstractDataset[pd.DataFrame, pd.DataFrame]):
    def __init__(self, connection_string: str):
        """
        Initializes a new CampaignsMySQLDataset instance.

        Args:
            connection_string: A SQLAlchemy-compatible connection string for MySQL.
        """
        self.engine = create_engine(connection_string)
        self.Session = sessionmaker(bind=self.engine)

    def _load(self) -> pd.DataFrame:
        """
        Loads unassigned campaigns from the database into a pandas DataFrame.

        Returns:
            A pandas DataFrame containing campaigns that have no AIM version assigned.
        """
        with self.Session() as session:
            # Query campaigns without any assigned AIM version
            query = session.query(Campaigns).filter(Campaigns.aim_version == None)
            # Read into a pandas DataFrame
            campaigns_df = pd.read_sql(query.statement, con=session.bind)
        return campaigns_df

    def _save(self, data: pd.DataFrame) -> None:
        """
        Saves the DataFrame back to the database by updating the aim_version column.

        Args:
            data: A pandas DataFrame containing campaign data with updated AIM versions.
        """
        # Convert DataFrame back into SQLModel instances for bulk saving
        with self.Session() as session:
            campaign_objects = [Campaigns(**row) for row in data.to_dict(orient='records')]
            session.bulk_save_objects(campaign_objects)
            session.commit()
This custom dataset class handles the loading and saving of data from MySQL, making the ETL process straightforward and consistent with Kedro’s pipeline structure. Ibis vs. SQLAlchemy for ETL On the surface, Ibis offers a convenient API for ETL tasks, especially for bulk transformations and data processing, with syntax similar to pandas. However, i am not sure whether Ibis supports fine-grained update operations (like upserts) as SQLAlchemy does. This limitation makes me think that SQLAlchemy more suited to scenarios where you need to selectively update rows, manage schema migrations, or work with existing data structures directly within a Python-based ORM. However, with increasing queries, sqlalchemy becomes very difficult to scale, and requires a lot of hard to maintain custom datasets. so i am curious whether other folks run into that issue, and whether IBIS solves it for them?
👍 1