Emilio Gagliardi (08/16/2023, 2:54 AM):

Deepyaman Datta (08/16/2023, 2:59 AM):

Emilio Gagliardi (08/16/2023, 4:14 PM):
node(
    func=extract_rss_1_feed,
    inputs="rss_1_feed_extract",  # a custom dataset that extracts the RSS feed
    outputs="rss_1_feed_for_transforming",  # memory dataset
    name="extract_rss_1_feed",
),
node(
    func=transform_rss_1_feed,
    inputs=["rss_1_feed_for_transforming", "params:rss_1"],
    outputs="rss_1_feed_for_loading",  # memory dataset
    name="transform_rss_1_feed",
),
node(
    func=load_rss_1_feed,
    inputs="rss_1_feed_for_loading",
    outputs="rss_1_intermediate",  # custom dataset that saves the data to a Mongo collection
    name="load_rss_1_feed",
),
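(For context: a custom Mongo-backed dataset along these lines might look like the following minimal sketch, assuming Kedro 0.18's AbstractDataSet and pymongo; the class and parameter names are hypothetical, not taken from the thread.)

from typing import Any, Dict, List

from kedro.io import AbstractDataSet
from pymongo import MongoClient


class MongoCollectionDataSet(AbstractDataSet):
    """Reads/writes a list of documents from/to one Mongo collection."""

    def __init__(self, uri: str, database: str, collection: str):
        self._uri = uri
        self._database = database
        self._collection = collection

    def _load(self) -> List[Dict[str, Any]]:
        with MongoClient(self._uri) as client:
            # Drop Mongo's internal _id so documents round-trip cleanly.
            return list(client[self._database][self._collection].find({}, {"_id": 0}))

    def _save(self, records: List[Dict[str, Any]]) -> None:
        with MongoClient(self._uri) as client:
            client[self._database][self._collection].insert_many(records)

    def _describe(self) -> Dict[str, Any]:
        return {"database": self._database, "collection": self._collection}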
Then, in the transformation pipeline, I extract from the Mongo collection and process the feed:
node(
    func=extract_rss_1_data,
    inputs="rss_1_intermediate",  # same custom dataset as above
    outputs="rss_1_data_for_augmenting",  # memory dataset
    name="extract_rss_1_data",
),
node(
    func=augment_rss_1_data,
    inputs=["rss_1_data_for_augmenting", "params:rss_1_augmented"],
    outputs="rss_1_data_for_loading",  # memory dataset
    name="augment_rss_1_data",
),
node(
    func=load_rss_1_augmented,
    inputs=["rss_1_data_for_loading"],
    outputs="rss_1_augmented",  # custom dataset to save to a different db/collection
    name="load_rss_1_aug",
),
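(For reference: the snippets above omit the pipeline wrapper the nodes live in. A minimal sketch of the transformation pipeline's module might look like this, assuming a Kedro 0.18-style modular pipeline; the import path is hypothetical.)

from kedro.pipeline import Pipeline, node, pipeline

from .nodes import augment_rss_1_data, extract_rss_1_data, load_rss_1_augmented


def create_pipeline(**kwargs) -> Pipeline:
    # Same wiring as the nodes above, assembled into a runnable pipeline.
    return pipeline(
        [
            node(extract_rss_1_data, "rss_1_intermediate", "rss_1_data_for_augmenting", name="extract_rss_1_data"),
            node(augment_rss_1_data, ["rss_1_data_for_augmenting", "params:rss_1_augmented"], "rss_1_data_for_loading", name="augment_rss_1_data"),
            node(load_rss_1_augmented, "rss_1_data_for_loading", "rss_1_augmented", name="load_rss_1_aug"),
        ]
    )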
Now my second scrape pipeline loads files from an Azure blob:
node(
    func=extract_partitioned_json,
    inputs="partitioned_cleaned_emails_json",  # custom dataset
    outputs="jsons_for_combining",  # memory dataset
    name="extract_partitioned_json",
),
node(
    func=combine_partitioned_json,
    inputs="jsons_for_combining",
    outputs="jsons_for_cleaning",  # memory dataset
    name="combine_partitioned_json",
),
node(
    func=clean_jsons,
    inputs="jsons_for_cleaning",
    outputs="jsons_for_loading_interm",  # memory dataset
    name="clean_jsons",
),
node(
    func=load_jsons,
    inputs="jsons_for_loading",
    outputs="email_jsons_interm_1",  # custom dataset to save JSONs to a Mongo db/collection
    name="load_jsons",
),
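(For context: a Kedro PartitionedDataset loads as a dict mapping partition ids to load callables, so the first node above would typically materialize the partitions along these lines. Only the node's wiring is from the thread; the body here is an assumption.)

from typing import Any, Callable, Dict, List


def extract_partitioned_json(partitions: Dict[str, Callable[[], Any]]) -> List[Any]:
    # Invoke each partition's loader to pull the JSON payloads from blob storage.
    return [load() for _, load in sorted(partitions.items())]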
So I want to ensure that the pipelines that access the rss_1_feed_extract and partitioned_cleaned_emails_json datasets run before the other two pipelines. Did I set up the pipelines/nodes correctly?

Deepyaman Datta (08/18/2023, 12:30 AM):
(clean_jsons returns jsons_for_loading_interm, but the input to load_jsons is jsons_for_loading.)
Have you tried using Kedro-Viz to visualize your pipeline? That can make it visually obvious what order things are guaranteed to run in.Emilio Gagliardi
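(For reference: Kedro derives run order topologically from dataset names rather than from registration order, so a registry along these lines would already guarantee that the node producing rss_1_intermediate runs before any node consuming it. The module names below are hypothetical, not taken from the thread.)

from typing import Dict

from kedro.pipeline import Pipeline

from my_project.pipelines import email_ingestion, email_transformation, rss_ingestion, rss_transformation


def register_pipelines() -> Dict[str, Pipeline]:
    # Summing pipelines merges their nodes; Kedro topologically sorts the
    # merged graph by dataset dependencies at run time.
    rss = rss_ingestion.create_pipeline() + rss_transformation.create_pipeline()
    emails = email_ingestion.create_pipeline() + email_transformation.create_pipeline()
    return {
        "rss": rss,
        "emails": emails,
        "__default__": rss + emails,
    }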
08/18/2023, 6:18 PM