# questions
t
Hello, I'm revisiting my Kedro pipeline question. Currently, I have a pipeline `text_extraction_pipeline` that extracts text from a single EML file and returns a JSON output. My challenge is to adapt this pipeline to process a collection of EML files and generate a corresponding collection of JSON outputs. In pure Python, I'd just loop over the EML files; what's the Kedro equivalent? Thanks
j
Hey, to process a collection of EML files with your `text_extraction_pipeline` in Kedro, use `PartitionedDataset`. Docs: https://docs.kedro.org/en/stable/data/data_catalog.html#partitioneddataset Let me know if that's what you're looking for.
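[Editor's note: not from the thread, but for context, a minimal sketch of the load side. Kedro passes a `PartitionedDataset` to a node as a dict mapping partition ids to load callables; the function name `inspect_partitions` is hypothetical.]

```python
from typing import Any, Callable, Dict


def inspect_partitions(partitioned_input: Dict[str, Callable[[], Any]]) -> None:
    # A PartitionedDataset arrives in the node as a dict of
    # {partition_id: load_callable}, where the partition id is the
    # filename relative to `path`, minus any `filename_suffix`.
    # Nothing is read from disk until the callable is invoked.
    for partition_id, load_partition in partitioned_input.items():
        msg = load_partition()  # lazily load one EML file
        print(partition_id, type(msg))
```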
t
Yes, but how? I just don't get how to apply the pipeline to a single EML file. Should I create a new node encapsulating the pipeline for a single EML file?

```python
# Iterate over each partition
for partition_id, load_partition in partitioned_input.items():
    msg = load_partition()  # load each EML file
```

But from there, I'm not sure where to go. Would the following work?

```python
# Iterate over each partition
for partition_id, load_partition in partitioned_input.items():
    msg = load_partition()  # load each EML file
    text = text_extraction_pipeline(msg)
```

But how do I save each output (here `text`) as a collection of JSON files? The example in the docs concatenates the partitions into a single file, which is not what I want. Ideally, I want all the steps for a single EML encapsulated into a pipeline, `msg -> pipeline -> json`, and then apply it to the collection of EMLs. Thanks
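[Editor's note: a minimal sketch of the save side, assuming the output is also declared as a `PartitionedDataset` in the catalog. If the node returns a dict keyed by partition id, Kedro writes one file per key instead of concatenating anything; `extract_text` is a hypothetical stand-in for the per-email processing.]

```python
from typing import Any, Callable, Dict


def extract_text(msg) -> dict:
    # Hypothetical stand-in for the per-email extraction steps.
    return {"body": str(msg)}


def extract_all(partitioned_input: Dict[str, Callable[[], Any]]) -> Dict[str, dict]:
    results = {}
    for partition_id, load_partition in partitioned_input.items():
        msg = load_partition()  # load one EML file
        # One dict entry per input file; a PartitionedDataset output saves
        # each value as its own JSON file named after the key.
        results[partition_id] = extract_text(msg)
    return results
```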
d
@Thomas Bury do we actually have a user for the Email dataset!!!!! https://docs.kedro.org/en/0.18.14/kedro.extras.datasets.email.EmailMessageDataSet.html
Just wrap it under a `PartitionedDataset`, like @Jitendra Gundaniya says.
t
I know there is an EML dataset. Since that doesn't seem clear enough, here is what I have for a single EML. The `catalog.yml`:
```yaml
email:
  type: email.EmailMessageDataset
  filepath: data/01_raw/test_email.eml

extracted_msg_dict:
  type: json.JSONDataset
  filepath: data/02_intermediate/extracted_msg_dict.json
```

And the `pipeline.py`, returning the JSON for the corresponding EML file:
```python
from kedro.pipeline import Pipeline, pipeline, node

from .nodes import (
    get_email_body,
    clean_html_from_strings,
    extract_headers,
    extract_metadata_from_email,
    split_and_remove_greetings,
    remove_header,
    flatten_and_convert_to_string,
    finalize_processed_text,
    extract_type_email,
    detect_email_subject_markers,
    compile_results,
)

# all the nodes defined here, not shown

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            get_email_body_node,
            clean_html_from_strings_node,
            extract_headers_node,
            extract_metadata_from_email_node,
            split_and_remove_greetings_node,
            remove_header_node,
            flatten_and_convert_to_string_node,
            finalize_processed_text_node,
            extract_type_email_node,
            detect_email_subject_markers_node,
            compile_results_node,
        ]
    )
```
What I would like is to apply the above pipeline to each EML in `01_raw`, and that is where I struggle. For the partitions, the `catalog.yml`:
```yaml
emails:
  type: partitions.PartitionedDataset
  path: data/01_raw/ 
  dataset: 
    type: email.EmailMessageDataset
  filename_suffix: ".eml"

extraction_dicts:
  type: partitions.PartitionedDataset
  path: data/02_intermediate/ 
  dataset: 
    type: json.JSONDataset
```

But what's next? If I want to apply the same pipeline to each EML of the partition, do I need to provide a different namespace each time, as in the "cook food" example in the docs? That doesn't seem very convenient (especially for Kedro-Viz). Of course, I could use a regular function rather than a Kedro pipeline, but I'd like to have each step explicitly visible, for more transparency and modularity. Thanks
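[Editor's note: not part of the thread, but one common way to wire this up without namespaces, sketched against the catalog entries above. `process_single_email` is a hypothetical composition of the per-email node functions from `pipeline.py`.]

```python
from typing import Any, Callable, Dict

from kedro.pipeline import Pipeline, node, pipeline


def process_single_email(msg) -> dict:
    # Hypothetical composition of the per-email steps
    # (get_email_body, clean_html_from_strings, ..., compile_results).
    return {"subject": msg.get("Subject", "")}


def process_emails(partitioned_emails: Dict[str, Callable[[], Any]]) -> Dict[str, dict]:
    # One output entry per input partition; the `extraction_dicts`
    # PartitionedDataset then writes each value of the returned dict
    # as its own JSON file under data/02_intermediate/.
    return {
        partition_id: process_single_email(load())
        for partition_id, load in partitioned_emails.items()
    }


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=process_emails,
                inputs="emails",             # PartitionedDataset of EmailMessageDataset
                outputs="extraction_dicts",  # PartitionedDataset of JSONDataset
                name="process_emails_node",
            )
        ]
    )
```

The trade-off is that the per-email steps collapse into a single node in Kedro-Viz; keeping each step visible per file is what the namespace approach from the docs buys you.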
d