Sebastian Cardona Lozano
06/10/2023, 12:31 AMimport fsspec
from pathlib import PurePosixPath
from typing import Any, Dict
from annoy import AnnoyIndex
from <http://kedro.io|kedro.io> import AbstractDataSet
from kedro.io.core import get_filepath_str, get_protocol_and_path
class AnnoyIndexDataSet(AbstractDataSet[AnnoyIndex, AnnoyIndex]):
"""``AnnoyIndexDataSet`` loads / save Annoy index from a given filepath.
"""
def __init__(self, filepath: str, dimension:int, metric:str):
"""Creates a new instance of AnnoyIndexDataSet to load / save an Annoy
Index at the given filepath.
Args:
filepath (str): The path to the file where the index will be saved
or loaded from.
dimension (int): The length of the vectors that will be indexed.
metric (str): The distance metric to use. One of "angular",
"euclidean", "manhattan", "hamming", or "dot".
"""
# parse the path and protocol (e.g. file, http, s3, etc.)
protocol, path = get_protocol_and_path(filepath)
self._protocol = protocol
self._filepath = PurePosixPath(path)
self._fs = fsspec.filesystem(self._protocol)
self.dimension = dimension
self.metric = metric
def _load(self) -> AnnoyIndex:
"""Load the index from the file.
Returns:
An instance of AnnoyIndex.
"""
# using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
load_path = get_filepath_str(self._filepath, self._protocol)
annoy_index = AnnoyIndex(self.dimension, self.metric)
annoy_index.load(load_path)
return annoy_index
def _save(self, annoy_index: AnnoyIndex) -> None:
"""Save the index to the file.
Args:
data: An instance of AnnoyIndex.
"""
save_path = get_filepath_str(self._filepath, self._protocol)
annoy_index.save(save_path)
def _describe(self) -> Dict[str, Any]:
"""Return a dict describing the dataset.
Returns:
A dict with keys "filepath", "dimension", and "metric".
"""
return {
"filepath": self._filepath,
"dimension": self.dimension,
"metric": self.metric,
}
And in the data catalog I have this:
annoy_index:
type: pricing.extras.datasets.annoy_dataset.AnnoyIndexDataSet
dimension: 1026
metric: angular
filepath: /data/06_models/products_index.ann
layer: model_input
My goal is to save the .ann file in Google Cloud Storage or in a local folder, but I got the next error when running the node that saves the file:
DataSetError: Failed while saving data to data set AnnoyIndexDataSet(dimension=1026,
filepath=/data/06_models/products_index.ann, metric=angular).
Unable to open: No such file or directory (2)
Please your help. Thanks!!marrrcin
06/12/2023, 7:05 AM/data/06_models
directory exist? Does this happen for all file paths or only for local ones?
I’m not sure about the internals of the Annoy library, but if you pass the file path as str
to the library, it will most likely not work with GCS/S3/other remote paths. You’ll most likely need to implement a wrapper with temporary directory around it.Nok Lam Chan
06/12/2023, 10:49 AMSebastian Cardona Lozano
06/12/2023, 2:09 PMwith self._fs.open(load_path, mode="r") as f:
But It is still not working with the local directory or a GCS bucket 😞 .
So, I gave up and made it simpler without fsspec
and versioning and it's working (only locally):
from pathlib import Path, PurePosixPath
from typing import Any, Dict
import fsspec
from annoy import AnnoyIndex
from <http://kedro.io|kedro.io> import AbstractDataSet
from kedro.io.core import get_filepath_str, get_protocol_and_path
class AnnoyIndexDataSet(AbstractDataSet[AnnoyIndex, AnnoyIndex]):
"""``AnnoyIndexDataSet`` loads / save Annoy index from a given filepath."""
def __init__(self, filepath: str, dimension: int, metric: str):
"""Creates a new instance of AnnoyIndexDataSet to load / save an Annoy
Index at the given filepath.
Args:
filepath (str): The path to the file where the index will be saved
or loaded from.
dimension (int): The length of the vectors that will be indexed.
metric (str): The distance metric to use. One of "angular",
"euclidean", "manhattan", "hamming", or "dot".
"""
self._filepath = Path(filepath).as_posix()
self.dimension = dimension
self.metric = metric
def _load(self) -> AnnoyIndex:
"""Load the index from the file.
Returns:
An instance of AnnoyIndex.
"""
annoy_index = AnnoyIndex(self.dimension, self.metric)
annoy_index.load(self._filepath)
return annoy_index
def _save(self, annoy_index: AnnoyIndex) -> None:
"""Save the index to the file.
Args:
data: An instance of AnnoyIndex.
"""
annoy_index.save(self._filepath)
def _describe(self) -> Dict[str, Any]:
"""Return a dict describing the dataset.
Returns:
A dict with keys "filepath", "dimension", and "metric".
"""
return {
"filepath": self._filepath,
"dimension": self.dimension,
"metric": self.metric,
}
I've seen that maybe the problem was with PurePosixPath
and fsspec
, but I'm not sure. But, I'd like to be able to load and save from GCS and version this data set as the other data sets built by Kedro.
Thanks for your help! 🙂marrrcin
06/12/2023, 2:12 PMfile-like object
, it will simplify the integration with fsspec a lotSebastian Cardona Lozano
06/12/2023, 2:21 PM