Source code for mapie.conformity_scores.sets.aps

from typing import Optional, Tuple, Union, cast

import numpy as np
from sklearn.dummy import check_random_state
from sklearn.calibration import label_binarize

from mapie.conformity_scores.sets.naive import NaiveConformityScore
from mapie.conformity_scores.sets.utils import (
    check_include_last_label, check_proba_normalized
)
from mapie.estimator.classifier import EnsembleClassifier

from mapie._machine_precision import EPSILON
from mapie._typing import ArrayLike, NDArray
from mapie.utils import compute_quantiles


[docs]class APSConformityScore(NaiveConformityScore): """ Adaptive Prediction Sets (APS) method-based non-conformity score. It is based on the sum of the softmax outputs of the labels until the true label is reached, on the calibration set. See [1] for more details. References ---------- [1] Yaniv Romano, Matteo Sesia and Emmanuel J. Candès. "Classification with Valid and Adaptive Coverage." NeurIPS 202 (spotlight) 2020. Attributes ---------- classes: Optional[ArrayLike] Names of the classes. random_state: Optional[Union[int, RandomState]] Pseudo random number generator state. quantiles_: ArrayLike of shape (n_alpha) The quantiles estimated from ``get_sets`` method. """
[docs] def __init__(self) -> None: super().__init__()
[docs] def get_predictions( self, X: NDArray, alpha_np: NDArray, estimator: EnsembleClassifier, agg_scores: Optional[str] = "mean", **kwargs ) -> NDArray: """ Get predictions from an EnsembleClassifier. Parameters ----------- X: NDArray of shape (n_samples, n_features) Observed feature values. alpha_np: NDArray of shape (n_alpha,) NDArray of floats between ``0`` and ``1``, represents the uncertainty of the confidence interval. estimator: EnsembleClassifier Estimator that is fitted to predict y from X. agg_scores: Optional[str] Method to aggregate the scores from the base estimators. If "mean", the scores are averaged. If "crossval", the scores are obtained from cross-validation. By default ``"mean"``. Returns -------- NDArray Array of predictions. """ y_pred_proba = estimator.predict(X, agg_scores) y_pred_proba = check_proba_normalized(y_pred_proba, axis=1) if agg_scores != "crossval": y_pred_proba = np.repeat( y_pred_proba[:, :, np.newaxis], len(alpha_np), axis=2 ) return y_pred_proba
[docs] @staticmethod def get_true_label_cumsum_proba( y: ArrayLike, y_pred_proba: NDArray, classes: ArrayLike ) -> Tuple[NDArray, NDArray]: """ Compute the cumsumed probability of the true label. Parameters ---------- y: NDArray of shape (n_samples, ) Array with the labels. y_pred_proba: NDArray of shape (n_samples, n_classes) Predictions of the model. classes: NDArray of shape (n_classes, ) Array with the classes. Returns ------- Tuple[NDArray, NDArray] of shapes (n_samples, 1) and (n_samples, ). The first element is the cumsum probability of the true label. The second is the sorted position of the true label. """ y_true = label_binarize(y=y, classes=classes) index_sorted = np.fliplr(np.argsort(y_pred_proba, axis=1)) y_pred_sorted = np.take_along_axis(y_pred_proba, index_sorted, axis=1) y_true_sorted = np.take_along_axis(y_true, index_sorted, axis=1) y_pred_sorted_cumsum = np.cumsum(y_pred_sorted, axis=1) cutoff = np.argmax(y_true_sorted, axis=1) true_label_cumsum_proba = np.take_along_axis( y_pred_sorted_cumsum, cutoff.reshape(-1, 1), axis=1 ) cutoff += 1 return true_label_cumsum_proba, cutoff
[docs] def get_conformity_scores( self, y: NDArray, y_pred: NDArray, y_enc: Optional[NDArray] = None, **kwargs ) -> NDArray: """ Get the conformity score. Parameters ---------- y: NDArray of shape (n_samples,) Observed target values. y_pred: NDArray of shape (n_samples,) Predicted target values. y_enc: NDArray of shape (n_samples,) Target values as normalized encodings. Returns ------- NDArray of shape (n_samples,) Conformity scores. """ # Casting y_enc = cast(NDArray, y_enc) classes = cast(NDArray, self.classes) # Conformity scores conformity_scores, self.cutoff = ( self.get_true_label_cumsum_proba(y, y_pred, classes) ) y_proba_true = np.take_along_axis( y_pred, y_enc.reshape(-1, 1), axis=1 ) random_state = check_random_state(self.random_state) u = random_state.uniform(size=len(y_pred)).reshape(-1, 1) conformity_scores -= u * y_proba_true return conformity_scores
[docs] def get_conformity_score_quantiles( self, conformity_scores: NDArray, alpha_np: NDArray, estimator: EnsembleClassifier, agg_scores: Optional[str] = "mean", **kwargs ) -> NDArray: """ Get the quantiles of the conformity scores for each uncertainty level. Parameters ----------- conformity_scores: NDArray of shape (n_samples,) Conformity scores for each sample. alpha_np: NDArray of shape (n_alpha,) NDArray of floats between 0 and 1, representing the uncertainty of the confidence interval. estimator: EnsembleClassifier Estimator that is fitted to predict y from X. agg_scores: Optional[str] Method to aggregate the scores from the base estimators. If "mean", the scores are averaged. If "crossval", the scores are obtained from cross-validation. By default ``"mean"``. Returns -------- NDArray Array of quantiles with respect to alpha_np. """ n = len(conformity_scores) if estimator.cv == "prefit" or agg_scores in ["mean"]: quantiles_ = compute_quantiles(conformity_scores, alpha_np) else: quantiles_ = (n + 1) * (1 - alpha_np) return quantiles_
def _compute_v_parameter( self, y_proba_last_cumsumed: NDArray, threshold: NDArray, y_pred_proba_last: NDArray, prediction_sets: NDArray, **kwargs ) -> NDArray: """ Compute the V parameters from Romano+(2020). Parameters ----------- y_proba_last_cumsumed: NDArray of shape (n_samples, n_alpha) Cumulated score of the last included label. threshold: NDArray of shape (n_alpha,) or shape (n_samples_train,) Threshold to compare with y_proba_last_cumsum. y_pred_proba_last: NDArray of shape (n_samples, 1, n_alpha) Last included probability. predicition_sets: NDArray of shape (n_samples, n_alpha) Prediction sets. Returns -------- NDArray of shape (n_samples, n_alpha) Vs parameters. """ # compute V parameter from Romano+(2020) v_param = ( (y_proba_last_cumsumed - threshold.reshape(1, -1)) / y_pred_proba_last[:, 0, :] ) return v_param def _add_random_tie_breaking( self, prediction_sets: NDArray, y_pred_index_last: NDArray, y_pred_proba_cumsum: NDArray, y_pred_proba_last: NDArray, threshold: NDArray, **kwargs ) -> NDArray: """ Randomly remove last label from prediction set based on the comparison between a random number and the difference between cumulated score of the last included label and the quantile. Parameters ---------- prediction_sets: NDArray of shape (n_samples, n_classes, n_threshold) Prediction set for each observation and each alpha. y_pred_index_last: NDArray of shape (n_samples, threshold) Index of the last included label. y_pred_proba_cumsum: NDArray of shape (n_samples, n_classes) Cumsumed probability of the model in the original order. y_pred_proba_last: NDArray of shape (n_samples, 1, threshold) Last included probability. threshold: NDArray of shape (n_alpha,) or shape (n_samples_train,) Threshold to compare with y_proba_last_cumsum, can be either: - the quantiles associated with alpha values when ``cv`` == "prefit", ``cv`` == "split" or ``agg_scores`` is "mean" - the conformity score from training samples otherwise (i.e., when ``cv`` is CV splitter and ``agg_scores`` is "crossval") Returns ------- NDArray of shape (n_samples, n_classes, n_alpha) Updated version of prediction_sets with randomly removed labels. """ # get cumsumed probabilities up to last retained label y_proba_last_cumsumed = np.squeeze( np.take_along_axis( y_pred_proba_cumsum, y_pred_index_last, axis=1 ), axis=1 ) # get the V parameter from Romano+(2020) or Angelopoulos+(2020) v_param = self._compute_v_parameter( y_proba_last_cumsumed, threshold, y_pred_proba_last, prediction_sets ) # get random numbers for each observation and alpha value random_state = check_random_state(self.random_state) random_state = cast(np.random.RandomState, random_state) u_param = random_state.uniform(size=(prediction_sets.shape[0], 1)) # remove last label from comparison between uniform number and V label_to_keep = np.less_equal(v_param - u_param, EPSILON) np.put_along_axis( prediction_sets, y_pred_index_last, label_to_keep[:, np.newaxis, :], axis=1 ) return prediction_sets
[docs] def get_prediction_sets( self, y_pred_proba: NDArray, conformity_scores: NDArray, alpha_np: NDArray, estimator: EnsembleClassifier, agg_scores: Optional[str] = "mean", include_last_label: Optional[Union[bool, str]] = True, **kwargs ) -> NDArray: """ Generate prediction sets based on the probability predictions, the conformity scores and the uncertainty level. Parameters ----------- y_pred_proba: NDArray of shape (n_samples, n_classes) Target prediction. conformity_scores: NDArray of shape (n_samples,) Conformity scores for each sample. alpha_np: NDArray of shape (n_alpha,) NDArray of floats between 0 and 1, representing the uncertainty of the confidence interval. estimator: EnsembleClassifier Estimator that is fitted to predict y from X. agg_scores: Optional[str] Method to aggregate the scores from the base estimators. If "mean", the scores are averaged. If "crossval", the scores are obtained from cross-validation. By default ``"mean"``. include_last_label: Optional[Union[bool, str]] Whether or not to include last label in prediction sets. Choose among ``False``, ``True`` or ``"randomized"``. By default, ``True``. Returns -------- NDArray Array of quantiles with respect to alpha_np. """ include_last_label = check_include_last_label(include_last_label) # specify which thresholds will be used if estimator.cv == "prefit" or agg_scores in ["mean"]: thresholds = self.quantiles_ else: thresholds = conformity_scores.ravel() # sort labels by decreasing probability y_pred_proba_cumsum, y_pred_index_last, y_pred_proba_last = ( self._get_last_included_proba( y_pred_proba, thresholds, include_last_label, prediction_phase=True, **kwargs ) ) # get the prediction set by taking all probabilities above the last one if estimator.cv == "prefit" or agg_scores in ["mean"]: y_pred_included = np.greater_equal( y_pred_proba - y_pred_proba_last, -EPSILON ) else: y_pred_included = np.less_equal( y_pred_proba - y_pred_proba_last, EPSILON ) # remove last label randomly if include_last_label == "randomized": y_pred_included = self._add_random_tie_breaking( y_pred_included, y_pred_index_last, y_pred_proba_cumsum, y_pred_proba_last, thresholds, **kwargs ) if estimator.cv == "prefit" or agg_scores in ["mean"]: prediction_sets = y_pred_included else: # compute the number of times the inequality is verified prediction_sets_summed = y_pred_included.sum(axis=2) prediction_sets = np.less_equal( prediction_sets_summed[:, :, np.newaxis] - self.quantiles_[np.newaxis, np.newaxis, :], EPSILON ) return prediction_sets