#questions

Abhishek Bhatia

02/18/2024, 6:15 PM
Hey Team! Is there a way to use `OmegaConfigLoader` custom resolvers with dataset factories? In `settings.py` I define the following:
```python
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader

def split_dot_into_path(dot_str: str) -> str:
    return dot_str.replace(".", "/")

# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
CONFIG_LOADER_ARGS = {
    "base_env": "base",
    "default_run_env": "local",
    # "config_patterns": {
    #     "spark": ["spark*/"],
    #     "parameters": ["parameters*", "parameters*/**", "**/parameters*"],
    # },
    "custom_resolvers": {
        "split_dot_into_path": split_dot_into_path,
    },
}
```
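For context on what the resolver does when it fires: OmegaConf expands `${resolver_name:arg}` by calling the registered function with the argument. Below is a minimal pure-Python simulation of that interpolation step (a sketch, not OmegaConf's actual implementation, which also handles nesting and typed values):

```python
import re

def split_dot_into_path(dot_str: str) -> str:
    return dot_str.replace(".", "/")

# Registry standing in for the `custom_resolvers` argument above.
RESOLVERS = {"split_dot_into_path": split_dot_into_path}

def resolve(value: str) -> str:
    # Replace each ${name:arg} with RESOLVERS[name](arg) -- a simplified
    # sketch of OmegaConf's custom-resolver interpolation.
    return re.sub(
        r"\$\{(\w+):([^}]*)\}",
        lambda m: RESOLVERS[m.group(1)](m.group(2)),
        value,
    )

print(resolve("data/03_processed/${split_dot_into_path:level1.level2}/data.csv"))
# data/03_processed/level1/level2/data.csv
```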
My pipeline looks like this:
```python
from kedro.pipeline import Pipeline, node, pipeline

def create_pipeline(**kwargs) -> Pipeline:
    nodes = [
        node(
            lambda x: x,
            inputs="raw_dataset",
            outputs="processed_dataset",
        )
    ]
    return pipeline(nodes, inputs="raw_dataset", namespace="level1.level2")
```
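For reference, and why the factory pattern below uses a dotted prefix: when a pipeline is namespaced, Kedro prefixes the names of datasets not exempted via the pipeline's `inputs`/`outputs` mappings with the namespace and a dot, so `processed_dataset` becomes `level1.level2.processed_dataset`. A trivial sketch of that renaming convention:

```python
def namespaced_name(namespace: str, dataset: str) -> str:
    # Kedro's namespacing convention: non-exempted dataset names get
    # the namespace prepended, delimited by a dot.
    return f"{namespace}.{dataset}" if namespace else dataset

print(namespaced_name("level1.level2", "processed_dataset"))
# level1.level2.processed_dataset
```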
And then the catalog entry looks like this:
```yaml
raw_dataset:
  type: pandas.CSVDataset
  filepath: "data/01_raw/data.csv"

"{prefix}.processed_dataset":
  type: pandas.CSVDataset
  filepath: "data/03_processed/${split_dot_into_path:{prefix}}/data.csv"
```
So, basically: given any namespace delimited by dots (`.`), I want to create the nested folder structure by converting the dot-delimited namespace into a forward-slash-delimited path at runtime. Thanks! 🙂

Dmitry Sorokin

02/19/2024, 12:05 PM
Hi Abhishek, thank you for your question. I will look into it

Ankita Katiyar

02/19/2024, 12:54 PM
Hey Abhishek, this is not possible currently: the resolution of `omegaconf` config, including custom resolvers, happens before the dataset factories are evaluated. We’ve had similar questions in the past, and we have an open issue for collecting use cases if you’d like to add yours to it: https://github.com/kedro-org/kedro/issues/3086 (it’s a bit of a catch-all issue for now, but we’ll groom it soon)
As a bit of a workaround, does `namespace = "level1/level2"` without using the custom resolver work?
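To see why `{prefix}` in the catalog entry captures the whole dotted namespace: Kedro matches factory patterns with the `parse` library, where a placeholder can span dots. Here is a rough regex-based approximation of that matching (a hypothetical helper, not Kedro's actual code):

```python
import re

def pattern_to_regex(pattern: str) -> str:
    # Turn "{placeholder}" segments into named capture groups and escape
    # the literal parts, approximating parse-style full matching.
    parts = re.split(r"(\{\w+\})", pattern)
    out = []
    for part in parts:
        m = re.fullmatch(r"\{(\w+)\}", part)
        out.append(f"(?P<{m.group(1)}>.+)" if m else re.escape(part))
    return "".join(out)

m = re.fullmatch(
    pattern_to_regex("{prefix}.processed_dataset"),
    "level1.level2.processed_dataset",
)
print(m.group("prefix"))  # level1.level2
```

The full match forces the greedy `.+` to absorb everything up to the literal `.processed_dataset` suffix, so the captured prefix keeps its internal dots.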

Nok Lam Chan

02/19/2024, 1:56 PM
If I understand correctly, you want the data output structure to mirror the namespace. So when I have a node `node(xxx, ..., outputs="data", namespace="level1.level2")`, it gets saved to `level1/level2/data.csv`? I agree with @Ankita Katiyar that this is not possible currently, because it requires resolving config much later than the current implementation does. Alternatively, I think this can be simplified by having an explicit nested namespace:
```yaml
"{level1}.{level2}.processed_dataset":
  type: pandas.CSVDataset
  filepath: "data/03_processed/{level1}/{level2}/data.csv"
```

Abhishek Bhatia

02/20/2024, 7:02 AM
Thanks @Ankita Katiyar and @Nok Lam Chan for helping out! 🙂 We are already following the approach above, where we explicitly specify the nested levels, but we would like a generic factory pattern that catches an arbitrary depth of nesting, i.e. level1/level2/.../leveln. The slash method would probably work, but let me see how it shows up in Kedro-Viz.
@Ankita Katiyar / @Nok Lam Chan I think I solved it (partially) 😄 I defined a custom dataset `NamespacedPandasCSVDataset` that just converts the dot-delimited namespace into a path-like filepath. The catalog entry:
```yaml
"{prefix}.namespaced_dataset":
  type: demo.namespaced_dataset.NamespacedPandasCSVDataset
  base_path: "data/03_processed"
  namespace: "{prefix}"
  fname: "data.csv"
```
And my custom dataset looks like this:
```python
import os
from typing import Any

from kedro.io.core import Version
from kedro_datasets import pandas


class NamespacedPandasCSVDataset(pandas.CSVDataset):

    def __init__(
        self,
        *,
        base_path: str,
        namespace: str,
        fname: str,
        load_args: dict[str, Any] = None,
        save_args: dict[str, Any] = None,
        version: Version = None,
        credentials: dict[str, Any] = None,
        fs_args: dict[str, Any] = None,
        metadata: dict[str, Any] = None,
    ) -> None:
        filepath = self._get_full_filepath(base_path, namespace, fname)
        super().__init__(
            filepath=filepath,
            load_args=load_args,
            save_args=save_args,
            version=version,
            credentials=credentials,
            fs_args=fs_args,
            metadata=metadata,
        )
        self.base_path = base_path
        self.namespace = namespace
        self.fname = fname

    def _get_full_filepath(self, base_path, namespace, fname):
        return os.path.join(
            base_path,
            self.split_dot_into_path(namespace),
            fname,
        )

    @staticmethod
    def split_dot_into_path(dot_str: str) -> str:
        return dot_str.replace(".", "/")
```
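On making this generic: one option (a sketch, assuming every target dataset class accepts a `filepath` keyword, as the file-based `kedro_datasets` classes do) is a class factory that wraps an arbitrary dataset class instead of hard-coding `pandas.CSVDataset`. The `namespaced` helper below is hypothetical, not a Kedro API, and is shown with a dummy base class so it runs standalone:

```python
import os

def namespaced(dataset_cls):
    """Wrap any dataset class that takes a `filepath` kwarg so it can be
    configured with base_path/namespace/fname instead (hypothetical helper)."""

    class Namespaced(dataset_cls):
        def __init__(self, *, base_path, namespace, fname, **kwargs):
            # Split the dotted namespace into segments so os.path.join
            # uses the platform's path separator.
            filepath = os.path.join(base_path, *namespace.split("."), fname)
            super().__init__(filepath=filepath, **kwargs)
            self.base_path = base_path
            self.namespace = namespace
            self.fname = fname

    Namespaced.__name__ = f"Namespaced{dataset_cls.__name__}"
    return Namespaced

# Dummy base standing in for e.g. pandas.CSVDataset:
class DummyDataset:
    def __init__(self, *, filepath):
        self.filepath = filepath

NamespacedDummyDataset = namespaced(DummyDataset)
ds = NamespacedDummyDataset(
    base_path="data/03_processed", namespace="level1.level2", fname="data.csv"
)
print(ds.filepath)  # data/03_processed/level1/level2/data.csv on POSIX
```

In the catalog, the `type:` entry would then point at a module-level alias such as `NamespacedCSVDataset = namespaced(pandas.CSVDataset)`, since Kedro resolves `type:` to an importable class name.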
I feel it could be much more generic than this, to encompass any base dataset type, as opposed to creating a custom namespaced dataset for every dataset type. Thoughts? @Ankita Katiyar / @Nok Lam Chan 🙂
Further, is there any way to directly access the dataset name as an attribute? `.name` doesn't seem to be present.