#questions

Matheus Pinto

03/01/2023, 12:23 PM
Hi team, we have a custom class that builds a sklearn-compatible pipeline and exposes fit and predict methods. The last estimator in this class is a TensorFlow model (with custom objects, such as loss functions), so the object is not serializable: neither the TensorFlow dataset nor the pickle Kedro datasets work. Do you have any advice on how to save this object? We were thinking of a custom dataset that saves everything that is not the TF model as a pickle and saves the model itself with the TF dataset. Is this the most elegant solution? Do you have any other ideas on how to solve this? Class:
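import importlib
import typing as tp

import pandas as pd
from sklearn.base import BaseEstimator

# Matrix, Vector, check_is_fitted and ensure_data_quality are
# project-specific helpers that are not shown in this thread.


class ModelPipeline(BaseEstimator):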
    """A pipeline to train and make predictions using a machine learning model.

    Args:
        params (dict): A dictionary containing the parameters required to build the pipeline.

    Attributes:
        pipeline (list): A list of transformers and an estimator built using the given parameters.
        data_preprocessing_pipe (list): A list of transformers in the pipeline
            used for data pre-processing.
        estimator: The estimator in the pipeline used for making predictions.
        target_names (list): A list of column names in the target variable.
        is_fitted (bool): A flag indicating if the estimator is fitted or not.

    """

    def __init__(self, params: dict):
        """Initializes the ModelPipeline object.

        The object is initialized with the given parameters to create a pipeline.

        Args:
            params (dict): A dictionary containing the parameters required to build the pipeline.
        """
        self.params = params
        self.target_params = list(
            params["model"]["data_preparation"]["target_builder"]["kwargs"].values()
        )[0]
        func_path = self.params["builder_function"]
        module_name, func_name = func_path.rsplit(".", 1)
        module = importlib.import_module(module_name)
        func = getattr(module, func_name)
        self.pipeline = func(self.params)

    def fit(self, X, y):
        """Fits the pipeline to the given data.

        Fits each transformer in the pre-processing pipeline
        to the data and then fits the estimator to the transformed data.

        Args:
            X (array-like or sparse matrix): Input data of shape (n_samples, n_features)
            y (array-like or sparse matrix): Target values of shape (n_samples,) or
            (n_samples, n_targets)

        Returns:
            self: Returns an instance of self.
        """
        X_ = self.pipeline[:-1].fit_transform(X, y)
        self.pipeline[-1:].fit(X_, y)
        self.is_fitted = True
        return self

    @check_is_fitted
    @ensure_data_quality
    def predict(self, X: tp.Union[Matrix, Vector]) -> tp.Union[Matrix, Vector]:
        """Predicts the target variable using the fitted pipeline.

        Transforms the input data using the pre-processing pipeline and then makes predictions
        using the fitted estimator.

        Args:
            X (array-like or sparse matrix): Input data of shape (n_samples, n_features)

        Returns:
            pandas.DataFrame: A dataframe containing the predicted values of
                shape (n_samples, n_targets).
        """
        X_ = self.pipeline[:-1].transform(X)
        y_pred = self.pipeline[-1].predict(X_)
        return y_pred

    @check_is_fitted
    @ensure_data_quality
    def inference(
        self, X: tp.Union[Matrix, Vector], y: tp.Union[Matrix, Vector] = None
    ) -> pd.DataFrame:
        """Custom inference using the fitted pipeline.

        Transforms the input data using the pre-processing pipeline and then runs inference
        using the fitted estimator for the specific problem (classification or regression).

        Args:
            X (array-like or sparse matrix): Input data of shape (n_samples, n_features)

        Returns:
            pandas.DataFrame: A dataframe containing the predicted values of
                shape (n_samples, n_targets).
        """
        X_ = self.pipeline[:-1].transform(X)
        inference = self.pipeline[-1].inference(X_, y)
        return inference

William Caicedo

03/01/2023, 6:10 PM
I think a custom dataset would do it. If you don’t want to write one, a hook could be another option (i.e. load all the objects and pass them to a new instance of your pipeline class, and then inject it into the node’s inputs)
👍 1
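
The split proposed in the question (pickle everything except the TF model, and save the model in its own format) could be sketched roughly as below. This is a minimal sketch, not the implementation used in the thread: `Wrapper`, `FakeTFModel`, `save_split`, `load_split` and the two `fake_*` helpers are hypothetical names, and the fake model stands in for the TensorFlow estimator so that the example runs with the standard library only.

```python
import json
import pickle
import tempfile
from pathlib import Path


class FakeTFModel:
    """Hypothetical stand-in for the TensorFlow estimator."""

    def __init__(self, weights):
        self.weights = weights


class Wrapper:
    """Hypothetical stand-in for ModelPipeline: plain attributes plus one model."""

    def __init__(self, params, estimator):
        self.params = params
        self.estimator = estimator


def fake_save_model(model, path):
    # Stand-in for the model's native save routine.
    Path(path).with_suffix(".json").write_text(json.dumps(model.weights))


def fake_load_model(path):
    # Stand-in for the model's native load routine.
    return FakeTFModel(json.loads(Path(path).with_suffix(".json").read_text()))


def save_split(obj, folder, save_model):
    """Pickle every attribute except ``estimator``; delegate the model to ``save_model``."""
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    state = dict(vars(obj))
    model = state.pop("estimator")
    save_model(model, folder / "model")
    with open(folder / "state.pkl", "wb") as f:
        pickle.dump(state, f)


def load_split(cls, folder, load_model):
    """Rebuild ``cls`` from the pickled attributes and the separately loaded model."""
    folder = Path(folder)
    with open(folder / "state.pkl", "rb") as f:
        state = pickle.load(f)
    obj = cls.__new__(cls)  # skip __init__ and restore attributes directly
    obj.__dict__.update(state)
    obj.estimator = load_model(folder / "model")
    return obj


with tempfile.TemporaryDirectory() as tmp:
    original = Wrapper({"lr": 0.1}, FakeTFModel([1.0, 2.0]))
    save_split(original, tmp, fake_save_model)
    restored = load_split(Wrapper, tmp, fake_load_model)
```

With a real Keras model, the two callables would presumably wrap `model.save(path)` and `tf.keras.models.load_model(path, compile=False)`, and in Kedro this logic would sit in the `_save` and `_load` methods of a custom dataset. In `ModelPipeline` the model is the last step of `self.pipeline` rather than a direct attribute, so that step would have to be detached before pickling the rest.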

datajoely

03/02/2023, 11:16 AM
Hi @Matheus Pinto, there are two things here. First, I’ve seen a more native Kedro approach that uses sklearn-style pipelines and relies on the trick that classes can be called like functions:
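from typing import List, Type

from kedro.pipeline import Pipeline, node
from sklearn.linear_model import LinearRegression


def create_pipeline(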
        model_type: Type = LinearRegression,
        model_args: List[str] = None
) -> Pipeline:
    model_args = model_args or ["params:fit_intercept"]

    return Pipeline([
        node(
            func=model_type,
            inputs=model_args,
            outputs="model",
            name="init_model"
        ),
        node(
            func=LinearRegression.fit,
            inputs=["model", "data"],
            outputs="fit_model",
            name="training"
        )
    ])
Secondly, you should be able to pickle TF objects with joblib:
final_model: # example with load and save args
  type: pickle.PickleDataSet
  filepath: s3://your_bucket/final_model.pkl.lz4
  backend: joblib
  credentials: s3_credentials
  save_args:
    compress: lz4

Matheus Pinto

03/02/2023, 1:23 PM
Thanks guys, the custom dataset worked, but we still think it is not the cleanest solution. Joel, is there any way to tell the pickle dataset not to compile the TF graph?
DataSetError: Failed while loading data from data set PickleDataSet(backend=compress_pickle, 
filepath=test.pickle.lz4, load_args={'compression': lz4}, protocol=file, save_args={'compression': lz4}).
__init__() got an unexpected keyword argument 'reduction'
This happens because pickle tries to re-initialize the model from scratch, and we modify the loss function for training purposes. The way Keras proposes to load such a model is with the parameter compile=False, but pickle initializes it exactly as it was saved 😞
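
One plausible reading of the error above, which the thread does not confirm, is that the loss is rebuilt from a saved config by calling its constructor with every saved key, including one (`reduction`) that the custom constructor does not accept. The toy sketch below reproduces that mechanism in plain Python. `ToyLoss` and `load_toy_model` are hypothetical and are not Keras APIs; the `compile` flag only mimics the role of `compile=False` by skipping the loss reconstruction.

```python
class ToyLoss:
    """Hypothetical custom loss whose constructor does not accept ``reduction``."""

    def __init__(self, alpha):
        self.alpha = alpha

    def get_config(self):
        # Assumption: a generic key such as "reduction" ends up in the saved config.
        return {"alpha": self.alpha, "reduction": "auto"}


def load_toy_model(saved, compile=True):
    """Rebuild a toy model; re-create the loss only when ``compile`` is True."""
    model = {"weights": saved["weights"], "loss": None}
    if compile:
        # Passes every saved key to the constructor, including "reduction".
        model["loss"] = ToyLoss(**saved["loss_config"])
    return model


saved = {"weights": [0.5], "loss_config": ToyLoss(alpha=2.0).get_config()}

try:
    load_toy_model(saved)
    error = None
except TypeError as exc:
    error = str(exc)  # mentions the unexpected keyword argument 'reduction'

# Skipping the loss reconstruction avoids the failing constructor call.
model = load_toy_model(saved, compile=False)
```

A plain pickle round trip offers no such switch, which is consistent with the custom dataset being the approach that worked here.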

Matheus Pinto

03/02/2023, 1:28 PM
Nope, because the TF object is the estimator step in the sklearn pipeline of the ModelPipeline class defined above 👆
What worked was continuing the idea from this issue https://github.com/kedro-org/kedro/issues/982, but we don’t know if it is the best solution.
This PR was never accepted into Kedro, but the idea somehow solved our problem: https://github.com/kedro-org/kedro/pull/983/files