Hugo Barreto
01/10/2025, 4:30 PM

catalog.yml
_SQLQueryDataset: &SQLquery
  type: pandas.SQLQueryDataset
  credentials: db_espelho
  load_args:
    chunksize: 5  # testing only

"DB.table1":
  <<: *SQLquery
  sql: ...

"01_raw.{dataset}":
  type: partitions.PartitionedDataset
  path: data/01_raw/{dataset}
  dataset:
    type: pandas.CSVDataset
    save_args:
      index: False
  filename_suffix: ".csv"
nodes.py
from typing import Any, Iterator
from pandas import DataFrame

def create_partitions(data_chunks: Iterator[DataFrame]) -> dict[str, Any]:
    return {f"part-{i:02d}": data for i, data in enumerate(data_chunks)}
The problem that I see here is that in the create_partitions function all chunks are loaded into memory. Is there a way to avoid that, so I can save each chunk one at a time?
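One possible pure-Kedro sketch of that, assuming a generator node (the runner saves each yielded result as it is produced) and the PartitionedDataset's default overwrite: false (so every save just adds the partitions it receives); not verified against this setup:

from typing import Any, Iterator

from pandas import DataFrame


def create_partitions(data_chunks: Iterator[DataFrame]) -> Iterator[dict[str, Any]]:
    # Yield one single-partition dict per chunk. Because the node is a
    # generator, each yielded dict is saved to the PartitionedDataset as soon
    # as it is produced, so only the current chunk stays in memory.
    for i, chunk in enumerate(data_chunks):
        yield {f"part-{i:02d}": chunk}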
An alternative solution is to use a custom CSVDataset, as in this doc, instead of PartitionedDataset. However, that creates a huge CSV file that I'll have to process further down the line.
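For reference, that kind of custom dataset is roughly the appending-CSV pattern from the Kedro docs (a sketch, not checked against the linked page):

import pandas as pd

from kedro.io.core import get_filepath_str
from kedro_datasets.pandas import CSVDataset


class ChunkWiseCSVDataset(CSVDataset):
    """Appends each incoming chunk to a single CSV file."""

    _overwrite = True

    def _save(self, data: pd.DataFrame) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        if self._overwrite:
            # First chunk: (re)create the file and write the header.
            data.to_csv(save_path, index=False, mode="w")
            self._overwrite = False
        else:
            # Later chunks: append rows without repeating the header.
            data.to_csv(save_path, index=False, mode="a", header=False)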
I'm open to any suggestions you might have. I prefer using pure Kedro for now, but if there is a plugin for an open-source tool and both of them (plugin and tool) are easy to set up, I'll be glad to try it.
Bonus question:
One of the tables is a transactions table, so I just need to query the previous day's entries. Is it possible to do this with Kedro only?

Hall
01/10/2025, 4:30 PM

Deepyaman Datta
01/10/2025, 9:53 PM
You can use ibis.TableDataset to work with most databases. There's some information on https://kedro.org/blog/building-scalable-data-pipelines-with-kedro-and-ibis (this is outdated; you can use the dataset that is available via Kedro-Datasets now), and other posts.
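As a rough sketch, a catalog entry could look something like the following; the backend and connection values here are placeholders, and the exact keys depend on your database and the kedro-datasets ibis.TableDataset options:

"DB.table1":
  type: ibis.TableDataset
  table_name: table1
  connection:
    backend: postgres   # placeholder; use the Ibis backend that matches your database
    host: localhost     # placeholder connection details
    port: 5432
    database: espelho
  save_args:
    materialized: table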
> One of the tables is a transactions table, so I just need to query the previous day's entries. Is it possible to do this with Kedro only?
Sure; in the end, this should manifest as a filter in your query that gets executed on the database.
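For example, a node along these lines would push the date filter down to the database (the created_at column name is just an assumption about the transactions table):

import ibis


def previous_day_transactions(transactions: ibis.Table) -> ibis.Table:
    # The comparison builds an Ibis expression that is compiled to SQL and
    # executed on the database, rather than filtering in pandas.
    yesterday = ibis.now().date() - ibis.interval(days=1)
    return transactions.filter(transactions.created_at.date() == yesterday)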