# questions
t
Ok. I just don't get how to move from the single-EML example above to the partitioned dataset case. How should I reuse the pipeline above in the partitioned case? I came up with something like the code below, but it returns:
TypeError: 'Pipeline' object is not callable
so the pipeline cannot be reused that way; it is not a function. I could replace the pipeline with a plain function, but that is neither kedronic nor elegant. A mock-up example would indeed help, thank you
```python
from typing import Any, Callable, Dict

import pandas as pd
from kedro.pipeline import Pipeline, node, pipeline

# The individual nodes (get_email_body_node, ...) come from the
# single-EML example above.


def create_eml_extraction_pipe() -> Pipeline:
    return pipeline(
        [
            get_email_body_node,
            clean_html_from_strings_node,
            extract_headers_node,
            extract_metadata_from_email_node,
            split_and_remove_greetings_node,
            remove_header_node,
            flatten_and_convert_to_string_node,
            finalize_processed_text_node,
            extract_type_email_node,
            detect_email_subject_markers_node,
            compile_results_node,
        ]
    )



def apply_text_extraction_pipeline(
    partitioned_input: Dict[str, Callable[[], pd.DataFrame]],
    text_extraction_pipeline: Pipeline,
) -> Dict[str, Any]:
    json_partition = {}
    # Iterate over the partitions, loading each one lazily
    for partition_id, load_partition in partitioned_input.items():
        email = load_partition()
        # This line raises: TypeError: 'Pipeline' object is not callable
        output = text_extraction_pipeline(email)
        json_partition[partition_id] = output
    # One output dict entry per partition id
    return json_partition
```
```python
create_eml_extraction_pipe_node = node(
    func=create_eml_extraction_pipe,
    inputs=None,
    outputs="eml_extraction_pipe",
    name="create_eml_extraction_pipe_node",
)

apply_text_extraction_pipeline_node = node(
    func=apply_text_extraction_pipeline,
    inputs=["emails", "eml_extraction_pipe"],
    outputs="extraction_dicts",
    name="apply_text_extraction_pipeline_node",
)

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node])
```
d
I will do it tonight or tomorrow!
šŸ™ 1
Sorry, got delayed due to some disruptions that came up. I started work on a quick demo today, will try to wrap up and share tomorrow.
I don't know if this will be helpful (you have a lot of savvy answers on the other thread), but I created an example of different ways you can pass data to and from `PartitionedDataset` (or its in-memory representation): https://github.com/deepyaman/partitioned-dataset-demo

In creating this demo, a couple of things came to mind:
1. A Kedro pipeline is not like a scikit-learn pipeline, in the sense that you wouldn't apply a Kedro pipeline in a node. [1] When working with partitioned data, each step in the pipeline you want to apply should be a node itself.
2. Now, let's say you have an existing pipeline (that works on a single partition) that you want to apply. As you can see in the example, there is a repeated pattern of `for partition_id, partition_load_func_or_data in partitions.items(): ...`. If you really want to use an existing pipeline on a partitioned dataset, (I think) you could automatically create such a pipeline by parsing `that_pipeline.nodes` and applying a decorator to each node that makes it work on partitioned data. I didn't have time to explore this yet, but I may in the future.

Notes
[1] Technically, maybe you could, by constructing a `SequentialRunner()` and executing it in a node, but I would highly recommend against that.
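To make point 2 slightly more concrete, here is a minimal, untested sketch of the decorator idea. `apply_to_partitions` and `partitionize` are hypothetical helpers (not existing Kedro API), and the sketch assumes every node takes exactly one input and produces exactly one output:
```python
from functools import wraps
from typing import Any, Callable, Dict

from kedro.pipeline import Pipeline, node, pipeline


def apply_to_partitions(func: Callable) -> Callable:
    """Hypothetical: lift a single-partition function to a dict of partitions."""

    @wraps(func)
    def wrapper(partitions: Dict[str, Any]) -> Dict[str, Any]:
        results = {}
        for partition_id, load_func_or_data in partitions.items():
            # A PartitionedDataset yields load callables; intermediate
            # in-memory results are already materialized data.
            data = load_func_or_data() if callable(load_func_or_data) else load_func_or_data
            results[partition_id] = func(data)
        return results

    return wrapper


def partitionize(pipe: Pipeline) -> Pipeline:
    """Hypothetical: rebuild a pipeline so each node maps over partitions."""
    return pipeline(
        [
            node(
                func=apply_to_partitions(n.func),
                inputs=n.inputs[0],    # assumes exactly one input per node
                outputs=n.outputs[0],  # and exactly one output
                name=f"{n.name}_partitioned",
            )
            for n in pipe.nodes
        ]
    )
```
In principle, `partitionize(create_eml_extraction_pipe())` could then consume the `emails` partitioned dataset directly, with every intermediate dataset becoming a dict keyed by partition id.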
šŸ‘ 1
t
Thanks for reviewing my code! While I could wrap all the steps (more than 10) into a Python class or function, a Kedro pipeline would be better, to leverage its features:
• Modularity: breaking the steps into reusable nodes improves code organization and maintainability.
• Data management: Kedro simplifies data access and versioning.
• Reproducibility: pipelines promote clear and repeatable workflows.
There are a lot of use cases where we deal with collections of files (CSV, EML, etc.) and applying a consistent transformation sequence (a pipeline) to each one is crucial, without concatenating the partitions and without N >> 1 namespaces (duplicates).
```
0001.csv --> apply_pipeline --> 0001.json
0002.csv --> apply_pipeline --> 0002.json
...
9999.csv --> apply_pipeline --> 9999.json
```
So far, the closest "kedronic" solution is the namespace, but it isn't perfect because it duplicates the pipeline N times, especially in the data catalog and in Kedro-Viz. That would be a very useful feature to consolidate in Kedro šŸ™‚
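For reference, the namespace workaround looks roughly like this (a sketch; the partition ids are hardcoded purely for illustration):
```python
from kedro.pipeline import Pipeline, pipeline

PARTITION_IDS = ["0001", "0002", "9999"]  # illustrative; really N >> 1


def create_pipeline(**kwargs) -> Pipeline:
    base = create_eml_extraction_pipe()
    # One namespaced copy of the whole pipeline per partition id
    return sum(
        (pipeline(base, namespace=partition_id) for partition_id in PARTITION_IDS),
        Pipeline([]),
    )
```
Each namespace prefixes every dataset name, so the catalog and the Kedro-Viz graph grow linearly with the number of partitions, which is exactly the duplication problem described above.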