# questions
a
Hi all, another question 🙂. I am creating a modular namespace pipeline for ingesting multiple files. The parameters.yml file looks something like this:
Copy code
namespace1:
    raw_data:
        - datasource1
        - datasource2

namespace2:
    raw_data:
        - datasource1
        - datasource3
I have configured a catalog with aligned naming that looks like the following:
Copy code
namespace1.datasource1:
    filepath: data/namespace1/source1.csv
    type: pandas.CSVDataSet

namespace1.datasource2:
    filepath: data/namespace1/source2.csv
    type: pandas.CSVDataSet

namespace2.datasource1:
    filepath: data/namespace2/source1.csv
    type: pandas.CSVDataSet

namespace2.datasource3:
    filepath: data/namespace2/source3.csv
    type: pandas.CSVDataSet
I have many more datasources than shown here, which is where the challenge lies. I was wondering if I could create a node that loops over all of the datasources and then dynamically saves to the correct locations, like:
Copy code
nodes = [
    node(
        func=get_data,  # this would loop through all the raw_data entries and download data into a list of dfs
        inputs="params:raw_data",  # passing the list ["datasource1", "datasource2"]
        outputs="params:raw_data",  # passing the list ["datasource1", "datasource2"] as catalog entries
    )
]
This would mean that the inputs and outputs are dynamic based on the parameters.yml, so any datasources that are added or removed would be reflected automatically. This method does not work, because the literal string "params:raw_data" is passed as the output name rather than being resolved from the parameters. For reference, I've sketched the mapping I'm after below. Does anyone have a suggestion for how to make this dynamic and avoid creating a node per data source with hard-coded inputs and outputs, or modifying the structure of my parameters file? Thanks again
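Here is a rough sketch (plain Python, using the names from my parameters.yml above) of the input-to-output mapping I want:
Copy code
params = {
    "namespace1": {"raw_data": ["datasource1", "datasource2"]},
    "namespace2": {"raw_data": ["datasource1", "datasource3"]},
}

# desired: one output catalog entry per (namespace, datasource) pair
outputs = [f"{ns}.{ds}" for ns, cfg in params.items() for ds in cfg["raw_data"]]
# -> ["namespace1.datasource1", "namespace1.datasource2",
#     "namespace2.datasource1", "namespace2.datasource3"]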
m
I think you can use hooks for that.
First, capture the context so that you can read the parameters, then do some conditional/dynamic logic in before_node_run. Example below (it's not doing exactly this, it's just a snippet I have that you should find useful)
Copy code
from copy import deepcopy
from functools import partial
from typing import Any, Dict, Optional

from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline.node import Node


class Injector:
    def __init__(self):
        self.params = None

    @hook_impl
    def after_context_created(
        self,
        context: KedroContext,
    ) -> None:
        # keep a copy of the resolved parameters for later hooks
        self.params = deepcopy(context.params)

    @hook_impl
    def before_node_run(  # pylint: disable=too-many-arguments
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
        session_id: str,
    ) -> Optional[Dict[str, Any]]:
        # at this point you can modify behaviour, inject some values, etc.
        if node.name == "train_model_node":
            node.func = partial(node.func, injected=self.params)
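To wire it up, register the hook in your project's settings.py (the module path below is a placeholder; adjust it to wherever the class lives):
Copy code
# src/my_project/settings.py -- "my_project" is a placeholder package name
from my_project.hooks import Injector

HOOKS = (Injector(),)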
a
Nice one @marrrcin, thanks a lot for your response. I'll have a go with using hooks.
@marrrcin, thanks a lot, your code helped me get this working.
Copy code
from copy import deepcopy
from typing import Any, Dict, Optional

from kedro.framework.context import KedroContext
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline.node import Node


class Injector:
    def __init__(self):
        self.params = None

    @hook_impl
    def after_context_created(
        self,
        context: KedroContext,
    ) -> None:
        self.params = deepcopy(context.params)

    @hook_impl
    def before_node_run(  # pylint: disable=too-many-arguments
        self,
        node: Node,
        catalog: DataCatalog,
        inputs: Dict[str, Any],
        is_async: bool,
        session_id: str,
    ) -> Optional[Dict[str, Any]]:
        # at this point you can modify behaviour, inject some values, etc.
        if "node_name" in node.name:
            # namespaced node names are prefixed, e.g. "namespace1.node_name"
            namespace = node.name.split(".")[0]
            datasets = self.params[namespace]["raw_data"]
            output_datasets = [namespace + "." + data for data in datasets]
            node._outputs = output_datasets  # _outputs is private API, hence "hacky"
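One possible cleanup, assuming your Kedro version exposes the namespace property on Node (untested sketch):
Copy code
# possibly cleaner than string-splitting; namespace is None for non-namespaced nodes
namespace = node.namespace or node.name.split(".")[0]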
🥳 1
🔥 1
m
Great, thanks for the feedback!
a
A little hacky due to the namespace handling, but it is working at least. Thanks again