# questions
g
Hello again, I have a pipeline with two nodes that is applied to each source bucket using namespaces. I would like to have it run in parallel (one process per source).
• Node 1 looks up a source bucket for new files and concatenates all of them together.
• Node 2 looks up all the new-file concatenations created so far for the source and concatenates them together to get all data collected so far for this source.
The way I achieve this with Kedro is:
• For node 1, I use an IncrementalDataset, and the concatenated dataframe is saved using a versioned ParquetDataset.
• For node 2, I use a PartitionedDataset that is able to find all the concatenated increments recorded so far (with load_args withdirs and max_depth set accordingly).
Node 1 also returns a boolean that node 2 takes as an input, so that the resulting DAG has a dependency link from node 1 to node 2. This pipeline runs fine with the sequential runner, but when I try the parallel runner I get a kedro.io.core.DatasetError: No partitions found. It seems that when running with ParallelRunner, the list of files in my partition is created before the whole pipeline is run, thus making it impossible to find the files created by a prior node. Is that correct? Any workaround?
Here is what the pipeline code looks like:
from kedro.pipeline import Pipeline, node, pipeline

# concatenate_increment, concatenate_partition and SOURCES are defined elsewhere in the project.
def create_pipeline(**kwargs) -> Pipeline:
    def get_pipeline(namespace: str):
        template_pipeline = pipeline(
            [
                node(
                    concatenate_increment,
                    inputs="data_increment",
                    outputs=["concatenated_data_increment", "data_increment_concatenated"],
                    name="concatenate_increment",
                    confirms=f"{namespace}.data_increment", # This is needed as the incremental dataset is namespaced
                ),
                node(
                    concatenate_partition,
                    inputs=[
                        "partitioned_concatenated_data",
                        "data_increment_concatenated",
                    ],
                    outputs="extracted_data",
                    name="concatenate_partition",
                ),
            ],
        )

        return template_pipeline
 
    pipelines = pipeline(
        pipe=get_pipeline(namespace=SOURCES[0]),
        namespace=SOURCES[0],
    )
    for source in SOURCES[1:]:
        pipelines += pipeline(
            pipe=get_pipeline(namespace=source),
            namespace=source,
        )

    return pipelines
And the catalog:
"{source}.data_increment":
  type: partitions.IncrementalDataset
  path: data/01_raw/{source}/
  dataset:
    type: pandas.CSVDataset
  filename_suffix: ".csv"

"{source}.data_increment_concatenated":
  type: MemoryDataset

"{source}.concatenated_data_increment":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/concatenated_data.pq
  versioned: true

"{source}.partitioned_concatenated_data":
  type: partitions.PartitionedDataset
  path: data/02_intermediate/{source}/concatenated_data.pq/
  dataset:
    type: pandas.ParquetDataset
  load_args:
    withdirs: true
    max_depth: 1
  filename_suffix: ".pq"

"{source}.extracted_data":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/extracted_data.pq
d
It seems that when running with ParallelRunner, the list of files in my partition is created before the whole pipeline is run, thus making it impossible to find the files created by a prior node.
Can you confirm this is happening, perhaps by logging a message when the partitions are being found? Never mind, I see it. This seems quite possible, though, as _list_partitions is cached, so anything that attempts to access it may have hit this code: https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py#L257-L264 There is an exists check that was introduced in https://github.com/kedro-org/kedro/pull/3332 that is triggered before the pipeline run, which can populate this cache. @Merel may have more familiarity, since she worked on #3332.
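For context, here is a simplified sketch of how that caching works (paraphrased from the linked file, not a verbatim copy; attribute and method names follow the current kedro-datasets source and may differ between versions). The partition listing is memoized per dataset instance, so once the exists check has populated the cache before the run, later loads in the same process reuse the stale listing:
Copy code
# Abridged sketch of kedro_datasets.partitions.partitioned_dataset (not verbatim).
import operator
from cachetools import Cache, cachedmethod


class PartitionedDataset:  # heavily abridged
    def __init__(self, path, dataset, **kwargs):
        self._path = path
        # One-entry cache that memoizes the partition listing per dataset instance.
        self._partition_cache: Cache = Cache(maxsize=1)

    @cachedmethod(cache=operator.attrgetter("_partition_cache"))
    def _list_partitions(self) -> list[str]:
        # Scans the filesystem for partitions; because of the decorator this only
        # happens once per instance, and subsequent calls return the cached list.
        ...

    def _exists(self) -> bool:
        # An exists check before the pipeline runs goes through here,
        # which is what populates the cache early.
        return bool(self._list_partitions())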
šŸ‘ 1
(BTW, you can temporarily/dumbly work around this by just creating a copy of PartitionedDataset and removing the cache bit, until there's a better solution.)
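One way to do that without copying the whole class is to subclass PartitionedDataset and drop the memoized listing on every call. This is only a sketch of the workaround described above: the UncachedPartitionedDataset name is made up, and it assumes the listing is cached in a _partition_cache attribute as in the linked source:
Copy code
# Minimal sketch of the workaround, assuming PartitionedDataset keeps its memoized
# listing in a `_partition_cache` attribute (as in the linked source). The class
# name is illustrative, not part of Kedro.
from kedro_datasets.partitions import PartitionedDataset


class UncachedPartitionedDataset(PartitionedDataset):
    def _list_partitions(self) -> list[str]:
        # Clear the memoized listing so every load re-scans the filesystem,
        # picking up partitions written by upstream nodes during the run.
        self._partition_cache.clear()
        return super()._list_partitions()
You would then point the catalog entries at this class instead of partitions.PartitionedDataset (the import path depends on where you place it in your project).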
g
Thanks a lot @Deepyaman Datta. I can confirm that the workaround of removing the cachedmethod decorator on the _list_partitions method works for me.
@Merel I would happily open an issue on GitHub if you think this is necessary!
d
@Merel I would happily open an issue on GitHub if you think this is necessary!
Think that would be great. šŸ™‚ Sorry, I kind of meant to do that, but forgot; would be great if you can open it!