Nicolas Rosso
03/02/2023, 1:27 PMfrom kedro.pipeline import Pipeline, node, pipeline
from .nodes import medium_posts_extract_file, medium_posts_transform_file, medium_posts_upload_transformed_file_to_gcp, medium_posts_persist_file_in_gcp, delete_files
from datetime import datetime
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
#Defino los nodos dentro del pipeline y el orden de ejecución. Cada nodo puede tener 1 o mas funciones (definidas en nodes.py)
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=medium_posts_extract_file,
inputs=None,
outputs="medium_posts_raw_file",
name="medium_posts_extract_file_node",
tags=["extract"]
),
node(
func=medium_posts_transform_file,
inputs="medium_posts_raw_file",
outputs="medium_posts_transformed_file",
name="medium_posts_transform_file_node",
tags=["transform"]
),
node(
func=medium_posts_upload_transformed_file_to_gcp,
inputs="medium_posts_transformed_file",
outputs=None,
name="medium_posts_upload_transformed_file_to_gcp_node",
tags=["upload"]
),
node(
func=medium_posts_persist_file_in_gcp,
inputs="medium_posts_raw_file",
outputs=None,
name="medium_posts_persist_file_in_gcp_node",
tags=["persist"]
),
node(
func=delete_files,
inputs="medium_posts_transformed_file",
outputs=None,
name="delete_files_node",
tags=["delete"]
)
],
tags_hierarchy={
"extract": [],
"transform": ["extract"],
"upload": ["transform"],
"persist": ["upload"],
"delete": ["persist"]
}
)
datajoely
03/02/2023, 1:28 PMpipeline
constructor
tags_hierarchy={
"extract": [],
"transform": ["extract"],
"upload": ["transform"],
"persist": ["upload"],
"delete": ["persist"]
}
that’s not expected by Kedro unless you’ve modified itNicolas Rosso
03/02/2023, 1:29 PMdatajoely
03/02/2023, 1:30 PMNicolas Rosso
03/02/2023, 1:31 PMdatajoely
03/02/2023, 1:31 PMNicolas Rosso
03/02/2023, 1:32 PMdatajoely
03/02/2023, 1:32 PMNicolas Rosso
03/02/2023, 1:33 PMdatajoely
03/02/2023, 1:39 PMNicolas Rosso
03/02/2023, 1:40 PMdatajoely
03/02/2023, 2:01 PM