# questions
o
if you use yield in a node to obtain an iterable, how can we store it in a partitioned dataset, as the partitioned dataset requires a dictionary?
d
You would have to cast it to a dictionary somehow before saving
o
hum, but does it support incremental saving?
d
so do you mean lazy saving in one run or incremental in multiple runs?
d
What kind of behavior are you looking for @Olivier Ho? As it stands, PartitionedDataSet requires a mapping from partition key to partition data/func. So, are you imagining you could return an iterable of {key: data_or_func} from a node, and save that to a PartitionedDataSet?
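(For reference, a node that feeds a PartitionedDataSet just returns such a mapping; a minimal sketch, where transform is a made-up placeholder for the per-partition processing:)
Copy code
from typing import Any, Dict


def make_partitions(data: Dict[str, Any]) -> Dict[str, Any]:
    # eager version: a plain mapping of partition id -> partition data,
    # so every partition is held in memory before saving
    # (transform is a placeholder for whatever you do to each partition)
    return {key: transform(value) for key, value in data.items()}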
o
well, yes, that was my first expectation
as my issue is, I'm not sure what would happen if I don't have enough memory to create the mapping
d
so if you use generators you can do this lazily!
o
but then, the dataset will have to be a pickle dataset
d
I think (haven't tested it yet) you can't currently save an iterable to PartitionedDataSet, as you rightly point out, so this would require some modification. Re memory concern, if you return an iterable of {key: func_that_generates_data} rather than {key: data}, I think you can avoid this.
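(A minimal sketch of that lazy variant, again with a made-up transform; each value is a zero-argument callable that PartitionedDataSet only invokes when it saves that partition:)
Copy code
from typing import Any, Callable, Dict


def make_partitions_lazily(data: Dict[str, Any]) -> Dict[str, Callable[[], Any]]:
    # transform is a placeholder; binding value as a default argument keeps
    # each lambda tied to its own partition instead of the last loop value
    return {key: (lambda value=value: transform(value)) for key, value in data.items()}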
d
Yes - what @Deepyaman Datta said is what I meant to say
d
but then, the dataset will have to be a pickle dataset
What do you mean by this?
o
well, I am familiar with generators; I actually used a generator for another project using Kedro. What I mean is that the node function will have to output an Iterator, which can only be serialized using a PickleDataSet, unless I am wrong
d
But I thought you're (ideally) asking to output the node to a PartitionedDataSet?
o
yes, so let's say for example I have a folder of images; I used a partitioned dataset to grab those images, then I have to parse them and store the intermediate values in another folder
ideally, I want to store them in another partitioned dataset without waiting for all processing to be done
btw, now that I think about it, what are the possible dataset types for a generator function?
d
Let's say you have a pipeline:
Copy code
from typing import Callable

import numpy as np
from kedro.pipeline import node, pipeline


def process_images(raw_images: dict[str, Callable[[], np.ndarray]]) -> dict[str, Callable[[], np.ndarray]]:
    results = {}
    for partition_id, partition_load_func in raw_images.items():
        # bind the load func as a default argument so each lambda keeps its own partition
        # (do_stuff is a placeholder for the per-partition processing)
        results[partition_id] = lambda func=partition_load_func: do_stuff(func())
    return results


pipeline([node(process_images, "input_images", "output_images")])
I think this is doable, and will process partitions one by one after the after_node_run (see https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving for a bit more info). So this way, you avoid ever having multiple partitions in memory at once, if that's the goal.
But there's no way to do lazy evaluation of data for partition across nodes (I'm pretty sure), so you still essentially block after each node.
As in, if you have 3 partitions of data (A, B, C) and two nodes (first and second), no matter how you go about it, you need to wait for first to process A, B, C before going on to any processing in second.
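(For context, the two datasets in that example could be declared roughly as below; a sketch using the programmatic API, where the paths and the pillow.ImageDataSet type are assumptions, and the same thing is normally done in catalog.yml:)
Copy code
from kedro.io import DataCatalog, PartitionedDataSet

catalog = DataCatalog(
    {
        # every file found under `path` becomes one partition, keyed by its relative path
        "input_images": PartitionedDataSet(path="data/01_raw/images", dataset="pillow.ImageDataSet"),
        "output_images": PartitionedDataSet(path="data/02_intermediate/images", dataset="pillow.ImageDataSet"),
    }
)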
o
yes, that was actually my first idea, inspired by the partitioned dataset code, which btw is brilliant. That doesn't solve my problem though, as I want to avoid repeating the processing
your last mention is really interesting
so you cannot use the same dataset in the input and output of the same node?
to check for names for example?
d
If you want something more lazy than this across nodes, this is approaching stream processing, such as what https://github.com/robinhood/faust offers (it's not maintained, there's some community developed fork somewhere, but just for inspiration).
so you cannot use the same dataset in the input and output of the same node?
No, your pipeline wouldn't be a DAG
to check for names for example?
Not sure I understand this
o
well, for example, to avoid reprocessing: if I could check whether the target already exists, I'd skip it
true
d
Hmm... so there's https://github.com/kedro-org/kedro/issues/2410 and https://docs.kedro.org/en/stable/kedro.io.IncrementalDataSet.html (two different things). I'm not sure if either is applicable, because I don't think I understand quite what issue you're looking to solve here/what you're trying to optimize for. Maybe somebody else understands, but for me a small code example with the current behavior + what you want to happen could be helpful.
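(One rough way to get the "skip if the target already exists" behavior described above is to filter the input partitions against the files already present in the output folder; this isn't a built-in Kedro feature, just a sketch, and output_dir is a made-up parameter you'd have to pass in yourself:)
Copy code
from pathlib import Path
from typing import Callable, Dict


def skip_existing(partitions: Dict[str, Callable[[], bytes]], output_dir: str) -> Dict[str, Callable[[], bytes]]:
    # keep only the partitions whose output file hasn't been written yet
    done = {path.stem for path in Path(output_dir).glob("*")}
    return {key: func for key, func in partitions.items() if key not in done}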
o
I have a folder in S3 containing PDFs; I would like to store the text content of all the PDFs, with a separate file for each page of each PDF, so my node is like this
Copy code
import io
from typing import Callable, Dict, Iterable

import fitz  # PyMuPDF


def pdf_extraction(pdf_page_path: Dict[str, Callable]) -> Iterable[Dict[str, bytes]]:
    for name, load_func in pdf_page_path.items():
        doc = fitz.open("pdf", stream=io.BytesIO(load_func()))  # open PDF input for extraction
        # Extraction: get plain text (in UTF-8)
        for page_id, page in enumerate(doc):
            text = page.get_text().encode("utf8")
            yield {f'{name}_{page_id}': text}
those txts are then used in another process
now, if I don't use the yield, I could create a dataframe and then I could store it simply
if I have memory issues though, I could, as you said, use something like {f'{name}_{page_id}': txt_generator()}
but I have to create another function that is really similar to the function above, except that the txt_generator is replaced by a costly operation (in time and money), so I would like to avoid using generators that have to be evaluated each time the element '{name}_{page_id}' has to be accessed
so my issue is what happens if the operation is costly and I cannot afford to repeat the same operation for each pdf...
d
but I have to create another function that is really similar to the function above, except that the txt_generator is replaced by a costly operation (in time and money), so I would like to avoid using generators that have to be evaluated each time the element '{name}_{page_id}' has to be accessed
Ah, you mean if you have multiple nodes consuming that PartitionedDataSet?
o
yes
d
If you save to a PartitionedDataSet instead of returning an iterable, won't it save the bytes result to disk for each partition, so you don't need to worry about performing the expensive conversion? You'll just need to read from disk. --- I was writing this, but think it's unnecessary: If you want a dumb hack workaround, you could add another node after your pdf_extraction node:
Copy code
def identity(text_mapping: Dict[str, Callable[[], bytes]]) -> Dict[str, Callable[[], bytes]]:
    # bind func as a default argument so each lambda keeps its own load function
    return {key: (lambda func=func: func()) for key, func in text_mapping.items()}
(something like that) so you're force-writing to disk after processing, so you wouldn't do the conversion each time but rather read the processed data
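(To illustrate that point: if pdf_extraction returns a complete mapping and its output is declared as a PartitionedDataSet, the extracted bytes are written to disk once, and downstream nodes just read the saved partitions back instead of re-running the extraction. A sketch under that assumption; the downstream node and its names are made up, and this eager version does hold all pages in memory at once:)
Copy code
import io
from typing import Callable, Dict

import fitz  # PyMuPDF


def pdf_extraction(pdf_page_path: Dict[str, Callable]) -> Dict[str, bytes]:
    results = {}
    for name, load_func in pdf_page_path.items():
        doc = fitz.open("pdf", stream=io.BytesIO(load_func()))
        for page_id, page in enumerate(doc):
            results[f"{name}_{page_id}"] = page.get_text().encode("utf8")
    return results  # saving this to a PartitionedDataSet writes one file per page


def use_extracted_text(pdf_texts: Dict[str, Callable[[], bytes]]) -> Dict[str, bytes]:
    # the expensive extraction already happened in the previous node;
    # each callable here just loads the saved partition from disk
    return {key: load_func() for key, load_func in pdf_texts.items()}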