Thomas Bury
05/28/2024, 9:39 PM
Kedro-Viz only displays [create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node], which reduces transparency compared to my initial vision. The eml_extraction_pipe and all its nodes are not illustrated.
At this point, using a regular Python function for text extraction achieves similar results without the overhead of a full Kedro pipeline. This would eliminate some of the benefits Kedro offers:
• Modularity: breaking tasks down into reusable nodes improves code organization and maintainability.
• Data management: the catalog simplifies data access and versioning.
• Reproducibility: pipelines promote clear and repeatable workflows.
I still have the impression I'm missing something 😅.
I redefined the "looping" function (in nodes.py) as:
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner

def apply_text_extraction_pipeline(partitioned_input, pipeline):
    runner = SequentialRunner()
    results = {}
    for partition_id, load_partition in partitioned_input.items():
        email_msg = load_partition()
        # Create a data catalog for the current partition
        catalog = DataCatalog({"email": MemoryDataset(email_msg)})
        # Run the pipeline using the created catalog
        result = runner.run(pipeline, catalog)
        results[partition_id] = result
    return results
The pipeline.py is:
from kedro.pipeline import Pipeline, node, pipeline

# apply_text_extraction_pipeline and the pre-built node objects
# (get_email_body_node, clean_html_from_strings_node, ...) live in nodes.py

def create_eml_extraction_pipe():
    eml_extraction_pipe = pipeline(
        [
            get_email_body_node,
            clean_html_from_strings_node,
            extract_headers_node,
            extract_metadata_from_email_node,
            split_and_remove_greetings_node,
            remove_header_node,
            flatten_and_convert_to_string_node,
            finalize_processed_text_node,
            extract_type_email_node,
            detect_email_subject_markers_node,
            compile_results_node,
        ]
    )
    return eml_extraction_pipe

create_eml_extraction_pipe_node = node(
    func=create_eml_extraction_pipe,
    inputs=None,
    outputs="eml_extraction_pipe",
    name="create_eml_extraction_pipe_node",
)

apply_text_extraction_pipeline_node = node(
    func=apply_text_extraction_pipeline,
    inputs=["emails", "eml_extraction_pipe"],
    outputs="extracted_msg_dict",
    name="apply_text_extraction_pipeline_node",
)

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node]
    )
datajoely
05/29/2024, 8:04 AM
Modular pipelines, which I think will be renamed namespaced pipelines in the future, are your friend here:
https://docs.kedro.org/en/stable/nodes_and_pipelines/modular_pipelines.html
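A minimal sketch of the namespacing idea, assuming the partition ids can be enumerated when the pipeline is registered; the inner free input/output names ("email", "extracted_msg") are also assumptions:

from kedro.pipeline import pipeline

def create_pipeline(**kwargs):
    partition_ids = ["0001", "0002"]  # assumption: ids known up front
    pipes = [
        pipeline(
            create_eml_extraction_pipe(),
            namespace=f"eml_{pid}",  # one collapsible group per partition in Kedro-Viz
            inputs={"email": f"email_{pid}"},
            outputs={"extracted_msg": f"extracted_msg_{pid}"},
        )
        for pid in partition_ids
    ]
    return sum(pipes, pipeline([]))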
Thomas Bury
05/29/2024, 8:07 AM

Thomas Bury
05/29/2024, 3:24 PM

Jitendra Gundaniya
05/29/2024, 4:08 PM

Thomas Bury
05/29/2024, 4:11 PM

Yolan Honoré-Rougé
05/29/2024, 5:04 PM

Yolan Honoré-Rougé
05/29/2024, 5:05 PM

Yolan Honoré-Rougé
05/29/2024, 5:05 PM

Yolan Honoré-Rougé
05/29/2024, 5:06 PM

Thomas Bury
05/29/2024, 6:08 PM

Nok Lam Chan
05/30/2024, 12:47 PM
"[create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node], which reduces transparency compared to my initial vision."
Conceptually, you want to apply a loop over something. At the node level, that will be a PartitionedDataset; at the pipeline level (which consists of multiple loops, and I guess that's what you want to visualise in Kedro-Viz), it will be a namespaced pipeline.
I have an example of a time series forecast pipeline with namespaces.

Nok Lam Chan
05/30/2024, 12:51 PM
node (apply the loop inside):

def do_something(partitions):
    for i in range(partitions):
        run_something()

do_something()

namespaced pipeline (apply the loop outside):

for i in range(partitions):
    run_something(i)
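In Kedro terms, the first variant maps directly onto how PartitionedDataset behaves; a rough sketch, with run_something standing in for the real per-record logic:

from typing import Any, Callable

def run_something(record: Any) -> Any:
    ...  # placeholder for the per-record processing

def do_something(partitions: dict[str, Callable[[], Any]]) -> dict[str, Any]:
    # A PartitionedDataset loads as {partition_id: lazy_load_callable};
    # returning {partition_id: data} saves one output file per key.
    return {pid: run_something(load()) for pid, load in partitions.items()}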
datajoely
05/30/2024, 12:52 PM

Nok Lam Chan
05/30/2024, 12:54 PM

datajoely
05/30/2024, 12:56 PM

Deepyaman Datta
05/31/2024, 5:59 PM

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(generate_emails, "params:n", "emails"),
        pipeline([
            node(capitalize_content, "emails", "capitalized_emails"),
        ]),
    ])

This will essentially unwrap the nodes and include them in the bigger pipeline, not really treat it as a subpipeline. If you want to make a pipeline not intended for partitioned data work on partitioned data for that subpart, you can write some utility that converts that pipeline into something that works on partitions.
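One hedged way to build the utility Deepyaman describes, assuming every node in the sub-pipeline takes exactly one input, produces exactly one output, and carries an explicit name (all true of a linear chain like eml_extraction_pipe):

from typing import Callable
from kedro.pipeline import Pipeline, node, pipeline

def _partitionized(func: Callable) -> Callable:
    # Lift a per-record function so it maps over a partition dict.
    def mapped(partitions: dict) -> dict:
        out = {}
        for pid, part in partitions.items():
            # Raw PartitionedDataset entries are lazy load callables;
            # intermediate in-memory results are already materialized.
            out[pid] = func(part() if callable(part) else part)
        return out
    mapped.__name__ = f"{func.__name__}_partitioned"
    return mapped

def partitionize_pipeline(pipe: Pipeline) -> Pipeline:
    # Assumption: each node has exactly one input, one output, and a name.
    return pipeline(
        [
            node(
                _partitionized(n.func),
                inputs=n.inputs[0],
                outputs=n.outputs[0],
                name=f"{n.name}_partitioned",
            )
            for n in pipe.nodes
        ]
    )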
Thomas Bury
06/05/2024, 9:38 AM

Thomas Bury
06/05/2024, 12:14 PM

0001.csv --> apply_pipeline --> 0001.json
0002.csv --> apply_pipeline --> 0002.json
...
9999.csv --> apply_pipeline --> 9999.json

So far, the closest "kedronic" solution is the namespace, but it isn't perfect because it duplicates the pipeline N times, especially in the data catalog and in Kedro-Viz.
That would be a very useful feature to consolidate in kedro 🙂

datajoely
06/05/2024, 12:15 PM

datajoely
06/05/2024, 12:15 PM

datajoely
06/05/2024, 12:16 PM
See the train evaluation namespace on demo.kedro.org

Thomas Bury
06/05/2024, 12:40 PM
The idea would be to replace _preprocess_partion with a proper Kedro pipeline that Kedro will register and handle as such, without unpacking the nodes:
• Two entries in the data catalog, partition_in and partition_out
• A single pipeline, containing the steps to handle only one file
• Looping/parallelization, applying the same pipeline N times to the N (>>1, up to thousands of files) partitions
As I understand it, the current workarounds are:
• don't use a Kedro pipeline for handling the file, but rather wrap all the steps into a function/class
• run the pipeline "as a function", as I illustrated above
• add an external mechanism to trigger the Kedro pipeline (handling a single file) N times, using an API or queuing system, but there is an overhead price to pay
• use a plugin, such as kedro-boot, also an "external" mechanism
Thomas Bury
06/05/2024, 12:41 PM

Deepyaman Datta
06/05/2024, 1:25 PM
node1_for_eml1, node2_for_eml1, node3_for_eml1, node1_for_eml2, node2_for_eml2, node3_for_eml2, etc.
But what you have shared is forcing:
node1_for_eml1, node1_for_eml2, ..., node1_for_emlN, node2_for_eml1, node2_for_eml2, ..., node2_for_emlN, etc.
Or are there other issues?

Thomas Bury
06/05/2024, 1:41 PM
The catalog.yml:

emails:
  type: partitions.PartitionedDataset
  path: data/01_raw/eml
  dataset:
    type: email.EmailMessageDataset
    filename_suffix: ".eml"

extracted_msg_dict:
  type: partitions.PartitionedDataset
  path: data/02_intermediate/eml_text_extraction
  dataset:
    type: json.JSONDataset
    filename_suffix: ".json"
On each EML file I need to apply a series of functions:
eml_extraction_pipe = pipeline(
    [
        get_email_body_node,
        clean_html_from_strings_node,
        extract_headers_node,
        extract_metadata_from_email_node,
        split_and_remove_greetings_node,
        remove_header_node,
        flatten_and_convert_to_string_node,
        finalize_processed_text_node,
        extract_type_email_node,
        detect_email_subject_markers_node,
        compile_results_node,
    ]
)
I have thousands of EMLs (in pseudo-code, if a pipeline were callable, it would be 0001.json = eml_extraction_pipe(0001.eml)).
The only way so far is either to run the Kedro pipeline "as a function":
def apply_text_extraction_pipeline(partitioned_input, pipeline):
    runner = SequentialRunner()
    results = {}
    for partition_id, load_partition in partitioned_input.items():
        email_msg = load_partition()
        # Create a data catalog for the current partition
        catalog = DataCatalog({"email": MemoryDataset(email_msg)})
        # Run the pipeline using the created catalog
        result = runner.run(pipeline, catalog)
        results[partition_id] = result
    return results
Or, instead of using a pipeline, wrap the steps into a Python function and apply it to all EMLs to create the output partition, as in: https://github.com/kedro-org/kedro/issues/1413
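For that function-wrapping route, a sketch under the assumption that each step takes the previous step's output as its only argument (the real node functions in nodes.py may have richer signatures):

# Hypothetical composition of the same step functions the nodes wrap (from nodes.py).
STEPS = [
    get_email_body,
    clean_html_from_strings,
    extract_headers,
    # ... remaining step functions, in eml_extraction_pipe order
]

def extract_text(email_msg):
    data = email_msg
    for step in STEPS:
        data = step(data)
    return data

def apply_text_extraction(partitioned_input: dict) -> dict:
    # One output JSON partition per input EML partition.
    return {pid: extract_text(load()) for pid, load in partitioned_input.items()}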
06/05/2024, 2:21 PMget_email_body_node
, clean_html_from_strings_node
, etc. each just take a PartitionedDataset
and return a PartitionedDataset
with the same keys?Thomas Bury
06/05/2024, 2:41 PM

Deepyaman Datta
06/05/2024, 2:42 PM

Thomas Bury
06/05/2024, 2:44 PM

Thomas Bury
06/07/2024, 8:05 AM

INFO Completed 12 out of 12 tasks
...
INFO Completed 1 out of 12 tasks
INFO Completed 2 out of 12 tasks