Source code for mapie.multi_label_classification

from __future__ import annotations

import warnings
from typing import Iterable, Optional, Sequence, Tuple, Union, cast

import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.multioutput import MultiOutputClassifier
from sklearn.pipeline import Pipeline
from sklearn.utils import check_random_state
from sklearn.utils.validation import (_check_y, _num_samples, check_is_fitted,
                                      indexable)

from ._typing import ArrayLike, NDArray
from .utils import check_alpha, check_n_jobs, check_verbose


[docs]class MapieMultiLabelClassifier(BaseEstimator, ClassifierMixin): """ Prediction sets for multilabel-classification. This class implements two conformal prediction methods for estimating prediction sets for multilabel-classification. It guarantees (under the hypothesis of exchangeability) that a risk is at least 1 - alpha (alpha is a user-specified parameter). For now, we consider the recall as risk. Parameters ---------- estimator : Optional[ClassifierMixin] Any fitted multi-label classifier with scikit-learn API (i.e. with fit, predict, and predict_proba methods). If ``None``, estimator by default is a sklearn LogisticRegression instance. by default ``None`` n_jobs: Optional[int] Number of jobs for parallel processing using joblib via the "locky" backend. For this moment, parallel processing is disabled. If ``-1`` all CPUs are used. If ``1`` is given, no parallel computing code is used at all, which is useful for debugging. For n_jobs below ``-1``, ``(n_cpus + 1 + n_jobs)`` are used. "None" is a marker for `unset` that will be interpreted as ``n_jobs=1`` (sequential execution). By default ``None``. random_state: Optional[Union[int, RandomState]] Pseudo random number generator state used for random uniform sampling to evaluate quantiles and prediction sets in cumulated_score method. Pass an int for reproducible output across multiple function calls. By default ``1``. verbose : int, optional The verbosity level, used with joblib for parallel processing. For the moment, parallel processing is disabled. The frequency of the messages increases with the verbosity level. If it more than ``10``, all iterations are reported. Above ``50``, the output is sent to stdout. By default ``0``. Attributes ---------- valid_methods: List[str] List of all valid methods. Either CRC or RCPS valid_methods: List[Union[str, ``None``]] List of all valid bounds computation for RCPS only. single_estimator_ : sklearn.ClassifierMixin Estimator fitted on the whole training set. n_lambdas: int Number of thresholds on which we compute the risk. lambdas: NDArray Array with all the values of lambda. risks : ArrayLike of shape (n_samples_cal, n_lambdas) The risk for each observation for each threshold r_hat : ArrayLike of shape (n_lambdas) Average risk for each lambda r_hat_plus: ArrayLike of shape (n_lambdas) Upper confidence bound for each lambda, computed with different bounds (see predict). Only relevant when method="rcps". lambdas_star: ArrayLike of shape (n_lambdas) Optimal threshold for a given alpha. References ---------- [1] Lihua Lei Jitendra Malik Stephen Bates, Anastasios Angelopoulos and Michael I. Jordan. Distribution-free, risk-controlling prediction sets. CoRR, abs/2101.02703, 2021. URL https://arxiv.org/abs/2101.02703.39 [2] Angelopoulos, Anastasios N., Stephen, Bates, Adam, Fisch, Lihua, Lei, and Tal, Schuster. "Conformal Risk Control." (2022). Examples -------- >>> import numpy as np >>> from sklearn.multioutput import MultiOutputClassifier >>> from sklearn.linear_model import LogisticRegression >>> from mapie.multi_label_classification import MapieMultiLabelClassifier >>> X_toy = np.arange(4).reshape(-1, 1) >>> y_toy = np.stack([[1, 0, 1], [1, 0, 0], [0, 1, 1], [0, 1, 0]]) >>> clf = MultiOutputClassifier(LogisticRegression()).fit(X_toy, y_toy) >>> mapie = MapieMultiLabelClassifier(estimator=clf).fit(X_toy, y_toy) >>> _, y_pi_mapie = mapie.predict(X_toy, alpha=0.3) >>> print(y_pi_mapie[:, :, 0]) [[ True False True] [ True False True] [False True False] [False True False]] """ valid_methods_ = ["crc", "rcps"] valid_bounds_ = ["hoeffding", "bernstein", "wsr", None] lambdas = np.arange(0, 1, 0.01) n_lambdas = len(lambdas) fit_attributes = [ "single_estimator_", "risks" ] sigma_init = .25 # Value given in the paper. cal_size = .3
[docs] def __init__( self, estimator: Optional[ClassifierMixin] = None, n_jobs: Optional[int] = None, random_state: Optional[Union[int, np.random.RandomState]] = None, verbose: int = 0 ) -> None: self.estimator = estimator self.n_jobs = n_jobs self.random_state = random_state self.verbose = verbose
def _check_parameters(self) -> None: """ Check n_jobs, verbose and random_states. Raises ------ ValueError If parameters are not valid. """ check_n_jobs(self.n_jobs) check_verbose(self.verbose) check_random_state(self.random_state) def _check_method(self) -> None: """ Check that the specified method is valid Raises ------ ValueError Raise error if the name of the method is not in self.valid_methods_ """ if self.method not in self.valid_methods_: raise ValueError( "Invalid method. " "Allowed values are" + " or ".join(self.valid_methods_) ) def _check_all_labelled(self, y: NDArray) -> None: """ Check that all observations have at least one label Parameters ---------- y : NDArray of shape (n_samples, n_labels) Labels of the observations. Raises ------ ValueError Raise error if at least one observation has no label. """ if not (y.sum(axis=1) > 0).all(): raise ValueError( "Invalid y. " "All observations should contain at " "least one label." ) def _check_delta(self, delta: Optional[float]): """ Check that delta is not ``None`` when the method is RCPS and that it is between 0 and 1. Parameters ---------- delta : float Probability with which we control the risk. The higher the probability, the more conservative the algorithm will be. Raises ------ ValueError If delta is ``None`` and method is RCSP or if delta is not in [0, 1] and method is RCPS. Warning If delta is not ``None`` and method is CRC """ if (self.method == "rcps") and (delta is None): raise ValueError( "Invalid delta. " "delta cannot be ``None`` when using " "RCPS method." ) if (not isinstance(delta, float)) and (delta is not None): raise ValueError( "Invalid delta. " f"delta must be a float, not a {type(delta)}" ) if (delta is not None) and ( ((delta <= 0) or (delta >= 1)) and (self.method == "rcps") ): raise ValueError( "Invalid delta. " "delta must be in ]0, 1[" ) if (self.method == "crc") and (delta is not None): warnings.warn( "WARNING: you are using crc method, hence " + "even if the delta is not ``None``, it won't be" + "taken into account" ) def _check_estimator( self, X: ArrayLike, y: ArrayLike, estimator: Optional[ClassifierMixin] = None, _refit: Optional[bool] = False, ) -> ClassifierMixin: """ Check the estimator value. If it is ``None``, it returns a multi-output ``LogisticRegression`` instance if necessary. Parameters ---------- X : ArrayLike of shape (n_samples, n_features) Training data. y : ArrayLike of shape (n_samples,) Training labels. estimator : Optional[ClassifierMixin], optional Estimator to check, by default ``None`` _refit : Optional[bool] Whether or not the user is using fit (True) or partial_fit (False). By default False Returns ------- ClassifierMixin The estimator itself or a default multi-output ``LogisticRegression`` instance. Raises ------ ValueError If the estimator is not ``None`` and has no fit, predict, nor predict_proba methods. NotFittedError If the estimator is not fitted. Warning If estimator is then to warn about the split of the data between train and calibration """ if (estimator is None) and (not _refit): raise ValueError( "Invalid estimator with partial_fit. " "If the estimator is ``None`` you can not " "use partial_fit." ) if (estimator is None) and (_refit): estimator = MultiOutputClassifier( LogisticRegression(multi_class="multinomial") ) X_train, X_calib, y_train, y_calib = train_test_split( X, y, test_size=self.calib_size, random_state=self.random_state, ) estimator.fit(X_train, y_train) warnings.warn( "WARNING: To avoid overffiting, X has been splitted" + "into X_train and X_calib. The calibraiton will only" + "be done on X_calib" ) return estimator, X_calib, y_calib if isinstance(estimator, Pipeline): est = estimator[-1] else: est = estimator if ( not hasattr(est, "fit") or not hasattr(est, "predict") or not hasattr(est, "predict_proba") ): raise ValueError( "Invalid estimator. " "Please provide a classifier with fit," "predict, and predict_proba methods." ) check_is_fitted(est) return estimator, X, y def _check_partial_fit_first_call(self) -> bool: """ Check that this is the first time partial_fit or fit is called. Returns ------- bool True if it is the first time, else False. """ return not hasattr(self, "risks") def _check_bound(self, bound: Optional[str]): """ Check the value of the bound. Parameters ---------- bound : Optional[str] Bound defined in the predict. Raises ------ AttributeError If bound is not in ["hoeffding", "bernstein", "wsr", ``None``] Warning If bound is not ``None``and method is CRC """ if bound not in self.valid_bounds_: raise ValueError( "bound must be in ['hoeffding', 'bernstein', 'wsr', ``None``]" ) elif (bound is not None) and (self.method == "crc"): warnings.warn( "WARNING: you are using crc method, hence " + "even if the bound is not ``None``, it won't be" + "taken into account" ) def _transform_pred_proba( self, y_pred_proba: Union[Sequence[NDArray], NDArray] ) -> NDArray: """If the output of the predict_proba is a list of arrays (output of the ``predict_proba`` of ``MultiOutputClassifier``) we transform it into an array of shape (n_samples, n_classes, 1), otherwise, we add one dimension at the end. Parameters ---------- y_pred_proba : Union[List, NDArray] Output of the multi-label classifier. Returns ------- NDArray of shape (n_samples, n_classe, 1) Output of the model ready for risk computation. """ if isinstance(y_pred_proba, np.ndarray): y_pred_proba_array = y_pred_proba else: y_pred_proba_stacked = np.stack( y_pred_proba, # type: ignore axis=0 )[:, :, 1] y_pred_proba_array = np.moveaxis(y_pred_proba_stacked, 0, -1) return np.expand_dims(y_pred_proba_array, axis=2) def _compute_risks(self, y_pred_proba: NDArray, y: NDArray) -> NDArray: """ Compute the risks Parameters ---------- y_pred_proba : NDArray of shape (n_samples, n_labels) Predicted probabilities for each label and each observation y : NDArray of shape (n_samples, n_labels) True labels. Returns ------- NDArray of shape (n_samples, n_lambdas) Risks for each observation and each value of lambda. """ y_pred_proba_repeat = np.repeat( y_pred_proba, self.n_lambdas, axis=2 ) y_pred_th = (y_pred_proba_repeat > self.lambdas).astype(int) y_repeat = np.repeat(y[..., np.newaxis], self.n_lambdas, axis=2) risks = 1 - ( (y_pred_th * y_repeat).sum(axis=1) / y.sum(axis=1)[:, np.newaxis] ) return risks def _get_r_hat_plus( self, bound: Optional[str], delta: Optional[float], ) -> Tuple[NDArray, NDArray]: """Compute the upper bound of the loss for each lambda. Parameters ---------- bound : str Bounds to compute. Either hoeffding, bernstein or wsr. delta : float Level of confidence. sigma_init : float, optional First variance in the sigma_hat array. The default value is the same as in the paper implementation. By default .25 Returns ------- Tuple[NDArray, NDArray] of shape (n_lambdas, ) and (n_lambdas) Average risk over all the obervations and upper bound of the risk. """ r_hat = self.risks.mean(axis=0) n_obs = len(self.risks) if (self.method == "rcps") and (delta is not None): if bound == "hoeffding": r_hat_plus = ( r_hat + np.sqrt((1 / (2 * n_obs)) * np.log(1 / delta)) ) elif bound == "bernstein": sigma_hat_bern = np.var(r_hat, axis=0, ddof=1) r_hat_plus = ( r_hat + np.sqrt((sigma_hat_bern * 2 * np.log(2 / delta)) / n_obs) + (7 * np.log(2 / delta)) / (3 * (n_obs - 1)) ) else: mu_hat = ( (.5 + np.cumsum(self.risks, axis=0)) / (np.repeat( [range(1, n_obs + 1)], self.n_lambdas, axis=0 ).T + 1) ) sigma_hat = ( (.25 + np.cumsum((self.risks - mu_hat)**2, axis=0)) / (np.repeat( [range(1, n_obs + 1)], self.n_lambdas, axis=0 ).T + 1) ) sigma_hat = np.concatenate( [ np.full( (1, self.n_lambdas), fill_value=self.sigma_init ), sigma_hat[:-1] ] ) nu = np.minimum( 1, np.sqrt((2 * np.log(1 / delta)) / (n_obs * sigma_hat)) ) # Split the calculation in two to prevent memory issues batches = [ range(int(n_obs / 2)), range(n_obs - int(n_obs / 2), n_obs) ] K_R_max = np.zeros((self.n_lambdas, self.n_lambdas)) for batch in batches: nu_batch = nu[batch] losses_batch = self.risks[batch] nu_batch = np.repeat( np.expand_dims(nu_batch, axis=2), self.n_lambdas, axis=2 ) losses_batch = np.repeat( np.expand_dims(losses_batch, axis=2), self.n_lambdas, axis=2 ) R = self.lambdas K_R = np.cumsum( np.log( ( 1 - nu_batch * (losses_batch - R) ) + np.finfo(np.float64).eps ), axis=0 ) K_R = np.max(K_R, axis=0) K_R_max += K_R r_hat_plus_tronc = self.lambdas[np.argwhere( np.cumsum(K_R_max > -np.log(delta), axis=1) == 1 )[:, 1]] r_hat_plus = np.ones(self.n_lambdas) r_hat_plus[:len(r_hat_plus_tronc)] = r_hat_plus_tronc else: r_hat_plus = (n_obs / (n_obs + 1)) * r_hat + (1 / (n_obs + 1)) return r_hat, r_hat_plus def _find_lambda_star( self, r_hat_plus: NDArray, alpha_np: NDArray ) -> NDArray: """Find the higher value of lambda such that for all smaller lambda, the risk is smaller, for each value of alpha. Parameters ---------- r_hat_plus : NDArray of shape (n_lambdas, ) Upper bounds computed in the `get_r_hat_plus` method. alphas : NDArray of shape (n_alphas, ) Risk levels. Returns ------- NDArray of shape (n_alphas, ) Optimal lambdas which control the risks for each value of alpha. """ if len(alpha_np) > 1: alphas_np = alpha_np[:, np.newaxis] else: alphas_np = alpha_np bound_rep = np.repeat( np.expand_dims(r_hat_plus, axis=0), len(alphas_np), axis=0 ) bound_rep[:, np.argmax(bound_rep, axis=1)] = np.maximum( alphas_np, bound_rep[:, np.argmax(bound_rep, axis=1)] ) # to avoid an error if the risk is always higher than alpha lambdas_star = self.lambdas[np.argmin( - np.greater_equal( bound_rep, alphas_np ).astype(int), axis=1 )] return lambdas_star
[docs] def partial_fit( self, X: ArrayLike, y: ArrayLike, _refit: Optional[bool] = False, ) -> MapieMultiLabelClassifier: """ Fit the base estimator or use the fitted base estimator on batch data. All the computed risks will be concatenated each time the partial_fit method is called. Parameters ---------- X : ArrayLike of shape (n_samples, n_features) Training data. y : NDArray of shape (n_samples, n_classes) Training labels. _refit: bool Whether or not refit from scratch. By default False Returns ------- MapieMultiLabelClassifier The model itself. """ # Checks first_call = self._check_partial_fit_first_call() self._check_parameters() X, y = indexable(X, y) _check_y(y, multi_output=True) estimator, X, y = self._check_estimator( X, y, self.estimator, _refit ) y = cast(NDArray, y) X = cast(NDArray, X) self._check_all_labelled(y) self.n_samples_ = _num_samples(X) # Work if first_call or _refit: self.single_estimator_ = estimator y_pred_proba = self.single_estimator_.predict_proba(X) y_pred_proba_array = self._transform_pred_proba(y_pred_proba) self.theta_ = X.shape[1] self.risks = self._compute_risks(y_pred_proba_array, y) else: if X.shape[1] != self.theta_: msg = "Number of features %d does not match previous data %d." raise ValueError(msg % (X.shape[1], self.theta_)) self.single_estimator_ = estimator y_pred_proba = self.single_estimator_.predict_proba(X) y_pred_proba_array = self._transform_pred_proba(y_pred_proba) partial_risk = self._compute_risks(y_pred_proba_array, y) self.risks = np.concatenate([self.risks, partial_risk], axis=0) return self
[docs] def fit( self, X: ArrayLike, y: ArrayLike, calib_size: Optional[float] = .3 ) -> MapieMultiLabelClassifier: """ Fit the base estimator or use the fitted base estimator. Parameters ---------- X : ArrayLike of shape (n_samples, n_features) Training data. y : NDArray of shape (n_samples, n_classes) Training labels. calib_size: Optional[float] Size of the calibration dataset with respect to X if the given model is ``None`` need to fit a LogisticRegression. By default .3 Returns ------- MapieMultiLabelClassifier The model itself. """ self.calib_size = calib_size return self.partial_fit(X, y, _refit=True)
[docs] def predict( self, X: ArrayLike, method: Optional[str] = "crc", alpha: Optional[Union[float, Iterable[float]]] = None, delta: Optional[float] = None, bound: Optional[Union[str, None]] = "wsr" ) -> Union[NDArray, Tuple[NDArray, NDArray]]: """ Prediction sets on new samples based on target confidence interval. Prediction sets for a given ``alpha`` are deduced from the computed risks. Parameters ---------- X : ArrayLike of shape (n_samples, n_features) Test data. method : Optional[str] Method to choose for prediction interval estimates. Choose among: - "crc", for Conformal Risk Control. See [1] for more details. - "rcps", based on computation of the upper bound of the risk. See [2] for more details. By default "crc". alpha : Optional[Union[float, Iterable[float]]] Can be a float, a list of floats, or a ``ArrayLike`` of floats. Between 0 and 1, represent the uncertainty of the confidence interval. Lower ``alpha`` produce larger (more conservative) prediction sets. ``alpha`` is the complement of the target coverage level. By default ``None``. delta : Optional[float] Can be a float, or ``None``. If using method="rcps", then it can not be set to ``None``. Between 0 and 1, the level of certainty at which we compute the Upper Confidence Bound of the average risk. Lower ``delta`` produce larger (more conservative) prediction sets. By default ``None``. bound : Optional[Union[str, ``None``]] Method used to compute the Upper Confidence Bound of the average risk. Only necessary with the RCPS method. By default "wsr" Returns ------- Union[NDArray, Tuple[NDArray, NDArray]] - NDArray of shape (n_samples,) if alpha is ``None``. - Tuple[NDArray, NDArray] of shapes (n_samples, n_classes) and (n_samples, n_classes, n_alpha) if alpha is not ``None``. """ self.method = method self._check_method() self._check_delta(delta) self._check_bound(bound) alpha = cast(Optional[NDArray], check_alpha(alpha)) check_is_fitted(self, self.fit_attributes) # Estimate prediction sets y_pred = self.single_estimator_.predict(X) if alpha is None: return np.array(y_pred) alpha_np = cast(NDArray, alpha) y_pred_proba = self.single_estimator_.predict_proba(X) y_pred_proba_array = self._transform_pred_proba(y_pred_proba) y_pred_proba_array = np.repeat( y_pred_proba_array, len(alpha_np), axis=2 ) self.r_hat, self.r_hat_plus = self._get_r_hat_plus( bound, delta, ) self.lambdas_star = self._find_lambda_star(self.r_hat_plus, alpha_np) y_pred_proba_array = ( y_pred_proba_array > self.lambdas_star[np.newaxis, np.newaxis, :] ) return y_pred, y_pred_proba_array