Guillaume Tauzin
09/10/2024, 10:25 AM
• For node 1, I use an IncrementalDataset, and the concatenated dataframe is saved using a versioned ParquetDataset.
• For node 2, I use a PartitionedDataset that is able to find all preprocessed recorded events computed so far (with load_args withdirs and max_depth set accordingly).
Node 1 will also return a boolean that node 2 takes as input, so that the resulting DAG has a dependency link from node 1 to node 2. This pipeline runs fine with the SequentialRunner, but when I try the ParallelRunner I get a kedro.io.core.DatasetError: No partitions found.
It seems that when running with the ParallelRunner, the list of files in my partition is created before the whole pipeline is run, thus making it impossible to find the files created by a prior node. Is that correct? Any workaround?

Guillaume Tauzin
09/10/2024, 10:28 AM
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    def get_pipeline(namespace: str):
        template_pipeline = pipeline(
            [
                node(
                    concatenate_increment,
                    inputs="data_increment",
                    outputs=["concatenated_data_increment", "data_increment_concatenated"],
                    name="concatenate_increment",
                    confirms=f"{namespace}.data_increment",  # This is needed as the incremental dataset is namespaced
                ),
                node(
                    concatenate_partition,
                    inputs=[
                        "partitioned_concatenated_data",
                        "data_increment_concatenated",
                    ],
                    outputs="extracted_data",
                    name="concatenate_partition",
                ),
            ],
        )
        return template_pipeline

    # Build one namespaced copy of the template pipeline per source
    pipelines = pipeline(
        pipe=get_pipeline(namespace=SOURCES[0]),
        namespace=SOURCES[0],
    )
    for source in SOURCES[1:]:
        pipelines += pipeline(
            pipe=get_pipeline(namespace=source),
            namespace=source,
        )
    return pipelines

Guillaume Tauzin
09/10/2024, 10:29 AM
"{source}.data_increment":
  type: partitions.IncrementalDataset
  path: data/01_raw/{source}/
  dataset:
    type: pandas.CSVDataset
  filename_suffix: ".csv"

"{source}.data_increment_concatenated":
  type: MemoryDataset

"{source}.concatenated_data_increment":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/concatenated_data.pq
  versioned: true

"{source}.partitioned_concatenated_data":
  type: partitions.PartitionedDataset
  # Same folder as the versioned concatenated_data_increment dataset above
  path: data/02_intermediate/{source}/concatenated_data.pq/
  dataset:
    type: pandas.ParquetDataset
  load_args:
    withdirs: true
    max_depth: 1
  filename_suffix: ".pq"

"{source}.extracted_data":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/extracted_data.pq

Deepyaman Datta
09/10/2024, 2:15 PM
> It seems that when running with the ParallelRunner, the list of files in my partition is created before the whole pipeline is run, thus making it impossible to find the files created by a prior node.
_list_partitions is cached, so anything that attempts to access it may have hit this code: https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py#L257-L264
There is an exists check, introduced in https://github.com/kedro-org/kedro/pull/3332, that is triggered before the pipeline run and can populate this cache.
@Merel may have some more familiarity, since she worked on #3332.
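
(For illustration, here is a minimal, self-contained sketch of the caching behavior described above. FakePartitionedDataset and its paths are invented for the demo and are not the actual kedro-datasets implementation:)

import operator
from pathlib import Path

from cachetools import Cache, cachedmethod


class FakePartitionedDataset:
    """Toy stand-in for PartitionedDataset, just to show the caching effect."""

    def __init__(self, path: str):
        self._path = path
        self._partition_cache: Cache = Cache(maxsize=1)

    # The first call's result is stored in _partition_cache and reused.
    @cachedmethod(cache=operator.attrgetter("_partition_cache"))
    def _list_partitions(self) -> list[str]:
        return sorted(str(p) for p in Path(self._path).glob("*.pq"))

    def exists(self) -> bool:
        # An exists check like this is enough to populate the cache,
        # even if it runs before any partitions have been written.
        return bool(self._list_partitions())


ds = FakePartitionedDataset("data/02_intermediate/source_a/concatenated_data.pq")
ds.exists()                   # caches the (possibly empty) listing
Path(ds._path).mkdir(parents=True, exist_ok=True)
(Path(ds._path) / "new.pq").touch()
print(ds._list_partitions())  # still [] — the stale cached result is returned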

Deepyaman Datta
09/10/2024, 2:23 PM
(You could subclass PartitionedDataset and remove the cache bit, until there's a better solution)

Guillaume Tauzin
09/10/2024, 2:51 PM
Subclassing PartitionedDataset and overriding the _list_partitions method works for me.
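
(A minimal sketch of such a subclass, assuming kedro_datasets.partitions.PartitionedDataset. UncachedPartitionedDataset is a hypothetical name, the body mirrors the parent's listing logic without the cache decorator, and private attribute names may vary across kedro-datasets versions:)

from kedro_datasets.partitions import PartitionedDataset


class UncachedPartitionedDataset(PartitionedDataset):
    """PartitionedDataset whose partition listing is never cached."""

    def _list_partitions(self) -> list[str]:
        # Override without the @cachedmethod decorator used by the parent,
        # so the filesystem is re-listed on every access.
        return [
            path
            for path in self._filesystem.find(self._normalized_path, **self._load_args)
            if path.endswith(self._filename_suffix)
        ]

(In the catalog, type would then point at this class's import path instead of partitions.PartitionedDataset.)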

Guillaume Tauzin
09/13/2024, 9:28 AM
@Merel I would happily open an issue on GitHub if you think this is necessary!

Deepyaman Datta
09/13/2024, 6:48 PM
> @Merel I would happily open an issue on GitHub if you think this is necessary!
Think that would be great. Sorry, I kind of meant to do that, but forgot; would be great if you can open it!
Guillaume Tauzin
09/14/2024, 2:22 PM