Olivier Ho
03/17/2023, 11:06 AM
datajoely
03/17/2023, 11:12 AM
Olivier Ho
03/17/2023, 11:13 AM
datajoely
03/17/2023, 11:14 AM
Deepyaman Datta
03/17/2023, 4:01 PM
PartitionedDataSet requires a mapping from partition key to partition data/func. So, are you imagining you could return an iterable of {key: data_or_func} from a node, and save that to a PartitionedDataSet?
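For context on what is being discussed: when a node reads a PartitionedDataSet, it receives a mapping of partition id to a zero-argument load function, one per partition on disk. A minimal sketch of the load side, with an illustrative pandas payload and names that are not from this thread:

from typing import Callable, Dict

import pandas as pd

def concat_partitions(partitions: Dict[str, Callable[[], pd.DataFrame]]) -> pd.DataFrame:
    # Each value is a load function; calling it reads that one partition from disk.
    return pd.concat(load() for load in partitions.values())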
Olivier Ho
03/17/2023, 4:02 PM
datajoely
03/17/2023, 4:04 PM
Olivier Ho
03/17/2023, 4:05 PM
Deepyaman Datta
03/17/2023, 4:05 PM
PartitionedDataSet, as you rightly point out, so this would require some modification.
Re the memory concern, if you return an iterable of {key: func_that_generates_data} rather than {key: data}, I think you can avoid this.
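To make the memory point concrete, here is a hedged sketch of the two return shapes; transform stands in for whatever per-partition processing is applied and is not a name from the thread:

from typing import Any, Callable, Dict

def eager_outputs(partitions: Dict[str, Callable[[], Any]], transform: Callable[[Any], Any]) -> Dict[str, Any]:
    # {key: data}: every partition is materialised in memory before the node returns.
    return {key: transform(load()) for key, load in partitions.items()}

def lazy_outputs(partitions: Dict[str, Callable[[], Any]], transform: Callable[[Any], Any]) -> Dict[str, Callable[[], Any]]:
    # {key: func_that_generates_data}: each value stays a callable, invoked only
    # when that partition is saved, so one partition is processed at a time.
    return {key: (lambda load=load: transform(load())) for key, load in partitions.items()}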
datajoely
03/17/2023, 4:06 PM
Deepyaman Datta
03/17/2023, 4:06 PM
"but then, the dataset will have to be a pickle dataset"
What do you mean by this?
Olivier Ho
03/17/2023, 4:08 PM
Deepyaman Datta
03/17/2023, 4:09 PM
PartitionedDataSet?
Olivier Ho
03/17/2023, 4:10 PM
Deepyaman Datta
03/17/2023, 4:26 PM
from typing import Callable
import numpy as np
from kedro.pipeline import node, pipeline

def process_images(raw_images: dict[str, Callable[[], np.ndarray]]) -> dict[str, Callable[[], np.ndarray]]:
    results = {}
    for partition_id, partition_load_func in raw_images.items():
        # default argument binds each partition's own load function (avoids late binding)
        results[partition_id] = lambda load=partition_load_func: do_stuff(load())
    return results

pipeline([node(process_images, "input_images", "output_images")])
I think this is doable, and will process partitions one by one after the after_node_run hook
(see https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving for a bit more info).
So this way, you avoid ever having multiple partitions in memory at once, if that's the goal.
That said, if you have two nodes (first and second), no matter how you go about it, you need to wait for first to process A, B, C before going on to any processing in second.
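As a rough mental model of the lazy saving described above (this is not Kedro's actual implementation, only an illustration of the one-partition-at-a-time behaviour):

from typing import Any, Callable, Dict, Union

def save_partitions(partitions: Dict[str, Union[Any, Callable[[], Any]]], write: Callable[[str, Any], None]) -> None:
    for partition_id, data_or_func in sorted(partitions.items()):
        data = data_or_func() if callable(data_or_func) else data_or_func  # evaluated lazily, just before writing
        write(partition_id, data)  # only this partition's data is held in memory at this point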
Olivier Ho
03/17/2023, 4:31 PM
Deepyaman Datta
03/17/2023, 4:31 PM
"so you cannot use the same dataset in the input and output of the same node?"
No, your pipeline wouldn't be a DAG.
"to check for names for example?"
Not sure I understand this.
Olivier Ho
03/17/2023, 4:33 PM
Deepyaman Datta
03/17/2023, 4:36 PM
Olivier Ho
03/17/2023, 4:41 PM
import io
from typing import Callable, Dict, Iterable

import fitz  # PyMuPDF

def pdf_extraction(pdf_page_path: Dict[str, Callable[[], bytes]]) -> Iterable[Dict[str, bytes]]:
    for name, load_func in pdf_page_path.items():
        doc = fitz.open("pdf", stream=io.BytesIO(load_func()))  # open PDF input for extraction
        # Extraction: get plain text (in UTF-8) page by page
        for page_id, page in enumerate(doc):
            text = page.get_text().encode("utf8")
            yield {f"{name}_{page_id}": text}
those txts are then used in another process
{f'{name}_{page_id}': txt_generator()}
but I have to create another function that is really similar to the function above, except that the txt_generator is replaced by a costly operation (in time and money), so I would like to avoid using generators that have to be evaluated each time the element '{name}_{page_id}' has to be accessed
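One possible reshaping of pdf_extraction so it returns the single {partition_id: callable} mapping that lazy saving expects, sketched under the same PyMuPDF assumptions as the snippet above (this exact function is not proposed in the thread):

import io
from typing import Callable, Dict

import fitz  # PyMuPDF

def pdf_extraction_lazy(pdf_page_path: Dict[str, Callable[[], bytes]]) -> Dict[str, Callable[[], bytes]]:
    def extract_page(load: Callable[[], bytes], page_id: int) -> bytes:
        doc = fitz.open("pdf", stream=io.BytesIO(load()))
        return doc[page_id].get_text().encode("utf8")

    results: Dict[str, Callable[[], bytes]] = {}
    for name, load in pdf_page_path.items():
        doc = fitz.open("pdf", stream=io.BytesIO(load()))  # opened here only to count pages
        for page_id in range(len(doc)):
            # default arguments bind the current load function and page id to each callable
            results[f"{name}_{page_id}"] = lambda load=load, page_id=page_id: extract_page(load, page_id)
    return results

Note that each callable re-opens the PDF when it runs, which trades memory for repeated parsing.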
Deepyaman Datta
03/17/2023, 5:03 PM
"but I have to create another function that is really similar to the function above, except that the txt_generator is replaced by a costly operation (in time and money), so I would like to avoid using generators that have to be evaluated each time the element '{name}_{page_id}' has to be accessed"
Ah, you mean if you have multiple nodes consuming that PartitionedDataSet?
Olivier Ho
03/17/2023, 5:03 PM
Deepyaman Datta
03/17/2023, 5:12 PM
If you save to a PartitionedDataSet instead of returning an iterable, won't it save the bytes result to disk for each partition, so you don't need to worry about performing the expensive conversion? You'll just need to read from disk.
---
I was writing this, but think it's unnecessary:
If you want a pdf_extraction node:
from typing import Callable, Dict

def identity(text_mapping: Dict[str, Callable[[], bytes]]) -> Dict[str, Callable[[], bytes]]:
    return {key: (lambda func=func: func()) for key, func in text_mapping.items()}
(something like that)
so you're forcing a write to disk after processing, and you wouldn't do the conversion each time but rather read the processed data
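A hedged sketch of the catalog side this implies: back the intermediate texts with a PartitionedDataSet so each '{name}_{page_id}' partition is written to disk once. The path and the pickle choice (raw bytes, echoing the earlier "pickle dataset" remark) are assumptions, and in a project this would normally be a catalog.yml entry:

from kedro.io import PartitionedDataSet

extracted_texts = PartitionedDataSet(
    path="data/02_intermediate/extracted_texts",  # one file per '{name}_{page_id}' partition
    dataset="pickle.PickleDataSet",  # underlying dataset; payload here is raw bytes
)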