Guillaume Tauzin
09/10/2024, 10:25 AM
• For node 1, I use an IncrementalDataset, and the concatenated dataframe is saved using a versioned ParquetDataset.
• For node 2, I use a PartitionedDataset that is able to find all preprocessed recorded events computed so far (with the load_args withdirs and max_depth set accordingly).
Node 1 also returns a boolean that node 2 takes as an input, so that the resulting DAG has a dependency link from node 1 to node 2. This pipeline runs fine with a sequential runner, but when I try the parallel runner I get a kedro.io.core.DatasetError: No partitions found.
It seems that when running with the ParallelRunner, the list of files in my partition is created before the whole pipeline is run, thus making it impossible to find the files created by a prior node. Is that correct? Any workaround?
Guillaume Tauzin
09/10/2024, 10:28 AM

# concatenate_increment, concatenate_partition and SOURCES are defined
# elsewhere in the project.
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    def get_pipeline(namespace: str):
        template_pipeline = pipeline(
            [
                node(
                    concatenate_increment,
                    inputs="data_increment",
                    outputs=[
                        "concatenated_data_increment",
                        "data_increment_concatenated",
                    ],
                    name="concatenate_increment",
                    # This is needed as the incremental dataset is namespaced.
                    confirms=f"{namespace}.data_increment",
                ),
                node(
                    concatenate_partition,
                    inputs=[
                        "partitioned_concatenated_data",
                        "data_increment_concatenated",
                    ],
                    outputs="extracted_data",
                    name="concatenate_partition",
                ),
            ],
        )
        return template_pipeline

    pipelines = pipeline(
        pipe=get_pipeline(namespace=SOURCES[0]),
        namespace=SOURCES[0],
    )
    for source in SOURCES[1:]:
        pipelines += pipeline(
            pipe=get_pipeline(namespace=source),
            namespace=source,
        )
    return pipelines
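
The node functions themselves aren't shown in the thread. A minimal sketch of what they might look like, assuming pandas DataFrames throughout (names, signatures, and bodies are illustrative, not the actual implementation):

import pandas as pd


def concatenate_increment(data_increment: dict[str, pd.DataFrame]):
    # IncrementalDataset loads each unprocessed partition eagerly, so this
    # input is a dict mapping partition ids to already-loaded DataFrames.
    concatenated = pd.concat(data_increment.values(), ignore_index=True)
    # The boolean second output exists only to give node 2 an upstream
    # dependency on node 1 in the DAG.
    return concatenated, True


def concatenate_partition(partitions, _increment_done: bool) -> pd.DataFrame:
    # PartitionedDataset is lazy: each dict value is a callable that loads
    # one partition when invoked.
    return pd.concat(
        (load() for load in partitions.values()), ignore_index=True
    )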
Guillaume Tauzin
09/10/2024, 10:29 AM

"{source}.data_increment":
  type: partitions.IncrementalDataset
  path: data/01_raw/{source}/
  dataset:
    type: pandas.CSVDataset
  filename_suffix: ".csv"

"{source}.data_increment_concatenated":
  type: MemoryDataset

"{source}.concatenated_data_increment":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/concatenated_data.pq
  versioned: true

"{source}.partitioned_concatenated_data":
  type: partitions.PartitionedDataset
  path: data/02_intermediate/{source}/concatenated_data.pq/
  dataset:
    type: pandas.ParquetDataset
  load_args:
    withdirs: true
    max_depth: 1
  filename_suffix: ".pq"

"{source}.extracted_data":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/extracted_data.pq
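
For context on why withdirs and max_depth are set here: a versioned Kedro dataset saves each run under filepath/<version>/<filename>, so the directory that node 2 scans looks roughly like this (version timestamps illustrative):

data/02_intermediate/<source>/concatenated_data.pq/
    2024-09-10T10.25.00.000Z/
        concatenated_data.pq
    2024-09-10T11.02.37.123Z/
        concatenated_data.pq

Pointing a PartitionedDataset at that directory with those load_args lets the underlying filesystem listing descend one level into the version folders and pick up each .pq file as a partition.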
Deepyaman Datta
09/10/2024, 2:15 PM
"It seems that when running with the ParallelRunner, the list of files in my partition is created before the whole pipeline is run, thus making it impossible to find the files created by a prior node."
_list_partitions is cached, so anything that attempts to access it may have hit this code: https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py#L257-L264
There is an exists check that was introduced in https://github.com/kedro-org/kedro/pull/3332 and is triggered before the pipeline run; it can populate this cache.
@Merel may have some more familiarity, since she worked on #3332.

Deepyaman Datta
09/10/2024, 2:23 PM
(As a workaround, you could create a custom PartitionedDataset and remove the cache bit, until there's a better solution)
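
A minimal sketch of that workaround, assuming the _partition_cache attribute that backs the cachedmethod decorator in the linked source (the class name and catalog registration are illustrative):

from kedro_datasets.partitions import PartitionedDataset


class UncachedPartitionedDataset(PartitionedDataset):
    """PartitionedDataset variant that re-scans the filesystem on every load."""

    def _list_partitions(self):
        # The parent method is memoised with cachetools; clearing the cache
        # before delegating forces a fresh listing, so partitions written by
        # upstream nodes during the run are picked up.
        self._partition_cache.clear()  # assumed attribute, per the linked source
        return super()._list_partitions()

The custom class can then be referenced from the catalog with, e.g., type: my_project.datasets.UncachedPartitionedDataset (hypothetical module path).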
Guillaume Tauzin
09/10/2024, 2:51 PM
Overriding the _list_partitions method works for me.

Guillaume Tauzin
09/13/2024, 9:28 AM
@Merel I would happily open an issue on GitHub if you think this is necessary!

Deepyaman Datta
09/13/2024, 6:48 PM
Think that would be great. 🙂 Sorry, I kind of meant to do that, but forgot; would be great if you can open it!

Guillaume Tauzin
09/14/2024, 2:22 PM