Dotun O
04/11/2023, 7:27 PM
Елена Сидорова
04/12/2023, 5:15 AM
"ValueError: Failed to save outputs of node parallel_get_temp_data([dates_to_download]) -> [downloaded_dates].
The node definition contains a list of outputs ['downloaded_dates'], whereas the node function returned a 'dict'."
Could you please help me make it work?
Thank you in advance!
import logging
from typing import Any, Dict

import pandas as pd
from sg_api.nodes.kedro_temperature_nodes import get_temp_data, choose_station
from kedro.pipeline import Pipeline, node
from multiprocessing.dummy import Pool


def generate_date_range(
    start_date: str,
    end_date: str,
):
    dates_to_download = {
        str(dt.date()): True
        for dt in pd.date_range(start_date, end_date)
    }
    return dates_to_download


def parallel_get_temp_data(dates_to_download: Dict[str, bool]) -> Dict[str, Dict]:
    """
    Args:
        dates_to_download (object):
    """
    logger = logging.getLogger('parallel_get_temp_data')

    def _get_temp_data(dt):
        logger.info(f"Start Download {dt}")
        try:
            date_data = get_temp_data(dt)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            logger.error(f"Failed Download {e}")
            date_data = None
        logger.info(f"Finish Download {dt}")
        return dt, date_data

    with Pool(10) as p:
        downloaded_data = p.map(_get_temp_data, dates_to_download.keys())
    downloaded_data = filter(lambda x: x is not None, downloaded_data)
    downloaded_data_dict = dict(downloaded_data)
    return downloaded_data_dict


def parallel_choose_station(
    downloaded_data_dict: Dict,
    station_id: str,
):
    logger = logging.getLogger('parallel_choose_station')

    def _choose_station(item):
        dt = item[0]
        dt_data = item[1]
        logger.info(f"Start Choose Station {dt}")
        station_data = choose_station(dt_data, station_id)
        logger.info(f"Finish Choose Station {dt}")
        return dt, station_data

    with Pool(10) as p:
        downloaded_station_data = p.map(_choose_station, downloaded_data_dict.items())
    return dict(downloaded_station_data)


def create_pipeline():
    return Pipeline([
        node(
            generate_date_range,
            inputs=['params:start_date', 'params:end_date'],
            outputs='dates_to_download'
        ),
        node(
            parallel_get_temp_data,
            inputs=['dates_to_download'],
            outputs=['downloaded_dates'],
        ),
        node(
            parallel_choose_station,
            inputs=['downloaded_dates', 'params:station_id'],
            outputs=['downloaded_station_data'],
        )
    ])
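A likely fix, going by the error text: when a node function returns a single object (here a dict), outputs should be a plain string; a list of output names tells Kedro to unpack the return value into that many datasets. A minimal sketch of the adjusted pipeline, keeping the functions above unchanged (the same change applies to parallel_choose_station, which also returns one dict):

from kedro.pipeline import Pipeline, node


def create_pipeline():
    return Pipeline([
        node(
            generate_date_range,
            inputs=["params:start_date", "params:end_date"],
            outputs="dates_to_download",
        ),
        node(
            parallel_get_temp_data,
            inputs="dates_to_download",
            outputs="downloaded_dates",  # single string instead of ["downloaded_dates"]
        ),
        node(
            parallel_choose_station,
            inputs=["downloaded_dates", "params:station_id"],
            outputs="downloaded_station_data",  # same change here
        ),
    ])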
viveca
04/12/2023, 3:45 PM
I want to export plotly figures to HTML with fig.write_html(), so I made a simple custom dataset:
import plotly.graph_objects as go
from kedro.io.core import get_filepath_str
from kedro_datasets.plotly import JSONDataSet  # or kedro.extras.datasets.plotly, depending on version


class PlotlyHTMLDataSet(JSONDataSet):
    """Export plotly figure to html"""

    def _save(self, data: go.Figure) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
            data.write_html(fs_file, **self._save_args)
        self._invalidate_cache()
This worked fine… except the content-type of the html file on s3 “ends up” being “binary/octet-stream”, but should be “text/html”. This becomes a problem when trying to display this in a browser. Anyone got experience of args you could pass here to manually set the content type? Not my area of expertise.
Thanks,
Viveca
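One thing that might be worth trying (a sketch, not verified): s3fs accepts an s3_additional_kwargs mapping whose entries are forwarded to the underlying S3 upload calls, and datasets that follow the usual Kedro pattern pass any unrecognised fs_args keys straight to the fsspec filesystem constructor. Setting the content type could then look roughly like this, reusing the PlotlyHTMLDataSet above:

# Sketch only: bucket/path are made up; whether ContentType is honoured on
# upload depends on the s3fs version in use.
figure_html = PlotlyHTMLDataSet(
    filepath="s3://my-bucket/reports/figure.html",
    fs_args={"s3_additional_kwargs": {"ContentType": "text/html"}},
)

The equivalent catalog entry would nest the same keys under fs_args.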
Filip Wójcik
04/13/2023, 7:51 AM
marrrcin
04/13/2023, 10:41 AM
The .get method of the config loader behaves differently between ConfigLoader and OmegaConfigLoader?
cl = ConfigLoader("conf")
cl.get("custom*")
{'test': ':)'}
cl = OmegaConfigLoader("conf")
cl.get("custom*")
None
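A possible explanation (hedged, as the behaviour may differ by Kedro version): OmegaConfigLoader is dict-like, so .get("custom*") falls through to a plain dictionary lookup and returns None because "custom*" is not a key; glob patterns have to be registered in config_patterns and the config accessed by key instead. A rough sketch:

from kedro.config import OmegaConfigLoader

# Assumed pattern set; adjust to wherever your custom*.yml files live.
cl = OmegaConfigLoader(
    conf_source="conf",
    config_patterns={"custom": ["custom*", "custom*/**", "**/custom*"]},
)
print(cl["custom"])  # key access triggers pattern-based loading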
FlorianGD
04/13/2023, 12:57 PM
Would you accept a PR to add a _save method to APIDataSet so we can save data with a POST request to an endpoint? We sometimes have to update data on a Django app (or any REST API) and we create a custom dataset, but it feels like this could be integrated into the generic APIDataSet.
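For context, a bare-bones illustration of the kind of custom dataset being described (names and behaviour are illustrative, not the proposed PR):

import requests
from kedro.io import AbstractDataSet


class PostableAPIDataSet(AbstractDataSet):
    """Sketch of a dataset that GETs on load and POSTs on save."""

    def __init__(self, url: str, save_args: dict = None):
        self._url = url
        self._save_args = save_args or {}

    def _load(self):
        response = requests.get(self._url)
        response.raise_for_status()
        return response.json()

    def _save(self, data) -> None:
        # Push the payload to the endpoint; headers/auth can come from save_args.
        response = requests.post(self._url, json=data, **self._save_args)
        response.raise_for_status()

    def _describe(self) -> dict:
        return {"url": self._url}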
Juan Diego
04/13/2023, 3:07 PM
We think the before_pipeline_run hook is the way to go.
Can you advise us on the best way to achieve this?
What we tried so far is condensed in this code:
class ParamsHook:
    @hook_impl
    def before_pipeline_run(
        self, run_params: Dict[str, Any], pipeline: Any, catalog: DataCatalog
    ) -> None:
        catalog.add_feed_dict({"params:country": MemoryDataSet("ESP")}, replace=True)
In the hook:
1. In run_params we can see: 'extra_params': {'country': 'USA'}
2. In catalog.list() we see the entry 'params:country', both before and after invoking add_feed_dict
But when the params are printed in the node, the value persists with the original value passed via kedro run --params country=USA.
Many thanks in advance!
NOTE: The objective here is to be able to parse a list from the CLI, let's say --params countries="ESP<>USA", and do the split in the hook.
Vladislav Stepanov
04/13/2023, 3:46 PM
ProjectMetadata(
config_file=PosixPath('/Workspace/Repos/.../.../.../modeling/.../.../scalability/pyproject.toml'),
package_name='scalability',
project_name='scalability',
project_path=PosixPath('/Workspace/Repos/.../.../.../modeling/.../.../scalability'),
project_version='0.18.4',
source_dir=PosixPath('/Workspace/Repos/.../.../.../modeling/.../.../scalability/src')
)
but when session.run() is executed it gives me an error:
ModuleNotFoundError: No module named 'scalability.pipeline_registry'
and I have this file under /Workspace/Repos/.../.../.../modeling/.../.../scalability/src/scalability/pipeline_registry.
Even though source_dir from ProjectMetadata shows the correct path, why does it give me this error?
Thanks in advance!
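A frequent cause of this on Databricks Repos (a guess, since the session-creation code isn't shown): the project's src directory isn't on sys.path in the notebook's interpreter, so the scalability package cannot be imported even though the metadata paths look right. A sketch of what to check before creating the session; the path below is a placeholder for the elided one above:

import sys
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path("/Workspace/Repos/<...>/scalability")  # placeholder path
metadata = bootstrap_project(project_path)  # normally puts source_dir on sys.path

# Defensive check: make sure src/ is importable in this interpreter
if str(metadata.source_dir) not in sys.path:
    sys.path.insert(0, str(metadata.source_dir))

with KedroSession.create(project_path=project_path) as session:
    session.run()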
Suhas Kotaki
04/14/2023, 7:29 AM
divas verma
04/14/2023, 11:17 AM
Seeing this error from catalog.load in kedro 0.18.4:
Py4JJavaError: An error occurred while calling o186.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3a.S3AFileSystem not found
any thoughts on what could be going on here?
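A common cause of this particular ClassNotFoundException (hedged, since the Spark setup isn't shown) is that the Spark session lacks the hadoop-aws jar that provides S3AFileSystem. One way to add it where the session is built; the version is a placeholder and must match your Hadoop distribution (in a Kedro project this usually lives in the spark config read by your SparkSession hook):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # placeholder version; pick the hadoop-aws release matching your Hadoop
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")
    .getOrCreate()
)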
Rafael Hoffmann Fallgatter
04/14/2023, 11:41 AM
Ben Levy
04/14/2023, 1:14 PM
Andrew Stewart
04/14/2023, 6:10 PM
Andrew Doherty
04/15/2023, 11:31 AM
For the train_start and train_end time, could I set a global parameter rather than duplicating these for each namespace? If not, is there a way to replicate this functionality?
Thanks a lot for your time,
Andrew
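One option that may fit (a sketch, depending on your Kedro version): keep train_start/train_end in a single globals file and reference them from each namespace's parameters via templating, e.g. with TemplatedConfigLoader configured in settings.py:

# settings.py (sketch): lets parameters.yml reference ${train_start} / ${train_end}
# defined once in a globals.yml instead of per namespace.
from kedro.config import TemplatedConfigLoader

CONFIG_LOADER_CLASS = TemplatedConfigLoader
CONFIG_LOADER_ARGS = {"globals_pattern": "*globals.yml"}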
Rob
04/16/2023, 2:48 AM
Is there a way to combine multiple plotly.JSONDataSets, save them into the catalog as a single item and plot all of them in kedro-viz? Any other suggestion is welcome 🙂
Thanks in advance!
Massinissa Saïdi
04/17/2023, 9:03 AM
Elielson Silva
04/17/2023, 12:28 PM
Matheus Sampaio
04/17/2023, 2:22 PM
Gary McCormack
04/17/2023, 3:56 PM
conf/base/logging.yaml isn't picked up for custom loggers by default (please correct me if I'm wrong), so I would have thought that I would need to create my own custom logger. If I remove the logging conf file, however, this raises an exception (ValueError: dictionary doesn't specify a version). Any suggestions or advice on how to set up a custom logger to run on Kedro would be great!
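On the ValueError itself: it comes from Python's logging.config.dictConfig, which requires a top-level version key, so whatever logging config ends up being loaded needs at least that. A minimal, Kedro-agnostic illustration of a valid config dict (logger name is hypothetical):

import logging.config

LOGGING = {
    "version": 1,  # the key the ValueError is complaining about
    "disable_existing_loggers": False,
    "handlers": {"console": {"class": "logging.StreamHandler", "level": "INFO"}},
    "loggers": {"my_project": {"level": "INFO", "handlers": ["console"]}},
}
logging.config.dictConfig(LOGGING)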
Jordan
04/17/2023, 4:05 PM
Andrew Doherty
04/17/2023, 4:36 PM
My parameters.yml file looks something like this:
namespace1:
  raw_data:
    - datasource1
    - datasource2
namespace2:
  raw_data:
    - datasource1
    - datasource3
I have configured a catalog with aligned naming that looks like the following:
namespace1.datasource1:
  filepath: data/namespace1/source1.csv
  type: pandas.CSVDataSet
namespace1.datasource2:
  filepath: data/namespace1/source2.csv
  type: pandas.CSVDataSet
namespace2.datasource1:
  filepath: data/namespace2/source1.csv
  type: pandas.CSVDataSet
namespace2.datasource3:
  filepath: data/namespace2/source3.csv
  type: pandas.CSVDataSet
I have many more datasources than shown here, which is where the challenge lies. I was wondering if I could create a node that would loop round all of the datasources and then dynamically save to the correct locations like:
nodes = [
    node(
        func=get_data,  # this would loop through all the raw_data entries and download data into a list of df's
        inputs="params:raw_data",  # passing the list ["datasource1", "datasource2"]
        outputs="params:raw_data",  # passing the list ["datasource1", "datasource2"] as catalog entries
    )
]
This would mean that the inputs and outputs would be dynamic based on the parameters.yml, and if any additional datasources are added or removed this would be reflected. This method does not work, as the string "params:raw_data" is passed for the outputs rather than the parameter values.
Does anyone have a suggestion for how to make this dynamic and avoid creating a node per data source with hard coded inputs and outputs or modifying the structure of my parameters file?
Thanks again
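Not an official pattern, but one commonly used workaround (a sketch, using a hypothetical helper and the structure above): build the nodes programmatically in create_pipeline by looping over a plain-Python copy of the datasource lists, so every datasource gets an explicit catalog output without hand-writing each node:

from kedro.pipeline import Pipeline, node

# Duplicated here as a plain dict because the pipeline structure has to be
# known before the run starts (i.e. before parameters are loaded).
RAW_DATA = {
    "namespace1": ["datasource1", "datasource2"],
    "namespace2": ["datasource1", "datasource3"],
}


def _make_downloader(source_name):
    def _download():
        return get_data(source_name)  # get_data assumed to fetch one datasource
    _download.__name__ = f"download_{source_name}"
    return _download


def create_pipeline(**kwargs) -> Pipeline:
    nodes = []
    for namespace, sources in RAW_DATA.items():
        for source in sources:
            nodes.append(
                node(
                    func=_make_downloader(source),
                    inputs=None,
                    outputs=f"{namespace}.{source}",  # matches the catalog entries above
                    name=f"get_data_{namespace}_{source}",
                )
            )
    return Pipeline(nodes)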
charles
04/18/2023, 4:10 PM
DataSetError: Failed while loading data from data set JSONDataSet().
module 'aiobotocore' has no attribute 'AioSession'
Aaditya
04/19/2023, 8:56 AM
Dharmesh Soni
04/19/2023, 11:15 AM
Dharmesh Soni
04/19/2023, 11:20 AM
import_pipelines is returning a dictionary. When I try to import import_pipelines into the test, it is not producing any results. I tried to print the results within the class and it prints, but when I try to print in the test it does not. Can someone help here?
Leo Cunha
04/19/2023, 11:38 AM
I want to pass a custom flag like kedro run --custom-flag into KedroContext. Does anyone know how I could do that? P.S. If I pass it to extra_params it will complain that this param is not in the config files (I didn't want it to be in the config files).
Luca Disse
04/19/2023, 2:32 PM
Yinghao Dai
04/19/2023, 3:58 PM
Mate Scharnitzky
04/20/2023, 8:29 AM
pandas dependencies
Hi All,
What is the recommended way to handle dependencies for Kedro datasets together with other dependencies in a repo?
• either specifying them through kedro, e.g., kedro[pandas.ExcelDataSet]
• or using kedro_datasets?
Context
• We're in the process of upgrading our Python env from 3.7 to 3.9.
• Our current kedro version is 0.18.3.
• When upgrading our branch to Python 3.9 and keeping all other things intact, we get a requirement compilation error for pandas. In our repo, we consistently pin pandas to ~=1.3.0, which should be aligned with kedro's pin ~=1.3 defined in the form of kedro[pandas.ExcelDataSet]==0.18.3. Interestingly and surprisingly, if we remove kedro[pandas.ExcelDataSet]==0.18.3, the compilation error disappears, while openpyxl is missing (this latter is expected).
• We're thinking of changing the way we load Kedro dataset dependencies and using kedro_datasets instead, but we would like your guidance on the recommended way to handle Kedro dataset dependencies, especially from a maintenance point of view.
Thank you!
Ana Man
04/20/2023, 9:20 AM
I have a hook (B) that implements a hook specification (e.g. before_pipeline_run) and that is a subclass of another hook (A) that implements a different hook specification (e.g. after_context_created) defined in a plugin to kedro (Plugin A). If I disable Plugin A and register hook B in my settings.py file, would the hook spec defined in hook A still run? I am finding that it does, but was unsure if that was what was supposed to happen. Hope that makes sense @Nok Lam Chan