# questions
t
Hi everyone, I am currently using Kedro to analyze some data from experiments (experimental physics) and I managed to make a CustomDataSet for this purpose. The problem is that I want to make a PartitionedDataSet from it, but I get complications. Here is the class I made:
Copy code
import glob
import os
from typing import Any, Dict, Tuple

import pandas as pd
from kedro.io import AbstractDataSet


class LedExperiment(AbstractDataSet):
    def __init__(self, filepath: str):
        self.path = filepath
        # collect and sort the three experiment files inside the folder
        self.files = glob.glob(os.path.join(filepath, "*"))
        self.files.sort()
        self.gate_voltage = self.get_gate_voltage(self.path)
        self.info_path, self.voltages_path, self.data_path = self.files

    @staticmethod
    def get_gate_voltage(path: str) -> float:
        """
        This is a function that is able to get the gate voltage from the folder name
        that is the root of the data
        :param path: path of the data, ussualy but not restricted to self.path
        :return: the voltage from the Dirac Point used as gate voltage
        """
        # note: sometimes there is more than one measurement for one voltage from the DP, it should
        # be always separed by an underscore "_".
        breakpoint()
        folder_name = os.path.split(path)[-1]
        gate_voltage = float(folder_name)
        return gate_voltage


    @staticmethod
    def get_info(path: str, gate_voltage: float) -> pd.DataFrame:
        """
        This method takes a path to the info file and returns a pandas
        datatrame of one row and the info in each column
        :param path: path to the info file of the experiment
        :param gate_voltage: this is the gate voltage with respect to the Dirac Point
        :return: a pandas dataframe with the parsed information
        """
        with open(path, "r") as f:
            r = f.read()

        r = r.split("\n")[1:-2]
        r = [i.split(",") for i in r]
        r = [item for sublist in r for item in sublist]
        r = [i.replace(" ", "") for i in r]
        r = {i.split("=")[0]: i.split("=")[1] for i in r}

        r["Vmin"] = float(r["Vmin"][:-1])
        r["Vmax"] = float(r["Vmax"][:-1])
        r["Vstep"] = float(r["Vstep"][:-1])
        r["Cycles"] = int(r["Cycles"])
        r["waitingtime"] = float(r["waitingtime"][:-1])
        r["timeatlight"] = float(r["timeatlight"][:-1])
        r["timeatdark"] = float(r["timeatdark"][:-1])
        r["wavelength"] = float(r["wavelength"][:-2])
        r["gate_voltage"] = gate_voltage
        info = pd.DataFrame(r, index=["value"])
        return info

    @staticmethod
    def get_led_voltage_list(voltage_list_path: str) -> pd.DataFrame:
        """
        This funtion takes the path to the file containing the list of the voltages to the led driver
        and returns a pandas dataframe containing all the voltages in the order they appear in the file
        which is the same order as they were used.
        :param voltage_list_path: path to the file containing the voltage list.
        :return: a pandas dataframe with all the information.
        """

        with open(voltage_list_path, "r") as f:
            r = f.read()
        r = r.split("\n")[:-1][::2]

        voltages = [float(i) for i in r]
        voltages = pd.DataFrame(voltages, columns=["LED driver voltages"])
        return voltages

    @staticmethod
    def get_data(data_path: str) -> pd.DataFrame:
        """
        This function reads the data from the experiment
        :param data_path: path to the file containing the time series data
        :return: a pandas dataframe with the time series data of the currents
        """
        return pd.read_csv(data_path, sep="\t")

    def _load(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        This function loads the data using the information provided in the init
        :return: A tuple with the information, LED voltages and data DataFrames in
        that order.
        """
        info = self.get_info(self.info_path, self.gate_voltage)
        led_voltages = self.get_led_voltage_list(self.voltages_path)
        data = self.get_data(self.data_path)

        return info, led_voltages, data

    def _save(self, data) -> None:
        # TODO: finish saving method
        pass

    def _describe(self) -> Dict[str, Any]:
        """
        Returns a dict that describes the attributes of the dataset.
        :return: Returns a dict that describes the attributes of the dataset.
        """
        return dict(
            information_path=self.info_path,
            voltages_path=self.voltages_path,
            data_path=self.data_path,
            gate_voltage=self.gate_voltage  # note that this is w respect to the DP
        )
The thing is that when I make a PartitionedDataSet from it, the paths get all messed up, which is not ideal: it results in errors inside the class. Can anyone help me with this? EDIT: I added 3 replies to the thread explaining the issue further
The thing is that this class uses the name of the containing folder as part of the information: say the folder containing the three experiment files is called "1.0"; that name is part of the data (it corresponds to an important voltage of the experiment)
But when I use the PartitionedDataSet the path gets all messed up
as a reference, here is my catalog. The entry that is not a PartitionedDataSet works as intended:
Copy code
prueba:
  type: responsivity.extras.datasets.led_experiment_dataset.LedExperiment
  filepath: data/01_raw/UB1C/385/-0.7


prueba_partitioned:
  type: "PartitionedDataSet"
  path: "data/01_raw/UB1C/385"
  dataset: "responsivity.extras.datasets.led_experiment_dataset.LedExperiment"
n
But when I use the PartitionedDataSet the path gets all messed up
How exactly is it messed up? What is the resolved path when you used PartitionedDataSet?
t
@Nok Lam Chan, thanks for your response and yes sorry I was missing that part
apparently when using the PartitionedDataSet it does not take all the folders as individual data objects. As an example, say I have the following folder structure:
Copy code
data/
└─ 01_raw/
   └─ device1/
      ├─ experiment1/
      │  ├─ file1.txt
      │  └─ file2.txt
      └─ experiment2/
         ├─ file1.txt
         └─ file2.txt
the normal dataset would accept an experiment folder
hence in my catalog, the entry called prueba has a path to a folder that corresponds to an experiment
now when I use the PartitionedDataSet I would like it to treat all the experiment folders of a device as individual experiments
hence I will give it the path to the device folder
As I pointed out before, the folder of the experiment carries important information in its name
in reality they are called 0.3, 0.1, etc
which represent voltages
when I use the individual data set it works as intended
but when I use the partitioned one, instead of using the name of the folder to parse the information
it tries to use the name of one of the files
which results in an error since it is not possible to convert the name of an experiment file to a float
@Nok Lam Chan Idk if that makes things any clearer(?)
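To make it concrete, this is roughly what I would expect loading the partitioned entry to return (a sketch of the desired behaviour, not actual output):
Copy code
partitions = catalog.load("prueba_partitioned")
# Desired: one lazy-load callable per experiment folder, keyed by the
# folder name that encodes the gate voltage, e.g.
# {"0.1": <callable>, "0.3": <callable>, ...}
for gate_voltage_str, load_experiment in partitions.items():
    info, led_voltages, data = load_experiment()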
n
So with your example, it tries to resolve a path like 01_raw/device1/experiment1/file1.txt instead of 01_raw/device1/experiment1, is that correct?
t
yes that is correct
n
I will try to reproduce this problem, but it will take some time to get back to you
t
no worries, if you want I can even send you some data
I think the challenge here is that you want the folders only, but PartitionedDataSet looks for files by default. If you look into the docs you can configure how `PartitionedDataSet` looks for files/directories. It uses the find function, which is similar to the UNIX command. Here I created an example that loops through the folder & subfolders. You should be able to apply similar logic to your CustomDataSet to parse the correct path information. The key here is in the catalog.yml: by using withdirs=1 and maxdepth=1, it looks for the subfolders but not the files within them. I hope this is useful.
Copy code
def _list_partitions(self) -> List[str]:
    return [
        path
        for path in self._filesystem.find(self._normalized_path, **self._load_args)
        if path.endswith(self._filename_suffix)
    ]
This is extracted from the Kedro source code; you can see that a partition is defined by the find command.
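As a quick illustration of what find does with these arguments, here is a minimal sketch assuming a local filesystem via fsspec and the folder layout above:
Copy code
import fsspec

fs = fsspec.filesystem("file")

# Defaults: walk everything and return files only, e.g.
# data/01_raw/device1/experiment1/file1.txt, ...
print(fs.find("data/01_raw/device1"))

# withdirs=True + maxdepth=1: stop at the first level and include
# directories, so each experiment folder appears as its own path, e.g.
# data/01_raw/device1/experiment1, data/01_raw/device1/experiment2
print(fs.find("data/01_raw/device1", withdirs=True, maxdepth=1))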
t
Hi, at least it now gives me a new error
Copy code
DataSetError: No partitions found in 'data/01_raw/UB1C'
I changed the catalog as you suggested:
Copy code
prueba_partitioned:
  type: "PartitionedDataSet"
  path: "data/01_raw/UB1C"
  dataset: "responsivity.extras.datasets.led_experiment_dataset.LedExperiment"
  load_args:
    with_dirs: 1
    maxdepth: 1
n
What's the structure of your files? Maybe you need maxdepth=2 based on the example you gave before.
withdirs=True, it was a typo before.
t
This is the structure and here is the catalog
Copy code
prueba_partitioned:
  type: "PartitionedDataSet"
  path: "data/01_raw/UB1C/405"
  dataset: "responsivity.extras.datasets.led_experiment_dataset.LedExperiment"
  load_args:
    with_dirs: True
    maxdepth: 1
I tried maxdepth=2 and it brought back the original problem
n
You can refer to my example and try to play around with the folder structure and config
t
In the end I solved the issue by making zip files of the experiment files and extracting them in memory with a new custom dataset
but now when I load the partitioned dataset I get a dictionary of bound methods which load the datasets. Is there a way to automatically deal with these dictionaries, or should I write the nodes with the partitioned datasets in mind?
maybe I should post that as another question
n
Do post it as a separate question since the thread is growing quite large.
What do you mean by automatically dealing with the dictionaries? What's the output you are expecting?
t
No worries @Nok Lam Chan, I solved it. But as a comment, it would be great to be able to load a partitioned dataset as just the folder path, as I originally intended. Here is the object I made, in case someone wants to use it as inspiration: it unzips the files in memory so that each experiment is one zip file instead of one folder:
Copy code
import os
from io import StringIO
from typing import Any, Dict, Tuple
from zipfile import ZipFile

import pandas as pd
from kedro.io import AbstractDataSet


class LedExperimentZip(AbstractDataSet):
    def __init__(self, filepath: str):
        self.path = filepath
        # the file name (minus the ".zip" extension) encodes the gate voltage
        self.gate_voltage = float(os.path.split(filepath)[1][:-4])
        self.zip_file = None
        self.zip_files_dict = None
        self.info_str, self.voltages_str, self.data_str = None, None, None

    @staticmethod
    def get_info(r: str, gate_voltage: float) -> pd.DataFrame:
        """
        This method takes a path to the info file and returns a pandas
        datatrame of one row and the info in each column
        :param r: string containing the information of the file
        :param gate_voltage: this is the gate voltage with respect to the Dirac Point
        :return: a pandas dataframe with the parsed information
        """
        r = r.split("\r\n")[1:-2]
        r = [i.split(",") for i in r]
        r = [item for sublist in r for item in sublist]
        r = [i.replace(" ", "") for i in r]
        r = {i.split("=")[0]: i.split("=")[1] for i in r}

        r["Vmin"] = float(r["Vmin"][:-1])
        r["Vmax"] = float(r["Vmax"][:-1])
        r["Vstep"] = float(r["Vstep"][:-1])
        r["Cycles"] = int(r["Cycles"])
        r["waiting_time"] = float(r["waitingtime"][:-1])
        r["time_at_light"] = float(r["timeatlight"][:-1])
        r["time_at_dark"] = float(r["timeatdark"][:-1])
        r["wavelength"] = float(r["wavelength"][:-2])
        r["gate_voltage"] = gate_voltage
        info = pd.DataFrame(r, index=["value"])
        return info

    @staticmethod
    def get_data(data_string: str) -> pd.DataFrame:
        """
        This function reads the data from the experiment
        :param data_string: data as a string utf-8
        :return: a pandas dataframe with the time series data of the currents
        """
        return pd.read_csv(StringIO(data_string), sep="\t")

    @staticmethod
    def get_led_voltage_list(r: str) -> pd.DataFrame:
        """
        This funtion takes the path to the file containing the list of the voltages to the led driver
        and returns a pandas dataframe containing all the voltages in the order they appear in the file
        which is the same order as they were used.
        :param r: Voltage list information as string
        :return: a pandas dataframe with all the information.
        """
        r = r.split("\n")[:-1][::2]

        voltages = [float(i) for i in r]
        voltages = pd.DataFrame(voltages, columns=["LED driver voltages"])
        return voltages

    def _load(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        This function loads the data using the information provided in the init
        :return: A tuple with the information, LED voltages and data DataFrames in
        that order.
        """
        self.zip_file = ZipFile(self.path)
        self.zip_files_dict = {name: self.zip_file.read(name) for name in self.zip_file.namelist()}
        names = sorted(self.zip_files_dict)

        info_str, voltages_str, data_str = [str(self.zip_files_dict[name], 'utf-8') for name in names]
        info = self.get_info(info_str, self.gate_voltage)
        led_voltages = self.get_led_voltage_list(voltages_str)
        data = self.get_data(data_str)
        return info, led_voltages, data


    def _save(self, data) -> None:
        # TODO: finish saving method
        pass

    def _describe(self) -> Dict[str, Any]:
        """
        Returns a dict that describes the attributes of the dataset.
        :return: Returns a dict that describes the attributes of the dataset.
        """
        return dict(
            file_path=self.path,
            gate_voltage=self.gate_voltage  # note that this is w respect to the DP
        )
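In case it helps, this is roughly the catalog entry that pairs each zip file with the dataset (filename_suffix is a standard PartitionedDataSet argument that filters the partitions by extension; the path is from my earlier examples):
Copy code
prueba_partitioned:
  type: "PartitionedDataSet"
  path: "data/01_raw/UB1C/405"
  dataset: "responsivity.extras.datasets.led_experiment_dataset.LedExperimentZip"
  filename_suffix: ".zip"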
n
It's great that you have found a solution. Did you want to load the entire folder as one object? This is not what PartitionedDataSet is designed for. The bound methods you get are there for lazy loading; they are important for memory-hungry cases where you simply cannot load all the partitions into memory. You can still use PartitionedDataSet in that way by simply doing a for loop, assuming each partition is a simple pandas.DataFrame in this case
Copy code
# This should be your node logic

# Iterating the dict directly would yield the partition ids (strings),
# so iterate over the values, which are the lazy-load callables.
data = []
for load_partition in partitioned_datasets.values():
    data.append(load_partition())
data = pd.concat(data)

# Now data is one large dataframe that contains all the partitions
t
Thanks, and sorry for being late on this 🙂 I ended up making a custom object that, when called, unzips the files in memory (all the files are now a single zip file)