Andrew Doherty
04/17/2023, 4:36 PM
My parameters.yml file looks something like this:
namespace1:
  raw_data:
    - datasource1
    - datasource2
namespace2:
  raw_data:
    - datasource1
    - datasource3
I have configured a catalog with aligned naming that looks like the following:
namespace1.datasource1:
  filepath: data/namespace1/source1.csv
  type: pandas.CSVDataSet
namespace1.datasource2:
  filepath: data/namespace1/source2.csv
  type: pandas.CSVDataSet
namespace2.datasource1:
  filepath: data/namespace2/source1.csv
  type: pandas.CSVDataSet
namespace2.datasource3:
  filepath: data/namespace2/source3.csv
  type: pandas.CSVDataSet
I have many more datasources than shown here, which is where the challenge lies. I was wondering if I could create a node that would loop over all of the datasources and then dynamically save each one to the correct location, like:
nodes = [
    node(
        func=get_data,  # this would loop through all the raw_data entries and download data into a list of df's
        inputs="params:raw_data",  # passing the list ["datasource1", "datasource2"]
        outputs="params:raw_data",  # passing the list ["datasource1", "datasource2"] as catalog entries
    )
]
This would mean that the inputs and outputs would be dynamic, based on parameters.yml, and any datasources added or removed would be reflected automatically. This method does not work because the literal string "params:raw_data" is passed as the output name, rather than the parameter values.
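A minimal sketch of the intended expansion, in plain Python without Kedro: it turns the parameters structure above into the fully qualified catalog entry names. The `PARAMS` dict and the `catalog_names` helper are hypothetical and only mirror the YAML shown earlier; they are not part of Kedro.

```python
from typing import Dict, List

# Hypothetical in-memory copy of the parameters.yml content shown above.
PARAMS: Dict[str, Dict[str, List[str]]] = {
    "namespace1": {"raw_data": ["datasource1", "datasource2"]},
    "namespace2": {"raw_data": ["datasource1", "datasource3"]},
}


def catalog_names(params: Dict[str, Dict[str, List[str]]]) -> Dict[str, List[str]]:
    """Map each namespace to the catalog entry names of its raw_data sources."""
    return {
        namespace: [f"{namespace}.{source}" for source in section["raw_data"]]
        for namespace, section in params.items()
    }


names = catalog_names(PARAMS)
for namespace, entries in names.items():
    print(namespace, entries)
```

Each list is the set of output names that a single node for that namespace would need to declare, which is exactly what the string "params:raw_data" cannot express.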
Does anyone have a suggestion for how to make this dynamic and avoid either creating a node per data source with hard-coded inputs and outputs or modifying the structure of my parameters file?
Thanks again
marrrcin
04/18/2023, 7:15 AM
before_node_run. Example below (it's not doing exactly this, it's just a snippet I have that you should find useful):
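from copy import deepcopy
from functools import partial
from typing import Any, Dict, Optional

from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline.node import Node


class Injector:
    def __init__(self):
        self.params = None

    @hook_impl
    def after_context_created(
        self,
        context: KedroContext,
    ) -> None:
        # keep a private copy of the project parameters for later use
        self.params = deepcopy(context.params)

    @hook_impl
    def before_node_run(  # pylint: disable=too-many-arguments
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
        session_id: str,
    ) -> Optional[Dict[str, Any]]:
        # at this point you can modify behaviour, inject some values etc.
        if node.name == "train_model_node":
            # bind the stored parameters as an extra keyword argument
            node.func = partial(node.func, injected=self.params)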
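To see what the `partial(node.func, injected=self.params)` line does in isolation, here is a small sketch using only the standard library. The `train_model` function and the `learning_rate` parameter are made up for illustration; the only assumption carried over from the snippet is that the node function accepts an `injected` keyword argument.

```python
from functools import partial
from typing import Any, Dict, Optional


def train_model(data: str, injected: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical node function that reads a value from the injected params."""
    learning_rate = (injected or {}).get("learning_rate")
    return f"trained on {data} with lr={learning_rate}"


# Stand-in for the parameters captured in after_context_created.
params = {"learning_rate": 0.1}

# partial() returns a new callable with `injected` already bound, so the
# caller only has to supply the remaining positional argument.
wrapped = partial(train_model, injected=params)
result = wrapped("features")
print(result)
```

The wrapped callable keeps the original signature minus the bound keyword, so the node's declared inputs stay the same.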
Andrew Doherty
04/18/2023, 8:35 AM
class Injector:
    def __init__(self):
        self.params = None

    @hook_impl
    def after_context_created(
        self,
        context: KedroContext,
    ) -> None:
        self.params = deepcopy(context.params)

    @hook_impl
    def before_node_run(  # pylint: disable=too-many-arguments
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
        session_id: str,
    ) -> Optional[Dict[str, Any]]:
        # at this point you can modify behaviour, injects some values etc.
        if "node_name" in node.name:
            namespace = node.name.split(".")[0]
            datasets = self.params[namespace]["raw_data"]
            output_datasets = [namespace + "." + data for data in datasets]
            node._outputs = output_datasets
marrrcin
04/18/2023, 1:53 PM
Andrew Doherty
04/18/2023, 1:54 PM