# questions
t
I've made progress on the Kedro pipeline! It iterates through the EML files and generates the corresponding JSON partition outputs. However, Kedro Viz currently only shows two nodes, [create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node], which reduces transparency compared to my initial vision. The eml_extraction_pipe and all its nodes are not illustrated. At this point, using a regular Python function for text extraction achieves similar results without the overhead of a full Kedro pipeline. This would eliminate some of the benefits Kedro offers:
• Modularity: Breaking down tasks into reusable nodes improves code organization and maintainability.
• Data Management: Kedro simplifies data access and versioning.
• Reproducibility: Pipelines promote clear and repeatable workflows.
I still have the impression I'm missing something 😅. I redefined the "looping" function as follows (in nodes.py):
Copy code
from kedro.runner import SequentialRunner
from kedro.io import DataCatalog, MemoryDataset


def apply_text_extraction_pipeline(partitioned_input, pipeline):
    runner = SequentialRunner()
    results = {}
    for partition_id, load_partition in partitioned_input.items():
        email_msg = load_partition()
        # Create a data catalog for the current partition
        catalog = DataCatalog({"email": MemoryDataset(email_msg)})
        # Run the pipeline using the created catalog
        result = runner.run(pipeline, catalog)
        results[partition_id] = result
    return results
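Note that runner.run returns a dict keyed by the pipeline's free (unregistered) output names, so each partition value may need unwrapping before it is written out as JSON. A minimal sketch, assuming the sub-pipeline's final output is named compiled_results (a hypothetical name):
Copy code
# runner.run returns {free_output_name: value}; unwrap it so each partition
# holds the JSON-serializable payload rather than the wrapper dict
result = runner.run(pipeline, catalog)
results[partition_id] = result["compiled_results"]  # hypothetical output name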
The pipeline.py is as follows:
Copy code
from kedro.pipeline import Pipeline, node, pipeline

# apply_text_extraction_pipeline lives in nodes.py; the *_node objects
# below are assumed to be defined there as well
from .nodes import apply_text_extraction_pipeline


def create_eml_extraction_pipe():
    eml_extraction_pipe = pipeline(
        [
            get_email_body_node,
            clean_html_from_strings_node,
            extract_headers_node,
            extract_metadata_from_email_node,
            split_and_remove_greetings_node,
            remove_header_node,
            flatten_and_convert_to_string_node,
            finalize_processed_text_node,
            extract_type_email_node,
            detect_email_subject_markers_node,
            compile_results_node,
        ]
    )
    return eml_extraction_pipe


create_eml_extraction_pipe_node = node(
    func=create_eml_extraction_pipe,
    inputs=None,
    outputs="eml_extraction_pipe",
    name="create_eml_extraction_pipe_node",
)

apply_text_extraction_pipeline_node = node(
    func=apply_text_extraction_pipeline,
    inputs=["emails", "eml_extraction_pipe"],
    outputs="extracted_msg_dict",
    name="apply_text_extraction_pipeline_node",
)


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node]
    )
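(As an aside, one way to at least make the inner nodes visible in Viz is to register the sub-pipeline as its own entry, even though it renders disconnected from the outer loop. A sketch, assuming the default pipeline_registry.py and a hypothetical package name my_project:)
Copy code
# pipeline_registry.py
from kedro.pipeline import Pipeline

# hypothetical module path; adjust to the actual project layout
from my_project.pipelines.eml_extraction.pipeline import (
    create_eml_extraction_pipe,
    create_pipeline,
)


def register_pipelines() -> dict[str, Pipeline]:
    return {
        "__default__": create_pipeline(),
        # registering the inner pipeline makes its nodes show up in Viz,
        # albeit as a separate, disconnected pipeline
        "eml_extraction": create_eml_extraction_pipe(),
    }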
d
So modular pipelines, which in the future I think will be renamed namespaced pipelines, are your friend here: https://docs.kedro.org/en/stable/nodes_and_pipelines/modular_pipelines.html
t
As @Deepyaman Datta explained, it's not the solution, nor is it required, for a partitioned dataset. It would create an instance of the pipeline for each EML file in the partition. https://kedro-org.slack.com/archives/C03RKP2LW64/p1716923400992719
Anyone on this? I previously suggested a workaround that involved looping and executing a Kedro sub-pipeline "as a function" (similar to the Notebook interface functionality). However, it isn't an ideal solution. Thanks 🙏
j
Hi Thomas, Thank you for your response. I will look into other possible solutions for your requirement.
👍 1
t
Thank you, I appreciate it. Another issue with my approach above is passing parameters.yml to the sub-pipeline when executing it as a function.
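(A minimal sketch of one way around that: Kedro resolves parameters as catalog entries named parameters / params:<key>, so they can be fed to the runner as in-memory datasets. The greeting_patterns parameter below is hypothetical:)
Copy code
from kedro.io import DataCatalog, MemoryDataset

params = {"greeting_patterns": ["best regards", "kind regards"]}  # hypothetical

catalog = DataCatalog({
    "email": MemoryDataset(email_msg),
    # parameters are plain catalog entries named "parameters"/"params:<key>"
    "parameters": MemoryDataset(params),
    "params:greeting_patterns": MemoryDataset(params["greeting_patterns"]),
})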
👍 1
y
This is a use case enabled by kedro-boot, through the concept of a KedroBootApp. See an example of looping over a pipeline in the Monte Carlo simulation example here:
t
Thanks, I will have a look if a pure Kedro solution isn't possible
👍 1
n
Great questions. Without any context, there are a few solutions for these kinds of situations:
• namespace pipeline (pipeline level)
• partitioned dataset (node level)
• dynamic pipeline
> However, Kedro Viz currently only shows two nodes [create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node], which reduces transparency compared to my initial vision.
Conceptually, you want to apply a loop over something. On a node level, it will be PartitionedDataset; on a pipeline level (one that consists of multiple loops, and I guess that's what you want to visualise in Kedro Viz), it will be a namespace pipeline. I have an example of a time series forecast pipeline with namespaces.
In pseudocode. PartitionedDataset (loop inside a single node):
Copy code
def do_something(partitions):
    # loop over the partitions inside one node
    for partition_id, load_func in partitions.items():
        run_something(load_func())

do_something(partitions)
namespace pipeline (apply the loop outside, at pipeline-construction time):
Copy code
# one namespaced pipeline instance per partition
for i in range(n_partitions):
    run_something(i)
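(For concreteness, a sketch of the namespaced variant; process_eml is a stand-in for the real single-email steps and the partition count is hardcoded:)
Copy code
from kedro.pipeline import Pipeline, node, pipeline


def process_eml(email):
    # stand-in for the real extraction steps
    return {"body": str(email)}


def template() -> Pipeline:
    return pipeline([node(process_eml, inputs="email", outputs="extracted")])


def create_pipeline(**kwargs) -> Pipeline:
    # one namespaced copy per partition: eml_0.email, eml_1.email, ...
    # each becomes a separate catalog entry and a separate group in Viz
    return sum(
        (pipeline(template(), namespace=f"eml_{i}") for i in range(3)),
        Pipeline([]),
    )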
d
I also think there is an argument we should show something like this in KedroViz when working with partitioned data
n
(just thinking out loud) I'd translate that to "I want to partition not just datasets but the entire pipeline"
d
I did have this half-thought-out idea a few years ago: https://github.com/kedro-org/kedro/issues/1413
d
Consolidating from the other thread, to also get more eyes/opinions: https://kedro-org.slack.com/archives/C03RKP2LW64/p1717177371627939?thread_ts=1716926297.340449&cid=C03RKP2LW64 Also, just to level set, I want to mention that the correct syntax for using a pipeline inside another pipeline is not in a node, but like this:
Copy code
from kedro.pipeline import Pipeline, node, pipeline

# generate_emails and capitalize_content stand in for real functions


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(generate_emails, "params:n", "emails"),
        pipeline([
            node(capitalize_content, "emails", "capitalized_emails"),
        ])
    ])
This will essentially unwrap the nodes and include them in the bigger pipeline, not really treat it as a subpipeline. If you want to make a pipeline not intended for partitioned data work on partitioned data for that subpart, you can write some utility that converts that pipeline into something that works on partitions (see the sketch below).
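(A minimal sketch of such a utility, assuming single-input/single-output nodes; partitionize is a hypothetical name, not Kedro API:)
Copy code
import functools

from kedro.pipeline import Pipeline, node, pipeline


def partitionize(pipe: Pipeline) -> Pipeline:
    """Rewrap every node so it maps over {partition_id: value} dicts."""

    def mapped(func):
        @functools.wraps(func)
        def wrapper(partitions):
            # PartitionedDataset yields load callables; intermediate
            # in-memory results are plain values, hence the callable() check
            return {
                pid: func(part() if callable(part) else part)
                for pid, part in partitions.items()
            }
        return wrapper

    return pipeline(
        [
            # assumes each node has exactly one input and one output
            node(mapped(n.func), n.inputs[0], n.outputs[0])
            for n in pipe.nodes
        ]
    )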
👍 2
t
@Nok Lam Chan it's a nice example 👌 (I'll bookmark it for later use, clever use of namespace) but it's not a solution for this case unfortunately:
• I have thousands of EML files, each with an extraction to a JSON file (it would not be practical to have thousands of nodes or pipelines in Viz 🙂 )
• Ideally, I want a partitioned JSON as output, especially for the sake of the catalog
• I don't need to feed the next pipeline with the output of the previous one
Conceptually, it's pretty close to the Monte Carlo use case, where you would apply the same pipeline over and over again, for N >> 1.
Thanks for reviewing my code! I could wrap all the steps (more than 10) into a Python class or function, as in @datajoely's discussion on GitHub. However, a Kedro pipeline would be better than a Python function, to leverage its features:
• Modularity: Breaking down the steps into reusable nodes improves code organization and maintainability.
• Data Management: Kedro simplifies data access and versioning.
• Reproducibility: Pipelines promote clear and repeatable workflows.
There are a lot of use cases where we deal with collections of files like CSV, EML, etc., and where applying a consistent transformation sequence (a pipeline) is crucial, without concatenating the partition and without N >> 1 namespaces (duplicates):
Copy code
0001.csv --> apply_pipeline --> 0001.json
0002.csv --> apply_pipeline --> 0002.json
...
9999.csv --> apply_pipeline --> 9999.json
So far, the closest "kedronic" solution is the namespace, but it isn't perfect because it duplicates the pipeline N times, especially in the data catalog and Kedro Viz. That would be a very useful feature to consolidate in Kedro 🙂
d
You can nest namespaces, so you can simplify Kedro Viz. Have a look at the train evaluation namespace on demo.kedro.org
👍 1
t
I understand, but it's still not the "kedronic" solution, because everything should be as in https://github.com/kedro-org/kedro/issues/1413 except that it should be possible to apply a pipeline, rather than a function. In this example, replace _preprocess_partion with a proper Kedro pipeline that Kedro will register and handle as such, without unpacking the nodes:
• Two entries in the data catalog, partition_in and partition_out
• A single pipeline, containing the steps to handle only one file
• Looping/parallelization, applying the same pipeline N times to the N (>> 1, up to thousands of) files of the partition
As I understand it, the current workarounds are:
• not using a Kedro pipeline for handling the file, but rather wrapping all the steps into a function/class
• running the pipeline "as a function", as I illustrated above
• adding an external mechanism to trigger the Kedro pipeline (handling a single file) N times, using an API or queuing system, but there is an overhead price to pay
• using a plugin such as kedro-boot, also an "external" mechanism
Thanks for having dug into it! I really appreciate your help
d
@Thomas Bury to understand clearly, is the issue with the example I shared that you are not processing each file end to end first? That is, say the pipeline you want to apply to each partition is pipe[node1, node2, node3]. You want to do:
Copy code
node1_for_eml1, node2_for_eml1, node3_for_eml1, node1_for_eml2, node2_for_eml2, node3_for_eml2, etc.
But what I have shared is forcing:
Copy code
node1_for_eml1, node1_for_eml2, ..., node1_for_emlN, node2_for_eml1, node2_for_eml2, ..., node2_for_emlN, etc.
Or are there other issues?
t
To recap: in catalog.yml
Copy code
# raw EML files, one partition per file
emails:
  type: partitions.PartitionedDataset
  path: data/01_raw/eml
  dataset:
    type: email.EmailMessageDataset
  filename_suffix: ".eml"

# extracted text, one JSON partition per input EML
extracted_msg_dict:
  type: partitions.PartitionedDataset
  path: data/02_intermediate/eml_text_extraction
  dataset:
    type: json.JSONDataset
  filename_suffix: ".json"
On each EML file I need to apply a series of functions:
Copy code
eml_extraction_pipe = pipeline(
    [
        get_email_body_node,
        clean_html_from_strings_node,
        extract_headers_node,
        extract_metadata_from_email_node,
        split_and_remove_greetings_node,
        remove_header_node,
        flatten_and_convert_to_string_node,
        finalize_processed_text_node,
        extract_type_email_node,
        detect_email_subject_markers_node,
        compile_results_node,
    ]
)
I have thousands of EMLs (in pseudo-code, if a pipeline were callable, this would be 0001.json = eml_extraction_pipe(0001.eml)). The only way so far is either to run the Kedro pipeline "as a function":
Copy code
def apply_text_extraction_pipeline(partitioned_input, pipeline):
    runner = SequentialRunner()
    results = {}
    for partition_id, load_partition in partitioned_input.items():
        email_msg = load_partition()
        # Create a data catalog for the current partition
        catalog = DataCatalog({"email": MemoryDataset(email_msg)})
        # Run the pipeline using the created catalog
        result = runner.run(pipeline, catalog)
        results[partition_id] = result
    return results
Or, instead of using a pipeline, to wrap the steps into a Python function and apply it to all EMLs to create the output partition, as in: https://github.com/kedro-org/kedro/issues/1413
d
But my question is, why do you specifically need to call each pipeline; why can't get_email_body_node, clean_html_from_strings_node, etc. each just take a PartitionedDataset and return a PartitionedDataset with the same keys?
t
I tried that at the beginning, but it's cumbersome to create all those partitions and then to recombine each part. I will probably try harder 😄
d
If it works, I think you can add some decorator magic to make it less cumbersome 🙂 But I would make sure it works first. I don't feel it's THAT cumbersome; I think it's some repeated boilerplate, which is why I think you can convert it into a utility decorator (a sketch below)
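(A sketch of that decorator idea applied at node-definition time, the node-level counterpart of the utility sketched earlier; per_partition and get_email_body are illustrative names, not Kedro API:)
Copy code
import functools


def per_partition(func):
    """Turn a function over one email into one over a partition dict."""

    @functools.wraps(func)
    def wrapper(partitions, *args, **kwargs):
        # the first node receives {partition_id: load_callable} from the
        # PartitionedDataset; downstream nodes receive plain value dicts
        return {
            pid: func(part() if callable(part) else part, *args, **kwargs)
            for pid, part in partitions.items()
        }

    return wrapper


@per_partition
def get_email_body(email_msg):
    # stand-in for the real extraction step; each node then maps
    # dict-in -> dict-out with the same partition keys
    return email_msg.get_payload()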
t
Yep, it makes sense. I'll try it with a few files and see if it's easy to recombine the parts into a proper JSON at the last step
Funny thing: using my workaround of executing the single-file pipeline as a function, Kedro logging correctly detects the sub-pipeline, contrary to Kedro Viz:
Copy code
INFO     Completed 12 out of 12 tasks
.
.
.
INFO     Completed 1 out of 12 tasks
INFO     Completed 2 out of 12 tasks