Hi everyone - I have a single output from a node w...
# questions
е
Hi everyone - I have a single output from a node, which is a dictionary. I get the following error:
"ValueError: Failed to save outputs of node parallel_get_temp_data([dates_to_download]) -> [downloaded_dates].
The node definition contains a list of outputs ['downloaded_dates'], whereas the node function returned a 'dict'."
Could you, please, help me to make it work? Thank you in advance!
Copy code
import logging
from typing import Dict
import typing
import pandas as pd
from sg_api.nodes.kedro_temperature_nodes import get_temp_data, choose_station
from kedro.pipeline import Pipeline, node
from multiprocessing.dummy import Pool


def generate_date_range(
        start_date: str,
        end_date: str,
):
    """Build the mapping of calendar dates that should be downloaded.

    Args:
        start_date: First date of the range, handed to ``pd.date_range``.
        end_date: Last date of the range, handed to ``pd.date_range``.

    Returns:
        A dict keyed by the string form of each date in the range, with
        every value set to ``True``.
    """
    date_keys = (
        str(stamp.date())
        for stamp in pd.date_range(start_date, end_date)
    )
    return dict.fromkeys(date_keys, True)


def parallel_get_temp_data(dates_to_download: Dict[str, bool]) -> Dict[str, Dict]:
    """Download temperature data for every requested date concurrently.

    Args:
        dates_to_download: Mapping whose keys are the dates to fetch. Only
            the keys are used; the values are ignored.

    Returns:
        A dict mapping each date key to the value ``get_temp_data`` returned
        for it. Dates whose download raised an exception (or produced
        ``None``) are logged and left out of the result.
    """
    logger = logging.getLogger('parallel_get_temp_data')

    def _get_temp_data(dt):
        logger.info(f"Start  Download {dt}")
        try:
            date_data = get_temp_data(dt)
        except Exception as e:
            # Best effort: one failed date must not abort the whole batch.
            # KeyboardInterrupt is not an Exception subclass, so it still
            # propagates without an explicit re-raise clause.
            logger.error(f"Failed Download {e}")
            date_data = None
        logger.info(f"Finish Download {dt}")
        return dt, date_data

    # multiprocessing.dummy.Pool is backed by threads, which suits
    # I/O-bound downloads.
    with Pool(10) as p:
        downloaded_data = p.map(_get_temp_data, dates_to_download.keys())

    # Each result is a (date, data) tuple, so the failure marker to test is
    # the data element, not the tuple itself.
    return {
        dt: date_data
        for dt, date_data in downloaded_data
        if date_data is not None
    }


def parallel_choose_station(
        downloaded_data_dict: Dict,
        station_id: str,
):
    """Apply ``choose_station`` to every downloaded date concurrently.

    Args:
        downloaded_data_dict: Mapping of date key to the data downloaded
            for that date.
        station_id: Station identifier passed to ``choose_station`` for
            every date.

    Returns:
        A dict mapping each date key to the value ``choose_station``
        returned for that date's data.
    """
    logger = logging.getLogger('parallel_choose_station')

    def _choose_station(item):
        dt, dt_data = item
        logger.info(f"Start Choose Station {dt}")
        station_data = choose_station(dt_data, station_id)
        # The original logged "Start" here a second time; report completion.
        logger.info(f"Finish Choose Station {dt}")
        return dt, station_data

    # multiprocessing.dummy.Pool is backed by threads, not processes.
    with Pool(10) as p:
        downloaded_station_data = p.map(_choose_station, downloaded_data_dict.items())

    return dict(downloaded_station_data)


def create_pipeline():
    """Assemble the temperature-download pipeline.

    The pipeline has three nodes: build the date range, download data for
    each date, then select one station's data from each download.

    Every node function returns a single object (a dict), so each
    ``outputs`` is given as a plain dataset name. Declaring ``outputs`` as
    a list makes Kedro expect the function to return a matching sequence
    of values, and a dict return then fails with "The node definition
    contains a list of outputs ..., whereas the node function returned a
    'dict'".

    Returns:
        The assembled ``Pipeline``.
    """
    return Pipeline([
        node(
            generate_date_range,
            inputs=['params:start_date', 'params:end_date'],
            outputs='dates_to_download'
        ),
        node(
            parallel_get_temp_data,
            inputs=['dates_to_download'],
            outputs='downloaded_dates',
        ),
        node(
            parallel_choose_station,
            inputs=['downloaded_dates', 'params:station_id'],
            outputs='downloaded_station_data',
        )
    ])
m
try removing the brackets from
outputs=['downloaded_dates']
in the second node ->
outputs='downloaded_dates'
❤️ 2
е
thank you!