# questions
r
Hi team, I have a question about incremental datasets. In short, how can I add `confirms` to this pipeline such that `collect_month` will take an input `IncrementalDataset`? Currently this works fine for partitioned datasets, but due to the long processing time (collecting data from S3) I want to add some incremental structure here. Thanks for all the awesome work on Kedro!
```python
"""
This is a boilerplate pipeline 'data_processing_pipeline'
generated using Kedro 0.19.1
"""

from kedro.pipeline import Pipeline, pipeline, node

from project.utilities.file_handler import data_loader
from project.utilities.data_modifier import modify_data_list
from project.utilities.model_loader import get_model

from .data_processing.module_a import process_data_step1
from .data_processing.module_b import process_data_step2
from pathlib import Path

from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

def create_pipeline(**kwargs) -> Pipeline:
    pipeline_collect_data_template = lambda month: pipeline([
        node(
            func=data_loader,
            inputs="data_inputs",
            outputs="output_data"
        )
    ])

    pipeline_extract_features_template = lambda month: pipeline([
        node(
            func=get_model,
            inputs=["params:model_package_arn",
                    "params:profile_name",
                    "params:region_name",
                    "params:local_model_path",
                    "params:unpack_dir"],
            outputs="unpack_dir"),
        node(func=process_data_step1,
             inputs=["input_files","unpack_dir"],
             outputs="processed_results"
        )
    ])

    pipeline_process_data = lambda month: pipeline([
        node(
            func=get_model,
            inputs=["params:model_package_arn",
                    "params:profile_name",
                    "params:region_name",
                    "params:local_model_path",
                    "params:unpack_dir"],
            outputs="unpack_dir"),
        node(func=process_data_step2,
             inputs=["unpack_dir", "input_files", "params:model_type", "params:label_condition"],
             outputs="final_results"
        )
    ])

    start_date = datetime(2019, 2, 1)
    end_date = datetime(2019, 5, 1)
    current_date = start_date

    full_pipeline = Pipeline([])

    while current_date <= end_date:
        year_month = current_date.strftime("%Y.%m")
        logger.info(f"Processing data for {year_month}")
        collect_month = pipeline(
            pipe=pipeline_collect_data_template(year_month),
            inputs={"data_inputs": f"{year_month}#data_source"},
            outputs={"output_data": f"{year_month}#local_data"},
            # confirms={"output_data": f"{year_month}#local_data"},
            namespace=f"data_collector_{year_month}"
        )

        process_months_step1 = pipeline(
            pipe=pipeline_extract_features_template(year_month),
            inputs={"input_files": f"{year_month}#local_data_files"},
            outputs={"processed_results": f"step1.{year_month}#processed_data"},
            namespace="data_processor_step1"
        )

        process_months_step2 = pipeline(
            pipe=pipeline_process_data(year_month),
            inputs={"input_files": f"step1.{year_month}#processed_data"},
            outputs={"final_results": f"step2.{year_month}#final_data"},
            namespace="data_processor_step2"
        )

        full_pipeline += collect_month + process_months_step1 + process_months_step2
        current_date += timedelta(days=31)
        current_date = current_date.replace(day=1)

    return full_pipeline
```
n
Hey! Thanks for the question and sorry for the late reply. The question is a bit long, can you help me understand the issue better:
• Have you considered using `IncrementalDataset`?
> In short, how can I add `confirms` to this pipeline such that `collect_month` will take an input `IncrementalDataset`?
What do you want to do?
r
Hey Nok, thanks for coming back to me here and sorry for the long code snippet and super short question. Basically I wanted to use `IncrementalDataset` in place of my current partitioned dataset such that each time I run the pipeline I only process the files which are "new". As I understood the documentation, we need to define the `confirms` parameter to explicitly tell Kedro to perform the checkpoint update?
```python
from kedro.pipeline import node

# process and then confirm `IncrementalDataset` within the same node
node(
    process_partitions,
    inputs="my_partitioned_dataset",
    outputs="my_processed_dataset",
    confirms="my_partitioned_dataset",
)
```
How can I do this inside the modular pipeline:
```python
collect_month = pipeline(
            pipe=pipeline_collect_data_template(year_month),
            inputs={"data_inputs": f"{year_month}#data_source"},
            outputs={"output_data": f"{year_month}#local_data"},
            # confirms={"output_data": f"{year_month}#local_data"},
            namespace=f"data_collector_{year_month}"
        )
```
n
Thanks for the clarification, I think I need some time to check on this, I'll try to get back tomorrow. Is there a specific reason you comment out the `confirms` argument in the modular pipeline? What happens when you uncomment it?
Cc @Deepyaman Datta if you know anything about this, since you are more experienced with `PartitionedDataset` in general.
d
@Reuben Zotz-Wilson Why are you creating pipelines in a loop, for the `year_month`? The way an incremental dataset works is, you have a single pipeline, and then you run it periodically, and checkpoint the dataset to indicate where you've run up to. You shouldn't need to maintain a `year_month` like this.
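To make that shape concrete, here is a minimal sketch (the dataset names `local_data` and `processed_data` and the `process_new_partitions` helper are illustrative, assuming `local_data` is declared as a `partitions.IncrementalDataset` in the catalog):
```python
from kedro.pipeline import Pipeline, node


def process_new_partitions(partitions: dict):
    # `partitions` only contains partitions added since the last confirmed checkpoint
    return sorted(partitions)


def create_pipeline(**kwargs) -> Pipeline:
    # One pipeline, no per-month loop: re-running it periodically picks up new partitions,
    # and `confirms` updates the checkpoint after a successful run.
    return Pipeline(
        [
            node(
                func=process_new_partitions,
                inputs="local_data",    # IncrementalDataset covering every year/month
                outputs="processed_data",
                confirms="local_data",  # mark these partitions as processed
            )
        ]
    )
```
Each scheduled run then only sees the partitions that arrived after the previous confirmed run.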
> Is there a specific reason you comment out the `confirms` argument in the modular pipeline? What happens when you uncomment it?
@Nok Lam Chan `pipeline` doesn't support any mapping behavior for `confirms`. I could see this potentially being useful to support (if you need to confirm a dataset that gets namespaced), but I don't see this as being the issue here.
n
Ah I see, `namespace` only affects `inputs` and `outputs` currently.
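Given that, one possible workaround (an untested sketch, assuming `confirms` names really do pass through the modular `pipeline()` wrapper untouched) is to declare `confirms` on the inner node itself, spelling out the final, factory-resolved dataset name:
```python
from kedro.pipeline import Pipeline, node, pipeline

from project.utilities.file_handler import data_loader  # as in the original snippet


def pipeline_collect_data_template(year_month: str) -> Pipeline:
    return pipeline(
        [
            node(
                func=data_loader,
                inputs="data_inputs",                  # remapped by the outer pipeline() call
                outputs="output_data",                 # remapped by the outer pipeline() call
                confirms=f"{year_month}#data_source",  # not remapped, so use the resolved name
            )
        ]
    )
```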
r
@Deepyaman Datta Thanks for the question and sorry for not being more clear in the original post. I am creating pipelines in the loop since I have a very large set of expensive processing to perform and I want to ensure that intermediate results get stored with the appropriate data factory. I understand that I should be able to remove the dynamic creation of separate pipelines from most of the steps here, but I don't believe that is related to my attempt to use the `IncrementalDataset` type. I am using `partitions.PartitionedDataset` as inputs to some of the modular pipelines (i.e. `input_files`). My intention was to use `partitions.IncrementalDataset`, since at least the `collect_month` pipeline is very time-consuming: it collects audio files from S3 and uses a custom dataset class to load and resample them on the fly. I want to avoid having to do this when new files arrive in that location. As @Nok Lam Chan mentions, the issue is that `confirms` is not namespaced, so we get the error that it is not unique when using it in the modular pipeline.
Perhaps it would make sense to be able to use the mapping to allow this?
```python
collect_month = pipeline(
            pipe=pipeline_collect_data_template(year_month),
            inputs={"data_inputs": f"{year_month}#data_source"},
            outputs={"output_data": f"{year_month}#local_data"},
            # confirms={"output_data": f"{year_month}#local_data"},
            namespace=f"data_collector_{year_month}"
        )
```
n
I think what @Deepyaman Datta means is that, to have an incremental pipeline, you don't need both `namespace` and `IncrementalDataset` to achieve this. Maybe you are using `namespace` for a different reason? Adding `confirms` support for namespaces makes sense to me, would you mind opening an issue on https://github.com/kedro-org/kedro?
r
I am using `namespace` here to be able to separate parameters when calling a pipeline (i.e. `pipeline_process_data`) multiple times but with different inputs (i.e. `["params:model_package_arn", "params:profile_name", "params:region_name", "params:local_model_path", "params:unpack_dir"]`). In hindsight I should not have simplified my pipeline before posting here, as the example only uses `pipeline_process_data` once. To be able to achieve this, as I understand things, we need namespaces. I then wanted to make use of the `IncrementalDataset` type such that each time I run the pipeline I only process new data (i.e. within the current month). Note: as pointed out by @Deepyaman Datta, it is not required to iteratively create pipelines (this was a design pattern error I have since removed). I will post an issue to track this request.
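A rough sketch of that namespacing pattern (the `asr`/`vad` namespace names are purely illustrative): the same template is reused twice, and the namespace prefixes both the `params:` references and the `unpack_dir` output so the two instances stay separate.
```python
from kedro.pipeline import Pipeline, node, pipeline

from project.utilities.model_loader import get_model  # as in the original snippet


def model_template() -> Pipeline:
    return pipeline(
        [
            node(
                func=get_model,
                inputs=["params:model_package_arn",
                        "params:profile_name",
                        "params:region_name",
                        "params:local_model_path",
                        "params:unpack_dir"],
                outputs="unpack_dir",
            )
        ]
    )


def create_pipeline(**kwargs) -> Pipeline:
    # "params:model_package_arn" resolves to "asr.model_package_arn" and
    # "vad.model_package_arn" respectively in parameters.yml.
    return pipeline(model_template(), namespace="asr") + pipeline(
        model_template(), namespace="vad"
    )
```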
d
I'm still confused by the fact that you're trying to confirm using `year_month`. If your `local_data` is an `IncrementalDataset`, I would expect it to have partitions for all the months, and your confirm would be used to mark the month up until which things have been processed.
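A rough illustration of that checkpoint behaviour, using the dataset class directly rather than through the catalog (the path is taken from the snippet below; `CSVDataset` is a made-up stand-in for the custom audio dataset):
```python
from kedro_datasets.pandas import CSVDataset
from kedro_datasets.partitions import IncrementalDataset

ds = IncrementalDataset(
    path="data/01_raw/collect_raw/",  # one dataset spanning every year/month partition
    dataset=CSVDataset,
    filename_suffix=".csv",
)

new_partitions = ds.load()  # only partitions newer than the stored checkpoint
# ... process the new partitions ...
ds.confirm()  # persist the checkpoint so the next run skips everything seen so far
```
When the dataset is declared in the catalog instead, a node listing it under `confirms` triggers the same `confirm()` call after it runs successfully.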
r
Perhaps I misunderstood how `IncrementalDataset` works. The `local_data` points to partitions of files within a single month.
```yaml
"{year}.{month}#collect_local":
  type: partitions.PartitionedDataset
  path: data/01_raw/collect_raw/{year}/{month}/
  dataset: 
    type: asr_pipe.datasets.wav_dataset.AudioDatasetVersion
    versioned: false
    save_args:
      sampling_rate: 16000
      skip_if_exists: true
  filename_suffix: ".wav"
```
My intention was to use `IncrementalDataset` to be able to run the pipeline multiple times within a "month", only processing new files. Note: the only reason I have split the data factory like this is for a more efficient development cycle. Once I get partitioned datasets working I will change the data factory. I hope I have not misunderstood something fundamental here. ;-)
n

https://youtu.be/v7JSSiYgqpg?si=_8u52Z-LypiGnEqN

I'm in a rush but let's see if this clears it up.
šŸ‘ 1
It's old but the concepts haven't changed; dataset factories weren't introduced at that time.
šŸ‘ 1