Елена Сидорова
04/12/2023, 5:15 AM"ValueError: Failed to save outputs of node parallel_get_temp_data([dates_to_download]) -> [downloaded_dates].
The node definition contains a list of outputs ['downloaded_dates'], whereas the node function returned a 'dict'."
Could you, please, help me to make it work?
Thank you in advance!
import logging
from typing import Dict
import typing
import pandas as pd
from sg_api.nodes.kedro_temperature_nodes import get_temp_data, choose_station
from kedro.pipeline import Pipeline, node
from multiprocessing.dummy import Pool
def generate_date_range(
start_date: str,
end_date: str,
):
dates_to_download = {
str(dt.date()): True
for dt in pd.date_range(start_date, end_date)
}
return dates_to_download
def parallel_get_temp_data(dates_to_download: Dict[str, bool]) -> Dict[str, Dict]:
"""
Args:
dates_to_download (object):
"""
logger = logging.getLogger('parallel_get_temp_data')
def _get_temp_data(dt):
<http://logger.info|logger.info>(f"Start Download {dt}")
try:
date_data = get_temp_data(dt)
except KeyboardInterrupt:
raise
except Exception as e:
logger.error(f"Failed Download {e}")
date_data = None
<http://logger.info|logger.info>(f"Finish Download {dt}")
return dt, date_data
with Pool(10) as p:
downloaded_data = p.map(_get_temp_data, dates_to_download.keys())
downloaded_data = filter(lambda x: x is not None, downloaded_data)
downloaded_data_dict = dict(downloaded_data)
return downloaded_data_dict
def parallel_choose_station(
downloaded_data_dict: Dict,
station_id: str,
):
logger = logging.getLogger('parallel_choose_station')
def _choose_station(item):
dt = item[0]
dt_data = item[1]
<http://logger.info|logger.info>(f"Start Choose Station {dt}")
station_data = choose_station(dt_data, station_id)
<http://logger.info|logger.info>(f"Start Choose Station {dt}")
return dt, station_data
with Pool(10) as p:
downloaded_station_data = p.map(_choose_station, downloaded_data_dict.items())
return dict(downloaded_station_data)
def create_pipeline():
return Pipeline([
node(
generate_date_range,
inputs=['params:start_date', 'params:end_date'],
outputs='dates_to_download'
),
node(
parallel_get_temp_data,
inputs=['dates_to_download'],
outputs=['downloaded_dates'],
),
node(
parallel_choose_station,
inputs=['downloaded_dates', 'params:station_id'],
outputs=['downloaded_station_data'],
)
])
Michał Madej
04/12/2023, 5:49 AMoutputs=['downloaded_dates']
in the second node -> outputs=downloaded_dates
Елена Сидорова
04/12/2023, 5:51 AM