f
Hello all! I have this small pipeline with a super-node called prepare_data. I want the only output of this super-node to be freq_merged. However, the code below also outputs freq_cleaned and casa_cleaned. Yet the only output I specified for the prepare_data namespace is freq_merged, so I don't understand why it doesn't work. I can't manage to figure it out and I've tried many different things, so your help is precious!
from kedro.pipeline import Pipeline, node, pipeline

# assuming the node functions live in this pipeline's nodes.py
from .nodes import clean_casa, clean_freq, merge_casa_freq, summarize_freq_by_coarse_ibc_code


def create_pipeline(**kwargs) -> Pipeline:

    clean_data_pipeline = pipeline(
        pipe=[
            node(
                func=clean_freq,
                inputs='freq',
                outputs='freq_cleaned',
                name='clean_freq'
            ),
            node(
                func=clean_casa,
                inputs='casa',
                outputs='casa_cleaned',
                name='clean_casa'
            ),
        ],
        inputs=['freq', 'casa'],
        outputs=['casa_cleaned', 'freq_cleaned'],
        namespace='clean_data'
    )

    merge_data_pipeline = pipeline(
        pipe=[
            node(
                func=summarize_freq_by_coarse_ibc_code,
                inputs='freq_cleaned',
                outputs='freq_summarized',
                name='summarize_freq_by_coarse_ibc_code'
            ),
            node(
                func=merge_casa_freq,
                inputs=['casa_cleaned', 'freq_summarized', 'text_processor'],
                outputs='freq_merged',
                name='merge_casa_freq'
            ),
        ],
        outputs='freq_merged'
    )

    return pipeline(
        pipe=[
            clean_data_pipeline,
            merge_data_pipeline
        ],
        outputs='freq_merged',
        namespace='prepare_data'
    )
(attached screenshot: IMG_20240115_202859.jpg)
Data catalog:
Copy code
casa_cleaned:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/casa_cleaned.csv
  metadata:
    kedro-viz:
      layer: intermediate
      preview_args:
        nrows: 15

freq_cleaned:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/freq_cleaned.csv
  metadata:
    kedro-viz:
      layer: intermediate
      preview_args:
        nrows: 15

freq_summarized:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/freq_summarized.csv
  metadata:
    kedro-viz:
      layer: intermediate
      preview_args:
        nrows: 15

text_processor:
  type: pickle.PickleDataset
  filepath: data/02_intermediate/text_processor.pkl
  backend: pickle

freq_merged:
  type: pandas.CSVDataset
  filepath: data/03_primary/freq_merged.csv
a
These cleaned entries are inputs to prepare_data, so they need to be present; however, the nodes that create them should not run, and the pipeline should fail if you delete them and run only the prepare_data namespace. You return a pipeline created as a merge of the two under the prepare_data namespace, so both sub-pipelines are put inside prepare_data, I believe. Both get run, and the first one generates the other outputs.
If you want them separated, you need to either import them separately and define them in pipeline_registry, or keep them in separate folders and merge them in the registry when you need to.
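For illustration, a minimal sketch of that registry approach (the package and module names here are assumptions, not from this thread); each sub-pipeline lives in its own folder and exposes a create_pipeline() factory:

from kedro.pipeline import Pipeline

# hypothetical module layout: src/my_project/pipelines/clean_data and .../prepare_data
from my_project.pipelines import clean_data, prepare_data


def register_pipelines() -> dict[str, Pipeline]:
    clean = clean_data.create_pipeline()
    prepare = prepare_data.create_pipeline()
    return {
        "clean_data": clean,
        "prepare_data": prepare,
        # merged view, used when no --pipeline option is given
        "__default__": clean + prepare,
    }

With something like this, kedro run --pipeline=prepare_data runs only that pipeline, and its free inputs (the cleaned datasets) then have to exist in the catalog.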
f
Thanks Artur! It's weird, because in this example the "regressor" objects are not outputs of the "random forest" and "linear regression" super-nodes, yet they are registered in the data catalog:
https://demo.kedro.org/?pipeline_id=Modelling%20stage&selected_id=10e51dea
https://github.com/kedro-org/kedro-viz/blob/main/demo-project/conf/base/catalog_06_models.yml
https://github.com/kedro-org/kedro-viz/blob/main/demo-project/src/demo_project/pipelines/modelling/pipeline.py
Ok, somehow I've worked it out:
from kedro.pipeline import Pipeline, node, pipeline

# assuming the node functions live in this pipeline's nodes.py
from .nodes import clean_casa, clean_freq, merge_casa_freq, summarize_freq_by_coarse_ibc_code


def create_pipeline(**kwargs) -> Pipeline:

    clean_data_pipeline = pipeline(
        [
            node(
                func=clean_freq,
                inputs='freq',
                outputs='freq_cleaned',
                name='clean_freq'
            ),
            node(
                func=clean_casa,
                inputs='casa',
                outputs='casa_cleaned',
                name='clean_casa'
            ),
        ],
        namespace='clean_data',
        inputs={'freq', 'casa'}
    )

    total_pipeline = pipeline(
        [
            clean_data_pipeline,
            node(
                func=summarize_freq_by_coarse_ibc_code,
                inputs='clean_data.freq_cleaned',
                outputs='freq_summarized',
                name='summarize_freq_by_coarse_ibc_code'
            ),
            node(
                func=merge_casa_freq,
                inputs=['clean_data.casa_cleaned', 'freq_summarized', 'text_processor'],  # node inputs must be ordered (a list), not a set
                outputs='freq_merged',
                name='merge_casa_freq'
            ),
        ],
        namespace='freq_preprocessing',
        inputs={'freq', 'casa', 'text_processor'},  # keep these dataset names outside the namespace (no prefix)
        outputs={'freq_merged'},  # expose the merged dataset un-prefixed as the only free output
    )

    return total_pipeline
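As a side note (a sketch with made-up names, not part of the fix above): the inputs/outputs arguments of pipeline() also accept a {current_name: new_name} dict, which renames a dataset on its way out of the namespace instead of just leaving it un-prefixed.

from kedro.pipeline import node, pipeline


def passthrough(df):
    # toy node, only here to give the sketch something to wire up
    return df


renamed = pipeline(
    [node(passthrough, inputs="freq_merged", outputs="freq_final_tmp", name="passthrough")],
    namespace="demo",
    inputs={"freq_merged"},                    # keep this name un-prefixed
    outputs={"freq_final_tmp": "freq_final"},  # expose under a new name
)

print(renamed.outputs())  # {'freq_final'}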
And the DataCatalog:
clean_data.casa_cleaned:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/casa_cleaned.csv
  metadata:
    kedro-viz:
      layer: intermediate
      preview_args:
        nrows: 15

clean_data.freq_cleaned:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/freq_cleaned.csv
  metadata:
    kedro-viz:
      layer: intermediate
      preview_args:
        nrows: 15
Since freq_cleaned and casa_cleaned (the outputs of the clean_freq and clean_casa nodes) are now produced inside the clean_data namespace, I added the "clean_data." prefix to their entries in the DataCatalog.
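If it helps to verify this, the built Pipeline object can be inspected directly (for example from kedro ipython); a quick sketch:

p = create_pipeline()

print(p.inputs())    # free inputs the pipeline expects from the catalog
print(p.outputs())   # free outputs exposed outside the namespace
print(p.all_outputs() - p.outputs())  # intermediate datasets, listed with their namespace prefixes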