#questions

Olivier Ho

03/17/2023, 11:06 AM
if you use yield in a node to obtain an iterable, how can you store it in a partitioned dataset, as the partitioned dataset requires a dictionary?

datajoely

03/17/2023, 11:12 AM
You would have to cast it to a dictionary somehow before saving
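For instance, a minimal sketch of collecting the generator into the {partition_key: data} mapping that PartitionedDataSet expects on save (the helper name is made up):
Copy code
def collect(gen):
    # materialise the generator into the {partition_key: data} mapping;
    # note this pulls everything into memory, which is the concern below
    return {f"part_{i:05d}": item for i, item in enumerate(gen)}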

Olivier Ho

03/17/2023, 11:13 AM
hum, but does it support incremental saving?

datajoely

03/17/2023, 11:14 AM
so do you mean lazy saving in one run or incremental in multiple runs?

Deepyaman Datta

03/17/2023, 4:01 PM
What kind of behavior are you looking for @Olivier Ho? As it stands, PartitionedDataSet requires a mapping from partition key to partition data/func. So, are you imagining you could return an iterable of {key: data_or_func} from a node, and save that to a PartitionedDataSet?
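For reference, a matching catalog entry could look something like this sketch (the name, path, and underlying dataset are made up; PartitionedDataSet writes each key of the returned mapping as one file):
Copy code
output_texts:
  type: PartitionedDataSet
  path: data/02_intermediate/texts
  dataset: text.TextDataSet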

Olivier Ho

03/17/2023, 4:02 PM
well, yes, that was my first expectation
as my issue is, I'm not sure what would happen if I don't have enough memory to create the mapping

datajoely

03/17/2023, 4:04 PM
so if you use generators you can do this lazily!

Olivier Ho

03/17/2023, 4:05 PM
but then, the dataset will have to be a pickle dataset

Deepyaman Datta

03/17/2023, 4:05 PM
I think (haven't tested it yet) you can't currently save an iterable to PartitionedDataSet, as you rightly point out, so this would require some modification. Re the memory concern, if you return a mapping of {key: func_that_generates_data} rather than {key: data}, I think you can avoid this.
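Roughly, the two shapes look like this (a hedged sketch; load_page and extract are made-up stand-ins for however a partition is loaded and processed):
Copy code
# eager: every partition's data is materialised in memory at once
eager = {key: extract(load_page(key)) for key in keys}

# lazy: each value is a no-arg callable; PartitionedDataSet calls it at
# save time, so only one partition's data is in memory at a time
lazy = {key: (lambda k=key: extract(load_page(k))) for key in keys}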

datajoely

03/17/2023, 4:06 PM
Yes - what @Deepyaman Datta said is what I meant to say

Deepyaman Datta

03/17/2023, 4:06 PM
but then, the dataset will have to be a pickle dataset
What do you mean by this?

Olivier Ho

03/17/2023, 4:08 PM
well, I am familiar with generators; I actually used a generator for another project using Kedro. What I mean is that the node function will have to output an Iterator, which can only be serialized using PickleDataSet, unless I am wrong

Deepyaman Datta

03/17/2023, 4:09 PM
But I thought you're (ideally) asking to output the node to a PartitionedDataSet?

Olivier Ho

03/17/2023, 4:10 PM
yes, so let's say for example I have a folder of images. I used a partitioned dataset to grab those images, then I have to parse them and store the intermediate values in another folder
ideally, I want to store them in another partitioned dataset without waiting for all processing to be done
btw, now that I think about it, what are the possible dataset types for a generator function?

Deepyaman Datta

03/17/2023, 4:26 PM
Let's say you have a pipeline:
Copy code
from typing import Callable, Dict

import numpy as np
from kedro.pipeline import node, pipeline


def process_images(raw_images: Dict[str, Callable[[], np.ndarray]]) -> Dict[str, Callable[[], np.ndarray]]:
    results = {}
    for partition_id, partition_load_func in raw_images.items():
        # bind via default arg; a bare closure would late-bind and every
        # partition would call the last load function (do_stuff is your transform)
        results[partition_id] = lambda load=partition_load_func: do_stuff(load())
    return results


pipeline([node(process_images, "input_images", "output_images")])
I think this is doable, and will process partitions one by one after the node runs (see https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving for a bit more info). So this way, you avoid ever having multiple partitions in memory at once, if that's the goal.
But there's no way to do lazy evaluation of data for a partition across nodes (I'm pretty sure), so you still essentially block after each node.
As in, if you have 3 partitions of data (A, B, C) and two nodes (first and second), no matter how you go about it, you need to wait for first to process A, B, C before going on to any processing in second.
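A possible catalog for that sketch (a guess at the wiring; the paths and the pillow.ImageDataSet choice are assumptions):
Copy code
input_images:
  type: PartitionedDataSet
  path: data/01_raw/images
  dataset: pillow.ImageDataSet

output_images:
  type: PartitionedDataSet
  path: data/02_intermediate/images
  dataset: pillow.ImageDataSet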

Olivier Ho

03/17/2023, 4:31 PM
yes, that was actually my first idea, inspired by the partitioned dataset code, which btw is brilliant. That doesn't solve my problem though, as I want to avoid repeating the processing
your last point is really interesting
so you cannot use the same dataset in the input and output of the same node?
to check for names for example?

Deepyaman Datta

03/17/2023, 4:31 PM
If you want something more lazy than this across nodes, this is approaching stream processing, such as what https://github.com/robinhood/faust offers (it's not maintained, and there's a community-developed fork somewhere, but just for inspiration).
so you cannot use the same dataset in the input and output of the same node?
No, your pipeline wouldn't be a DAG (a dataset that is both input and output of the same node forms a cycle)
to check for names for example?
Not sure I understand this
👍 1

Olivier Ho

03/17/2023, 4:33 PM
well, for example, to avoid reprocessing: if I could check whether the target already exists, I'd skip it
true

Deepyaman Datta

03/17/2023, 4:36 PM
Hmm... so there's https://github.com/kedro-org/kedro/issues/2410 and https://docs.kedro.org/en/stable/kedro.io.IncrementalDataSet.html (two different things). I'm not sure if either is applicable, because I don't think I understand quite what issue you're looking to solve here/what you're trying to optimize for. Maybe somebody else understands, but for me a small code example with current behavior + what you want to happen would be helpful.
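For reference, a hedged sketch of what IncrementalDataSet usage could look like (the dataset name, path, and underlying dataset are made up):
Copy code
pdf_pages:
  type: IncrementalDataSet
  path: s3://my-bucket/pdf-pages
  dataset: text.TextDataSet
The consuming node can then confirm the checkpoint, so partitions already processed in an earlier run are skipped the next time:
Copy code
node(process_pages, inputs="pdf_pages", outputs="page_texts", confirms="pdf_pages")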

Olivier Ho

03/17/2023, 4:41 PM
I have a folder in S3 containing PDFs. I would like to store the text content of all the PDFs, in a separate file for each page of each PDF, so my node is like this
Copy code
import io
from typing import Callable, Dict, Iterable

import fitz  # PyMuPDF


def pdf_extraction(pdf_page_path: Dict[str, Callable]) -> Iterable[Dict[str, bytes]]:
    for name, load_func in pdf_page_path.items():  # renamed from `callable` to avoid shadowing the builtin
        # open the PDF from the raw bytes returned by the partition load function
        doc = fitz.open("pdf", stream=io.BytesIO(load_func()))
        # get plain text (in UTF-8) and yield one partition per page
        for page_id, page in enumerate(doc):
            text = page.get_text().encode("utf8")
            yield {f"{name}_{page_id}": text}
those txts are then used in another process
now, if I don't use yield, I could create a dataframe and then store it simply
if I have memory issues though, I could, as you said, use something like {f'{name}_{page_id}': txt_generator()}
but I would have to create another function that is really similar to the function above, except that txt_generator is replaced by a costly operation (in time and money), so I would like to avoid generators that have to be re-evaluated each time the element '{name}_{page_id}' is accessed
so my issue is what happens if the operation is costly and I cannot afford to repeat the same operation for each pdf...

Deepyaman Datta

03/17/2023, 5:03 PM
but I would have to create another function that is really similar to the function above, except that txt_generator is replaced by a costly operation (in time and money), so I would like to avoid generators that have to be re-evaluated each time the element '{name}_{page_id}' is accessed
Ah, you mean if you have multiple nodes consuming that PartitionedDataSet?

Olivier Ho

03/17/2023, 5:03 PM
yes

Deepyaman Datta

03/17/2023, 5:12 PM
If you save to a PartitionedDataSet instead of returning an iterable, won't it save the bytes result to disk for each partition, so you don't need to worry about performing the expensive conversion? You'll just need to read from disk. --- I was writing this, but think it's unnecessary: if you want a dumb hack workaround, you could add another node after your pdf_extraction node:
Copy code
def identity(text_mapping: Dict[str, Callable[[], bytes]]) -> Dict[str, Callable[[], bytes]]:
    # dict comprehension; bind func as a default arg so each lambda keeps its own loader
    return {key: (lambda func=func: func()) for key, func in text_mapping.items()}
(something like that) so you're force-writing to disk after processing; you wouldn't do the conversion each time but rather read the processed data
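To make the flow concrete, a sketch of the wiring under that approach (node and dataset names are made up, and pdf_extraction is assumed to be reworked to return the dict of callables rather than yield):
Copy code
pipeline(
    [
        # writes each page's bytes to disk via the PartitionedDataSet
        node(pdf_extraction, "raw_pdfs", "page_texts"),
        # downstream consumers re-read the saved bytes; no re-extraction
        node(downstream_analysis, "page_texts", "analysis_output"),
    ]
)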