# questions
e
Hi everyone, I've been making progress on my first project and I have a question about how to run a Kedro project when you want some pipelines to run before others. I have two pipelines that scrape data from the internet, and each one conceptually flows into a transformation pipeline. So I have 4 pipelines, but I want to control which ones run first, second, and so on. I'm not clear on what happens if I just type "kedro run": what is the default ordering based on? Do I specify the relationships somewhere? I just want the two extracting pipelines to run before the transformation pipelines. Thanks kindly :)
d
Ordering is based entirely on the DAG (the full structure of which node outputs are fed in as inputs to other nodes). So it's less about controlling which pipeline runs first and more about making sure that, for example:
• Pipeline scrape1 produces output1
• Pipeline scrape2 produces output2
• Some node in pipeline transformation consumes output1
• Another node in pipeline transformation consumes output2
Now, logically, those nodes in transformation would need to run after the producing nodes in scrape1 and scrape2.
To be less abstract perhaps, have you run either the Iris example or the Spaceflights example? I can show you how the pipeline order is determined there.
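To make the DAG idea concrete, here is a minimal sketch (hypothetical function and dataset names, not from this thread): because `transformation` consumes `output1`, which `scrape1` produces, the runner always executes the producing node first.
```python
# Minimal sketch, hypothetical names: ordering falls out of the dataset names
# that connect nodes, not out of the order the pipelines are defined or summed.
from kedro.pipeline import Pipeline, node

def fetch_raw():                     # hypothetical scraping function
    return {"rows": [1, 2, 3]}

def double_rows(data):               # hypothetical transformation function
    return {"rows": [r * 2 for r in data["rows"]]}

scrape1 = Pipeline([
    node(func=fetch_raw, inputs=None, outputs="output1", name="fetch_raw_node"),
])

transformation = Pipeline([
    node(func=double_rows, inputs="output1", outputs="doubled", name="double_rows_node"),
])

# Summing pipelines just merges their nodes; the runner topologically sorts
# them by dataset dependencies, so fetch_raw_node always runs before
# double_rows_node, regardless of which side of the + each pipeline is on.
full_pipeline = transformation + scrape1
```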
e
Thank you @Deepyaman Datta! I guess I might not have built my pipelines correctly, because at the end of my scrape pipelines I save the raw scraped data to a mongo collection, and then my transformation pipeline loads the data from the mongo collection. So what I don't understand is: how does Kedro know which dataset is the first one as opposed to an intermediate one? In my first scrape pipeline, I scrape from 4 RSS feeds, with 3 nodes per feed (12 in total), like the following:
```python
node(
    func=extract_rss_1_feed,
    inputs="rss_1_feed_extract",            # a custom dataset that extracts the RSS feed
    outputs="rss_1_feed_for_transforming",  # memory dataset
    name="extract_rss_1_feed",
),
node(
    func=transform_rss_1_feed,
    inputs=["rss_1_feed_for_transforming", "params:rss_1"],
    outputs="rss_1_feed_for_loading",       # memory dataset
    name="transform_rss_1_feed",
),
node(
    func=load_rss_1_feed,
    inputs="rss_1_feed_for_loading",
    outputs="rss_1_intermediate",           # custom dataset that saves the data to a mongo collection
    name="load_rss_1_feed",
),
```
Then in the transformation pipeline, I extract from the mongo collection and process the feed:
```python
node(
    func=extract_rss_1_data,
    inputs="rss_1_intermediate",           # same custom dataset as above
    outputs="rss_1_data_for_augmenting",   # memory dataset
    name="extract_rss_1_data",
),
node(
    func=augment_rss_1_data,
    inputs=["rss_1_data_for_augmenting", "params:rss_1_augmented"],
    outputs="rss_1_data_for_loading",      # memory dataset
    name="augment_rss_1_data",
),
node(
    func=load_rss_1_augmented,
    inputs=["rss_1_data_for_loading"],
    outputs="rss_1_augmented",             # custom dataset to save to a different db/collection
    name="load_rss_1_aug",
),
```
Now my second scrape pipeline loads files from an Azure blob:
```python
node(
    func=extract_partitioned_json,
    inputs="partitioned_cleaned_emails_json",  # custom dataset
    outputs="jsons_for_combining",             # memory dataset
    name="extract_partitioned_json",
),
node(
    func=combine_partitioned_json,
    inputs="jsons_for_combining",
    outputs="jsons_for_cleaning",              # memory dataset
    name="combine_partitioned_json",
),
node(
    func=clean_jsons,
    inputs="jsons_for_cleaning",
    outputs="jsons_for_loading_interm",        # memory dataset
    name="clean_jsons",
),
node(
    func=load_jsons,
    inputs="jsons_for_loading",
    outputs="email_jsons_interm_1",            # custom dataset to save jsons to mongo db/collection
    name="load_jsons",
),
```
So I want to ensure that the pipelines that access the `rss_1_feed_extract` and `partitioned_cleaned_emails_json` datasets run before the other two pipelines. Did I set up the pipelines/nodes correctly?
d
What are the inputs for the other two pipelines? From what you've shared, it seems correct that the transformation pipeline will run after the first scrape pipeline, but the second scrape pipeline has no datasets in common with the others, so you don't know whether it'll run before, after, or during the other pipelines. (One thing I noticed is that `clean_jsons` returns `jsons_for_loading_interm`, but the input to `load_jsons` is `jsons_for_loading`.) Have you tried using Kedro-Viz to visualize your pipeline? That can make it visually obvious what order things are guaranteed to run in.
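As a rough illustration of the dependency that would pin down the ordering (hypothetical function name; the dataset names are the ones from the snippets above): as long as some node in a downstream pipeline consumes `email_jsons_interm_1`, which `load_jsons` produces, Kedro will always schedule it after `load_jsons`.
```python
# Sketch only: a downstream node that consumes `email_jsons_interm_1`.
# The shared dataset name is what places this node after load_jsons in the
# resolved DAG; nothing else about pipeline ordering needs to be declared.
from kedro.pipeline import Pipeline, node

def extract_email_jsons(email_jsons):    # hypothetical transformation function
    return email_jsons

email_transformation = Pipeline([
    node(
        func=extract_email_jsons,
        inputs="email_jsons_interm_1",          # output of load_jsons above
        outputs="email_jsons_for_augmenting",   # memory dataset
        name="extract_email_jsons",
    ),
])
```
Kedro-Viz (launched with `kedro viz` from the project root) draws exactly this resolved graph, so a pipeline that isn't connected to the others by any dataset is easy to spot.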
e
I keep forgetting to use Kedro-Viz. I'm not totally sure what it's helpful for, but you just poked me to try it out :)
👍 1