Thomas Bury
05/25/2024, 8:19 PM
from kedro.pipeline import Pipeline, node
from .nodes import load_data, clean_data, transform_data
def create_eml_processing_pipeline(**kwargs):
    """Create the EML-processing sub-pipeline.

    Chains two processing steps over the dataset names
    ``loaded_data`` -> ``cleaned_data_step1`` -> ``cleaned_data_step2``.

    Returns:
        Pipeline: the assembled two-node pipeline.
    """
    # NOTE(review): the original referenced undefined names
    # ``clean_data_step1`` / ``clean_data_step2`` (a NameError at import
    # time). The functions actually imported from ``.nodes`` are used
    # instead, keeping the original dataset names so any downstream
    # input/output mappings continue to work — confirm the intended step
    # functions against ``.nodes``.
    return Pipeline(
        [
            node(clean_data, "loaded_data", "cleaned_data_step1"),
            node(transform_data, "cleaned_data_step1", "cleaned_data_step2"),
        ]
    )
from kedro.pipeline import Pipeline, node, pipeline
from .pipeline_eml_processing import create_eml_processing_pipeline
def create_pipeline(**kwargs):
    """Assemble the top-level pipeline around the EML-processing sub-pipeline.

    Loads one partition through the transcoded ``input_data@partition``
    dataset, runs the namespaced EML-processing sub-pipeline on it, and
    writes the result back through ``processed_data@partition``.

    Returns:
        Pipeline: the composed pipeline.
    """
    eml_processing_pipeline = create_eml_processing_pipeline()
    return Pipeline(
        [
            # Identity node: surfaces the transcoded partition under a plain
            # dataset name that the sub-pipeline wrapper can remap.
            node(
                func=lambda partition: partition,
                inputs="input_data@partition",
                outputs="partition_data",
                name="load_partition_node",
            ),
            # NOTE(review): ``inputs``/``outputs`` of a modular pipeline must
            # reference the sub-pipeline's *free* dataset names
            # ("loaded_data" / "cleaned_data_step2"). The original passed the
            # outer names directly, which are not free datasets of the
            # sub-pipeline and are rejected by Kedro at construction time;
            # explicit inner-to-outer dict mappings fix this.
            pipeline(
                eml_processing_pipeline,
                inputs={"loaded_data": "partition_data"},
                outputs={"cleaned_data_step2": "processed_partition_data"},
                namespace="eml_processing",
            ),
            # Identity node: hands the processed data back to the transcoded
            # save dataset.
            node(
                func=lambda partition: partition,
                inputs="processed_partition_data",
                outputs="processed_data@partition",
                name="save_partition_node",
            ),
        ]
    )
Jitendra Gundaniya
05/27/2024, 8:16 AM
Is this referring to the transcoding syntax (the `@partition` part)? https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html#read-the-same-file-using-two-different-datasets
For more details, refer to the Kedro documentation on data catalog, pipelines, and partitioned datasets:
For more details, refer to the Kedro documentation on data catalog, pipelines, and partitioned datasets:
https://docs.kedro.org/en/stable/data/data_catalog.html#partitioneddataset
https://docs.kedro.org/en/stable/nodes_and_pipelines/pipeline_introduction.html
Juan Luis
05/27/2024, 8:26 AM
Jitendra Gundaniya
05/27/2024, 9:49 AM
Thomas Bury
05/27/2024, 10:02 AM