Thomas Bury
05/28/2024, 7:58 PM
TypeError: 'Pipeline' object is not callable
The pipeline cannot be reused that way; a Pipeline is not a function. I could replace the pipeline with a simple function, but that is neither Kedronic nor elegant.
A mock-up example would indeed help, thank you
def create_eml_extraction_pipe():
    return pipeline(
        [
            get_email_body_node,
            clean_html_from_strings_node,
            extract_headers_node,
            extract_metadata_from_email_node,
            split_and_remove_greetings_node,
            remove_header_node,
            flatten_and_convert_to_string_node,
            finalize_processed_text_node,
            extract_type_email_node,
            detect_email_subject_markers_node,
            compile_results_node,
        ]
    )
def apply_text_extraction_pipeline(
    partitioned_input: Dict[str, Callable[[], pd.DataFrame]],
    text_extraction_pipeline: Pipeline,
) -> Dict[str, Any]:
    json_partition = {}
    # Iterate over each partition, load it, and apply the extraction pipeline
    for partition_id, load_partition in partitioned_input.items():
        email = load_partition()
        output = text_extraction_pipeline(email)  # raises TypeError: Pipeline is not callable
        json_partition[partition_id] = output
    # Return one output per input partition
    return json_partition
create_eml_extraction_pipe_node = node(
    func=create_eml_extraction_pipe,
    inputs=None,
    outputs="eml_extraction_pipe",
    name="create_eml_extraction_pipe_node",
)

apply_text_extraction_pipeline_node = node(
    func=apply_text_extraction_pipeline,
    inputs=["emails", "eml_extraction_pipe"],
    outputs="extraction_dicts",
    name="apply_text_extraction_pipeline_node",
)
def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([create_eml_extraction_pipe_node, apply_text_extraction_pipeline_node])
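[Editor's note] A minimal, Kedro-free sketch of the "simple function" alternative mentioned above: composing per-email steps as plain Python callables. The step names here (strip_html, normalize) are hypothetical stand-ins, not the actual node functions from the snippet.

```python
from functools import reduce

def compose(*funcs):
    """Chain single-input functions left to right, mimicking a linear pipeline."""
    return lambda value: reduce(lambda acc, f: f(acc), funcs, value)

# Hypothetical stand-ins for the real extraction steps
def strip_html(text: str) -> str:
    return text.replace("<p>", "").replace("</p>", "")

def normalize(text: str) -> str:
    return text.strip().lower()

extract_text = compose(strip_html, normalize)
print(extract_text("<p>  Hello World  </p>"))  # hello world
```

Unlike a Kedro Pipeline object, the composed function is callable, so it can be applied inside a node's per-partition loop, at the cost of losing node-level lineage in Kedro-Viz.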
Deepyaman Datta
05/28/2024, 9:15 PM
"A mock-up example would indeed help, thank you" — I will do it tonight or tomorrow!
Deepyaman Datta
05/31/2024, 8:24 AM

Deepyaman Datta
05/31/2024, 5:42 PM
PartitionedDataset (or in-memory representation): https://github.com/deepyaman/partitioned-dataset-demo
In creating this demo, a couple of things came to mind:
1. A Kedro pipeline is not like a scikit-learn pipeline, in the sense that you wouldn't apply a Kedro pipeline inside a node [1]. When working with partitioned data, each step of the pipeline you want to apply should be a node itself.
2. Now, let's say you have an existing pipeline (that works on a single partition) that you want to apply. As you can see in the example, there is a repeated pattern of for partition_id, partition_load_func_or_data in partitions.items(): ... . If you really want to use an existing pipeline on a partitioned dataset, (I think) you could automatically create such a pipeline by parsing that_pipeline.nodes and applying a decorator to each node that makes it work on partitioned data. I didn't have time to explore this yet, but I may in the future.
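[Editor's note] The decorator idea in point 2 can be sketched without Kedro. The partitioned helper below is hypothetical; it lifts a single-partition function to the dict-of-loaders shape that a PartitionedDataset provides:

```python
from typing import Any, Callable, Dict

def partitioned(func: Callable[[Any], Any]) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    """Lift a single-partition function to run over {partition_id: load_func_or_data}."""
    def wrapper(partitions: Dict[str, Any]) -> Dict[str, Any]:
        results = {}
        for partition_id, load_func_or_data in partitions.items():
            # A PartitionedDataset yields callables that lazily load each partition;
            # an in-memory dict may hold the data directly, so handle both.
            data = load_func_or_data() if callable(load_func_or_data) else load_func_or_data
            results[partition_id] = func(data)
        return results
    return wrapper

@partitioned
def count_rows(rows: list) -> int:
    return len(rows)

partitions = {"0001.csv": lambda: [1, 2, 3], "0002.csv": [4, 5]}
print(count_rows(partitions))  # {'0001.csv': 3, '0002.csv': 2}
```

Automating this for a whole pipeline would mean iterating over that_pipeline.nodes and wrapping each node's function this way, as suggested above.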
Notes
[1] Technically, maybe you could, by constructing a SequentialRunner() and executing it in a node, but I would highly recommend against it.

Thomas Bury
06/05/2024, 12:11 PM0001.csv --> apply_pipeline --> 0001.json
0002.csv --> apply_pipeline --> 0002.json
...
9999.csv --> apply_pipeline --> 9999.json
So far, the closest "Kedronic" solution is namespaces, but it isn't perfect because it duplicates the pipeline N times, which clutters the data catalog and Kedro-Viz in particular.
That would be a very useful feature to consolidate in Kedro 🙂