Reuben Zotz-Wilson
07/12/2024, 8:59 AM
"""
This is a boilerplate pipeline 'data_processing_pipeline'
generated using Kedro 0.19.1
"""
from kedro.pipeline import Pipeline, pipeline, node
from project.utilities.file_handler import data_loader
from project.utilities.data_modifier import modify_data_list
from project.utilities.model_loader import get_model
from .data_processing.module_a import process_data_step1
from .data_processing.module_b import process_data_step2
from pathlib import Path
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
def create_pipeline(**kwargs) -> Pipeline:
    pipeline_collect_data_template = lambda month: pipeline([
        node(
            func=data_loader,
            inputs="data_inputs",
            outputs="output_data"
        )
    ])
    pipeline_extract_features_template = lambda month: pipeline([
        node(
            func=get_model,
            inputs=["params:model_package_arn",
                    "params:profile_name",
                    "params:region_name",
                    "params:local_model_path",
                    "params:unpack_dir"],
            outputs="unpack_dir"),
        node(func=process_data_step1,
             inputs=["input_files", "unpack_dir"],
             outputs="processed_results"
             )
    ])
    pipeline_process_data = lambda month: pipeline([
        node(
            func=get_model,
            inputs=["params:model_package_arn",
                    "params:profile_name",
                    "params:region_name",
                    "params:local_model_path",
                    "params:unpack_dir"],
            outputs="unpack_dir"),
        node(func=process_data_step2,
             inputs=["unpack_dir", "input_files", "params:model_type", "params:label_condition"],
             outputs="final_results"
             )
    ])

    start_date = datetime(2019, 2, 1)
    end_date = datetime(2019, 5, 1)
    current_date = start_date
    full_pipeline = Pipeline([])

    while current_date <= end_date:
        year_month = current_date.strftime("%Y.%m")
        logger.info(f"Processing data for {year_month}")
        collect_month = pipeline(
            pipe=pipeline_collect_data_template(year_month),
            inputs={"data_inputs": f"{year_month}#data_source"},
            outputs={"output_data": f"{year_month}#local_data"},
            # confirms={"output_data": f"{year_month}#local_data"},
            namespace=f"data_collector_{year_month}"
        )
        process_months_step1 = pipeline(
            pipe=pipeline_extract_features_template(year_month),
            inputs={"input_files": f"{year_month}#local_data_files"},
            outputs={"processed_results": f"step1.{year_month}#processed_data"},
            namespace="data_processor_step1"
        )
        process_months_step2 = pipeline(
            pipe=pipeline_process_data(year_month),
            inputs={"input_files": f"step1.{year_month}#processed_data"},
            outputs={"final_results": f"step2.{year_month}#final_data"},
            namespace="data_processor_step2"
        )
        full_pipeline += collect_month + process_months_step1 + process_months_step2
        current_date += timedelta(days=31)
        current_date = current_date.replace(day=1)

    return full_pipeline

Nok Lam Chan
07/16/2024, 1:22 PM
IncrementalDataset?
In short, how can I add confirms to this pipeline such that "collect_month" will take an IncrementalDataset as input?
What do you want to do?

Reuben Zotz-Wilson
07/16/2024, 8:34 PM
from kedro.pipeline import node

# process and then confirm `IncrementalDataset` within the same node
node(
    process_partitions,
    inputs="my_partitioned_dataset",
    outputs="my_processed_dataset",
    confirms="my_partitioned_dataset",
)

How can I do this inside the modular pipeline?
collect_month = pipeline(
    pipe=pipeline_collect_data_template(year_month),
    inputs={"data_inputs": f"{year_month}#data_source"},
    outputs={"output_data": f"{year_month}#local_data"},
    # confirms={"output_data": f"{year_month}#local_data"},
    namespace=f"data_collector_{year_month}"
)

Nok Lam Chan
07/16/2024, 9:55 PM
Is there a specific reason you comment out the confirm argument in the modular pipeline? What happens when you uncomment it?

Nok Lam Chan
07/16/2024, 9:56 PM

Deepyaman Datta
07/17/2024, 3:33 PM
Why do you need year_month? The way an incremental dataset works is, you have a single pipeline, and then you run it periodically, and checkpoint the dataset to indicate where you've run up to. You shouldn't need to maintain a year_month like this.

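For illustration, a minimal sketch of that "run periodically and checkpoint" flow using the IncrementalDataset Python API from kedro-datasets; the path and custom dataset class are borrowed from the catalog entry shown later in the thread, and everything else (variable names, the assumption that the custom audio dataset takes a standard filepath argument) is illustrative:

from kedro_datasets.partitions import IncrementalDataset

# one dataset covering every month under the same root path
audio_increments = IncrementalDataset(
    path="data/01_raw/collect_raw/",
    dataset="asr_pipe.datasets.wav_dataset.AudioDatasetVersion",
)

first_batch = audio_increments.load()   # only partitions newer than the stored checkpoint
# ... process first_batch ...
audio_increments.confirm()              # persist the checkpoint at the newest partition seen
second_batch = audio_increments.load()  # empty until new files arrive under the path
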
07/17/2024, 3:34 PMIs there a specific reason you comment out the@Nok Lam Chanargument in the modular pipeline? What happens when you uncomment it?confirm
pipeline
doesn't support any mapping behavior for confirms
I could see this potentially being useful to support (if you need to confirm a dataset that gets namespaced), but I don't see this as being the issue hereNok Lam Chan
07/17/2024, 3:39 PMinputs
and outputs
currentlyReuben Zotz-Wilson
07/18/2024, 8:17 AM
I am using partitions.PartitionedDataset as inputs to some of the modular pipelines (i.e. input_files). My intention was to use partitions.IncrementalDataset, since at least the collect_month pipeline is very time-consuming: it collects audio files from S3 and uses a custom dataset class to load and resample them on the fly. I want to avoid having to redo this when new files arrive in that location.
As @Nok Lam Chan mentions, the issue is that the confirms is not namespaced, so we get an error that it is not unique when using it in the modular pipeline.

Reuben Zotz-Wilson
07/18/2024, 9:28 AM
collect_month = pipeline(
    pipe=pipeline_collect_data_template(year_month),
    inputs={"data_inputs": f"{year_month}#data_source"},
    outputs={"output_data": f"{year_month}#local_data"},
    # confirms={"output_data": f"{year_month}#local_data"},
    namespace=f"data_collector_{year_month}"
)

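One possible workaround, sketched as an untested illustration rather than documented Kedro behaviour: since the pipeline() wrapper only remaps inputs and outputs, confirms could be set on the node inside the template using the already-resolved per-month dataset name, so nothing needs to be remapped and no shared "output_data" name is reused across months:

from kedro.pipeline import node, pipeline

# data_loader and year_month are the same objects as in the snippet above
pipeline_collect_data_template = lambda month: pipeline([
    node(
        func=data_loader,
        inputs="data_inputs",
        outputs="output_data",
        # already-resolved name, matching what the outputs mapping below produces,
        # so each monthly copy confirms its own catalog entry
        confirms=f"{month}#local_data",
    )
])

collect_month = pipeline(
    pipe=pipeline_collect_data_template(year_month),
    inputs={"data_inputs": f"{year_month}#data_source"},
    outputs={"output_data": f"{year_month}#local_data"},
    namespace=f"data_collector_{year_month}",
)

Whether confirming the collector's output dataset gives the intended checkpoint behaviour would still need to be verified against the IncrementalDataset entry in the catalog.
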
Nok Lam Chan
07/18/2024, 10:45 AM
I think you only need namespace and IncrementalDataset to achieve this. Maybe you are using namespace for a different reason?
Adding confirm support for namespace makes sense to me; would you mind opening an issue on https://github.com/kedro-org/kedro?

Reuben Zotz-Wilson
07/18/2024, 10:55 AM

Deepyaman Datta
07/18/2024, 11:51 AM
year_month

Deepyaman Datta
07/18/2024, 11:52 AM
If local_data is an IncrementalDataset, I would expect it to have partitions for all the months, and your confirm would be used to mark the month up to which things have been processed.

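A rough sketch of that layout, with one IncrementalDataset spanning every month; the dataset names ("raw_audio_increments", "local_data") and the assumption that each partition id begins with a year/month folder that can be split back out are illustrative, not taken from the thread:

from collections import defaultdict
from kedro.pipeline import Pipeline, node

def collect_new_audio(partitions: dict) -> dict:
    # with one dataset rooted above all the month folders, the partition id
    # itself (e.g. "2019/02/<file>") carries the month, so no year_month
    # namespace is needed to keep the monthly grouping
    by_month = defaultdict(dict)
    for partition_id, partition in partitions.items():
        month = "/".join(partition_id.split("/")[:2])
        by_month[month][partition_id] = partition
    return dict(by_month)

def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline([
        node(
            func=collect_new_audio,
            inputs="raw_audio_increments",   # single IncrementalDataset over all months
            outputs="local_data",
            confirms="raw_audio_increments", # checkpoint marks how far collection has got
        )
    ])
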
Reuben Zotz-Wilson
07/18/2024, 12:00 PM
Currently, local_data points to partitions of files within a single month:
"{year}.{month}#collect_local":
  type: partitions.PartitionedDataset
  path: data/01_raw/collect_raw/{year}/{month}/
  dataset:
    type: asr_pipe.datasets.wav_dataset.AudioDatasetVersion
    versioned: false
    save_args:
      sampling_rate: 16000
      skip_if_exists: true
  filename_suffix: ".wav"
My intention was to use IncrementalDataset to be able to run the pipeline multiple times in a "month", only processing new files.
Note: the only reason I have split the dataset factory like this is for a more efficient development cycle. Once I get partitioned datasets working I will change the dataset factory.
I hope I have not misunderstood something fundamental here. ;-)

Nok Lam Chan
07/18/2024, 12:16 PM

Nok Lam Chan
07/25/2024, 12:01 PM