#user-research

Juan Luis

12/07/2023, 11:57 AM
open question for #user-research: do you use Kedro for ETL pipelines? if so, what good practices do you follow to enable idempotency and efficiency? for example, let's say I'm (E)xtracting data from some API endpoint that has paginated results and (L)oading it in some database of mine. ideally, if I run the pipeline twice, it would only extract the data starting from the last loaded data point, and then it would append the data at the end. this somehow requires that the pipeline reads the status of the loaded data before starting the extraction. but this creates a cycle in the DAG ("A node cannot have the same inputs and outputs") so it requires you to define a read-only version of the dataset and an appendable version, both referring to the same underlying storage. any thoughts on this approach?
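The read-then-append idea above can be sketched in plain Python, outside of Kedro. Everything here is hypothetical: `fake_fetch_page` stands in for the paginated API, and the `storage` list stands in for the database that would be exposed twice in the catalog (once read-only to get the cursor, once append-only for the load).

```python
from typing import Callable, Iterable, Iterator

Record = dict

def read_cursor(loaded: Iterable[Record]) -> int:
    """Read-only view: highest id already loaded, or 0 if nothing is loaded."""
    return max((r["id"] for r in loaded), default=0)

def fetch_pages(start_id: int,
                fetch_page: Callable[[int, int], list],
                page_size: int = 2) -> Iterator[Record]:
    """Walk the paginated endpoint, starting right after start_id."""
    cursor = start_id
    while True:
        page = fetch_page(cursor, page_size)
        if not page:
            return
        yield from page
        cursor = page[-1]["id"]

def run_once(storage: list, fetch_page: Callable[[int, int], list]) -> int:
    """One pipeline run: read cursor, extract only new records, append them."""
    cursor = read_cursor(storage)            # read-only dataset
    new = list(fetch_pages(cursor, fetch_page))
    storage.extend(new)                      # append-only dataset
    return len(new)

# Hypothetical stand-in for the API: five records with increasing ids.
SOURCE = [{"id": i} for i in range(1, 6)]

def fake_fetch_page(after_id: int, limit: int) -> list:
    return [r for r in SOURCE if r["id"] > after_id][:limit]
```

Running `run_once` twice against the same `storage` extracts everything the first time and nothing the second, which is the idempotency property asked about.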

Michaël Jeckmans

12/07/2023, 12:23 PM
This is where create if not exists and update/insert can come in handy if you want the one approach to deal with it all. Alternatively you could leverage a last_updated timestamp for which you'd query its max value and then use that as a reference point for the data you're trying to modify. Will this be a table that you only append to or will you also be changing records?
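A minimal sketch of both suggestions, using SQLite from the standard library as a stand-in for the real database. The `statuses` table and its columns are made up for illustration, and the `ON CONFLICT ... DO UPDATE` form assumes SQLite 3.24 or newer.

```python
import sqlite3

def ensure_table(conn: sqlite3.Connection) -> None:
    """Create the target table only if it is missing, so reruns are safe."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS statuses (
               id INTEGER PRIMARY KEY,
               body TEXT NOT NULL,
               last_updated TEXT NOT NULL
           )"""
    )

def upsert(conn: sqlite3.Connection, rows: list) -> None:
    """Insert new rows and overwrite existing ones with the same id."""
    conn.executemany(
        """INSERT INTO statuses (id, body, last_updated)
           VALUES (:id, :body, :last_updated)
           ON CONFLICT(id) DO UPDATE SET
               body = excluded.body,
               last_updated = excluded.last_updated""",
        rows,
    )
    conn.commit()

def high_water_mark(conn: sqlite3.Connection):
    """Max last_updated already loaded (None when the table is empty)."""
    return conn.execute("SELECT MAX(last_updated) FROM statuses").fetchone()[0]
```

The value returned by `high_water_mark` is what the next extraction would use as its starting point. ISO 8601 timestamps stored as text compare correctly with `MAX` as long as they share one format and timezone.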

Juan Luis

12/07/2023, 12:24 PM
good question @Michaël Jeckmans - on first approximation, append-only, but of course this is somewhat naïve because it doesn't account for changes in the upstream schema ("change data capture"). for now though I'm only interested in the simplified case.

Michaël Jeckmans

12/07/2023, 12:27 PM
A hacky way would be to store a local file with a timestamp of the last set that was uploaded and modify that each time you upload new data. That local file becomes the reference point, but if others modify the table or if you run into any upload issues that becomes a point of failure real fast.
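The local-file approach could look roughly like this. The file layout (`{"last_uploaded": ...}`) is an assumption for illustration; the write goes through a temporary file and `os.replace` so that a crash mid-write does not leave a half-written checkpoint.

```python
import json
import os
import tempfile
from pathlib import Path

def read_state(path, default=None):
    """Return the last uploaded timestamp, or default if no state file exists."""
    try:
        return json.loads(Path(path).read_text())["last_uploaded"]
    except FileNotFoundError:
        return default

def write_state(path, last_uploaded: str) -> None:
    """Atomically replace the state file with the new timestamp."""
    path = Path(path)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as fh:
        json.dump({"last_uploaded": last_uploaded}, fh)
    os.replace(tmp, path)  # atomic rename on the same filesystem
```

To limit the failure mode described above, `write_state` would be called only after the upload has succeeded. It still cannot detect changes that other people make to the table.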

datajoely

12/07/2023, 12:38 PM
I would say there are ways of doing this, but Kedro’s design around reproducibility makes it uncomfortable doing anything like the UD part of CRUD

Juan Luis

12/07/2023, 1:31 PM
the tricky thing is that, ideally, the data (E)xtraction, which should be performed in a Dataset (since it's I/O), cannot be made efficient because the cursor, i.e. the starting point of the extraction, is dynamic.
raw_statuses:
  type: APIDataset
  args:
    start_id: ???
so, unless I'm missing something, you're forced to always extract everything (inefficient), return some sort of lazy reader that then can receive parameters passed to the node (possibly the most promising option), or hack
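A rough sketch of the "lazy reader" option. `LazyAPIDataset` is a hypothetical plain class, not a Kedro API: in a real project it would presumably subclass Kedro's dataset base class. The idea is that loading performs no I/O and just hands the node a callable, so the node can supply `start_id` at run time. `fake_fetch_after` is a stand-in for the real API client.

```python
from typing import Callable, Iterator

class LazyAPIDataset:
    """Hypothetical dataset whose load() returns a reader instead of data."""

    def __init__(self, fetch_after: Callable[[int], Iterator[dict]]):
        self._fetch_after = fetch_after

    def load(self) -> Callable[[int], Iterator[dict]]:
        # No request is made here; the node decides where to start.
        return self._fetch_after

def extract_new_statuses(reader: Callable[[int], Iterator[dict]],
                         start_id: int) -> list:
    """Node function: start_id arrives as a parameter or upstream output."""
    return list(reader(start_id))

# Stand-in for the API; records which cursors were actually requested.
CALLS = []

def fake_fetch_after(start_id: int) -> Iterator[dict]:
    CALLS.append(start_id)
    for i in range(start_id + 1, 6):
        yield {"id": i}
```

This mirrors how Kedro's `PartitionedDataset` returns load functions rather than loaded data, leaving the actual reads to the node.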

datajoely

12/07/2023, 1:36 PM
yup - it’s very much designed with the assumption things are static
without breaking the node / catalog separation it's hard to broach
I wonder if we could expose some sort of runtime parameterisation to the catalog load hooks

Matthias Roels

12/08/2023, 6:44 AM
Why not set the datestamp you need as an env var and use that in your catalog file?
Or you can wrap your kedro run cmd in a shell script that first appends a datestamp (from a txt file or something) to a globals.yml file?
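The wrapper idea, sketched in Python rather than shell. The file names and the `start_date` key are assumptions; the function only writes the globals file, and the wrapper would then invoke `kedro run` as a separate step.

```python
import json
from pathlib import Path

def write_globals(state_file, globals_file, default: str = "1970-01-01") -> str:
    """Copy the datestamp from a text file into a one-key globals YAML file.

    Falls back to `default` when the state file does not exist yet.
    """
    state_file = Path(state_file)
    stamp = state_file.read_text().strip() if state_file.exists() else default
    # json.dumps yields a double-quoted string, which is also valid YAML.
    Path(globals_file).write_text(f"start_date: {json.dumps(stamp)}\n")
    return stamp
```

The catalog entry would then pick up `start_date` through whatever globals interpolation the project's config loader provides. Note that this version overwrites the globals file rather than appending to it, so any other global values would need to be merged in.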

Juan Luis

12/08/2023, 6:58 PM
env vars always seem like an escape hatch 😅
going back to my original reasoning
> this somehow requires that the pipeline reads the status of the loaded data before starting the extraction
just realised that it's similar to the concept of a Singer Tap https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md
> A Tap is an application that takes a configuration file and an optional state file as input and produces an ordered stream of record, state and schema messages as output.
I've been trying to understand how IncrementalDataset can address this with the help of @Deepyaman Datta
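To make the Tap analogy concrete, here is a toy generator in the spirit of the Singer spec: it takes a config and an optional state, and emits SCHEMA, RECORD and STATE messages. The `source` key in the config and the `last_id` bookmark are inventions for this sketch. A real tap would call an API and write the messages as JSON lines to stdout.

```python
from typing import Iterator, Optional

def tap(config: dict, state: Optional[dict] = None) -> Iterator[dict]:
    """Emit Singer-style messages for records newer than the saved bookmark."""
    stream = config["stream"]
    last_id = (state or {}).get("last_id", 0)

    yield {
        "type": "SCHEMA",
        "stream": stream,
        "schema": {"type": "object",
                   "properties": {"id": {"type": "integer"}}},
        "key_properties": ["id"],
    }
    for record in config["source"]:          # hypothetical in-memory source
        if record["id"] <= last_id:
            continue                         # already extracted in a past run
        yield {"type": "RECORD", "stream": stream, "record": record}
        last_id = record["id"]
    # The final STATE message is what the next run receives as `state`.
    yield {"type": "STATE", "value": {"last_id": last_id}}
```

Feeding the `value` of the last STATE message back in as `state` on the next run gives the "start from the last loaded data point" behaviour without the pipeline having to query the target.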

Iñigo Hidalgo

12/18/2023, 9:08 PM
We don't have a clean solution for this, but we build most of our new pipelines using a "runtime" parameter which modifies a lot of node behavior, so we have one pipeline which queries the data and returns a list of dates which need to be updated, then we dynamically run the extraction pipeline for those dates by modifying the runtime parameter. This isn't really scalable, we basically write the logic ad-hoc for each of the pipelines that we need this functionality for. IncrementalDataset or another abstraction like it could probably help.
lol oops I just realized this is 11 days old sorry
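A stripped-down sketch of the two-step pattern described above, in plain Python. `dates_to_update` plays the role of the pipeline that works out what is missing, and `backfill` reruns an extraction once per date with a different runtime parameter. `run_extraction` and the `run_date` key are hypothetical: in a Kedro project this step might instead trigger a pipeline run with extra parameters.

```python
from datetime import date, timedelta
from typing import Callable, Iterable

def dates_to_update(start: date, end: date, loaded: Iterable[date]) -> list:
    """Return every date in [start, end] that is not yet loaded."""
    loaded = set(loaded)
    n_days = (end - start).days + 1
    all_days = (start + timedelta(days=n) for n in range(n_days))
    return [d for d in all_days if d not in loaded]

def backfill(missing: Iterable[date],
             run_extraction: Callable[[dict], None]) -> None:
    """Run the extraction once per missing date, varying the runtime param."""
    for run_date in missing:
        run_extraction({"run_date": run_date.isoformat()})
```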

Juan Luis

12/18/2023, 10:30 PM
11 days old but still pretty much alive 😄 https://github.com/kedro-org/kedro-plugins/issues/471