# questions
g
Hello Kedro team! Just a quick question: I have a namespace-based incremental dataset and wish to use the `confirms` attribute to trigger a checkpoint update further down my pipeline. When I run the pipeline, I get:
Dataset 'my_dataset' does not have 'data_increments' method
Without specifying `confirms` at all, or without using namespaces at all, the pipeline runs successfully. Is this intended behavior or did I miss something? Thanks 🙂
r
Could you please share a code snippet of how you are using the `confirms` attribute?
g
Thanks @Rashida Kanchwala, this is what the pipeline code looks like:
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import (
    concatenate_increments,
    concatenate_partitions,
)


def create_pipeline(**kwargs) -> Pipeline:
    template_pipeline = pipeline(
        [
            node(
                concatenate_increments,
                inputs="data_increments",
                outputs="concatenated_data_increments",
                name="concatenate_increments",
            ),
            node(
                concatenate_partitions,
                inputs="partitioned_concatenated_data",
                outputs="extracted_datas",
                name="concatenate_partitions",
                confirms="data_increments",
            ),
        ]
    )

    n = 3
    pipelines = pipeline(pipe=template_pipeline, namespace="1")
    for i in range(2, n + 1):  # one namespaced copy per id
        pipelines += pipeline(pipe=template_pipeline, namespace=str(i))

    return pipelines
Actually, with this minimal example, the error I get is different:
kedro.pipeline.pipeline.ConfirmNotUniqueError: ['data_increments'] datasets are confirmed by more than one node. Node confirms must be unique.
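To make the ConfirmNotUniqueError easier to picture, here is a rough mental model in plain Python; this is not Kedro's actual implementation, only a sketch of the behavior this thread reports: namespacing prefixes a node's inputs and outputs, but leaves `confirms` untouched, so every namespaced copy of the template confirms the same dataset name.

```python
# Rough mental model only -- not Kedro's real code. Dataset names are taken
# from the create_pipeline snippet above.

def namespaced_copy(node_spec: dict, namespace: str) -> dict:
    """Prefix inputs/outputs with the namespace, as pipeline(namespace=...) does."""
    return {
        "inputs": [f"{namespace}.{d}" for d in node_spec["inputs"]],
        "outputs": [f"{namespace}.{d}" for d in node_spec["outputs"]],
        # confirms is copied verbatim -- this is the source of the clash
        "confirms": list(node_spec["confirms"]),
    }

template = {
    "inputs": ["partitioned_concatenated_data"],
    "outputs": ["extracted_datas"],
    "confirms": ["data_increments"],
}

copies = [namespaced_copy(template, str(i)) for i in range(1, 4)]
all_confirms = [name for c in copies for name in c["confirms"]]

# "data_increments" ends up confirmed by all three namespaced nodes, which
# is exactly what ConfirmNotUniqueError complains about
assert all_confirms.count("data_increments") == 3
```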
n
A related topic: https://kedro-org.slack.com/archives/C03RKP2LW64/p1720774795664329 AFAIK `confirm` is not "namespaced", but it's also rare that you need both `namespace` and IncrementalDataset.
g
Thanks @Nok Lam Chan. I'll have a deeper look. Seems like I have the exact same use case.
I think I should give you more detail about my use case, as it seems to me it is most practical to have namespaced `confirms`. I'll come up with a more detailed question.
I have many devices that regularly record event files and push them to an S3 bucket. I would like to run a preprocessing pipeline, different for each device, that would for each device:
1. Load all new files as dataframes, preprocess them, concatenate the preprocessed recorded events, and save the result to another S3 bucket.
2. Load all preprocessed recorded files computed so far and concatenate them.
Then I use the concatenation of all preprocessed recorded events seen so far for data science purposes. The way I achieve this with Kedro is:
• For step 1, I use an IncrementalDataset, and the concatenated dataframe is saved using a versioned ParquetDataset.
• For step 2, I use a PartitionedDataset that is able to find all preprocessed recorded events computed so far (with load_args withdirs and max_depth set accordingly).
• To be able to use ParallelRunner, I create a placeholder output of step 1 that I pass as an input of step 2.
These steps are done for each device, so I use namespaces to reuse the same logic for all of them, varying only the S3 bucket path. I need the `confirms` to be at step 2 because only then can I consider new files to have been processed. Does it make sense to you @Nok Lam Chan and @Rashida Kanchwala? I am fairly new to Kedro and I have the feeling my approach is a bit hacky.
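For concreteness, the two steps above could map onto catalog entries roughly like the following. This is a hypothetical sketch for a single device namespace ("1"): the dataset types exist in kedro-datasets, but every path, bucket name, and option value here is invented.

```yaml
# Hypothetical catalog sketch -- all paths and option values are invented.
"1.data_increments":
  type: partitions.IncrementalDataset
  path: s3://raw-events/device-1/
  dataset: pandas.ParquetDataset

"1.concatenated_data_increments":
  type: pandas.ParquetDataset
  filepath: s3://preprocessed-events/device-1/concatenated.parquet
  versioned: true

"1.partitioned_concatenated_data":
  type: partitions.PartitionedDataset
  path: s3://preprocessed-events/device-1/
  dataset: pandas.ParquetDataset
  load_args:          # forwarded to the filesystem's find() call
    withdirs: false
    maxdepth: 2
```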
n
Steps 1 & 2 make sense to me, but I'm not sure why ParallelRunner is related to this topic?
g
Thanks @Nok Lam Chan, it's not related. I just wanted to stress that, as step 1 and step 2 are not directly linked, I have to add a fake output to step 1 / input to step 2. That, along with my use of versioning, which does not seem to be the intended one, makes me feel my Kedro pipeline is quite hacky.
n
As for the confirm-not-unique problem, can you try putting the namespace in the argument? For example, `confirms=namespace.data`. I think this all makes sense, but I need a bit of time to think about it. If you can, please open a GitHub issue so the team can discuss. Meanwhile, please test whether this can be a temporary workaround.
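The suggested workaround applied to the create_pipeline above would mean giving each namespaced copy the already-prefixed dataset name in `confirms`. The sketch below keeps the Kedro-specific parts in comments (so it stays self-contained) and only exercises the naming scheme; it assumes, per this thread, that namespacing does not rewrite `confirms`, so the explicit prefix is what makes each copy unique.

```python
# Sketch of the workaround, not a definitive implementation. The helper
# name `namespaced_confirms` is invented for illustration.

def namespaced_confirms(namespace: str) -> str:
    # "1.data_increments", "2.data_increments", ... -- matching the dataset
    # names that pipeline(..., namespace=...) produces for inputs/outputs
    return f"{namespace}.data_increments"

# In create_pipeline, one would build the template per namespace ns, e.g.:
#   node(
#       concatenate_partitions,
#       inputs="partitioned_concatenated_data",
#       outputs="extracted_datas",
#       name="concatenate_partitions",
#       confirms=namespaced_confirms(ns),  # instead of "data_increments"
#   )
# and then wrap it with pipeline(..., namespace=ns) as before.

names = [namespaced_confirms(str(i)) for i in range(1, 4)]
assert len(set(names)) == len(names)  # each copy confirms a distinct dataset
```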
g
I confirm your workaround is working. Thank you! I'll open the GitHub issue first thing tomorrow when I have access to my computer.