from __future__ import annotations
import warnings
from typing import Any, Iterable, List, Optional, Tuple, Union, cast
import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.model_selection import BaseCrossValidator, ShuffleSplit
from sklearn.preprocessing import LabelEncoder, label_binarize
from sklearn.utils import _safe_indexing, check_random_state
from sklearn.utils.multiclass import (check_classification_targets,
type_of_target)
from sklearn.utils.validation import (_check_y, _num_samples, check_is_fitted,
indexable)
from ._machine_precision import EPSILON
from ._typing import ArrayLike, NDArray
from .metrics import classification_mean_width_score
from .utils import (check_alpha, check_alpha_and_n_samples, check_cv,
check_estimator_classification, check_n_features_in,
check_n_jobs, check_null_weight, check_verbose,
compute_quantiles, fit_estimator, fix_number_of_classes)
[docs]class MapieClassifier(BaseEstimator, ClassifierMixin):
"""
Prediction sets for classification.
This class implements several conformal prediction strategies for
estimating prediction sets for classification. Instead of giving a
single predicted label, the idea is to give a set of predicted labels
(or prediction sets) which come with mathematically guaranteed coverages.
Parameters
----------
estimator: Optional[ClassifierMixin]
Any classifier with scikit-learn API
(i.e. with fit, predict, and predict_proba methods), by default None.
If ``None``, estimator defaults to a ``LogisticRegression`` instance.
method: Optional[str]
Method to choose for prediction interval estimates.
Choose among:
- ``"naive"``, sum of the probabilities until the 1-alpha thresold.
- ``"lac"`` (formerly called ``"score"``), Least Ambiguous set-valued
Classifier. It is based on the the scores
(i.e. 1 minus the softmax score of the true label)
on the calibration set. See [1] for more details.
- ``"aps"`` (formerly called "cumulated_score"), Adaptive Prediction
Sets method. It is based on the sum of the softmax outputs of the
labels until the true label is reached, on the calibration set.
See [2] for more details.
- ``"raps"``, Regularized Adaptive Prediction Sets method. It uses the
same technique as ``"aps"`` method but with a penalty term
to reduce the size of prediction sets. See [3] for more
details. For now, this method only works with ``"prefit"`` and
``"split"`` strategies.
- ``"top_k"``, based on the sorted index of the probability of the true
label in the softmax outputs, on the calibration set. In case two
probabilities are equal, both are taken, thus, the size of some
prediction sets may be different from the others. See [3] for
more details.
By default ``"lac"``.
cv: Optional[str]
The cross-validation strategy for computing scores.
It directly drives the distinction between jackknife and cv variants.
Choose among:
- ``None``, to use the default 5-fold cross-validation
- integer, to specify the number of folds.
If equal to -1, equivalent to
``sklearn.model_selection.LeaveOneOut()``.
- CV splitter: any ``sklearn.model_selection.BaseCrossValidator``
Main variants are:
- ``sklearn.model_selection.LeaveOneOut`` (jackknife),
- ``sklearn.model_selection.KFold`` (cross-validation)
- ``"split"``, does not involve cross-validation but a division
of the data into training and calibration subsets. The splitter
used is the following: ``sklearn.model_selection.ShuffleSplit``.
- ``"prefit"``, assumes that ``estimator`` has been fitted already.
All data provided in the ``fit`` method is then used
to calibrate the predictions through the score computation.
At prediction time, quantiles of these scores are used to estimate
prediction sets.
By default ``None``.
test_size: Optional[Union[int, float]]
If float, should be between 0.0 and 1.0 and represent the proportion
of the dataset to include in the test split. If int, represents the
absolute number of test samples. If None, it will be set to 0.1.
If cv is not ``"split"``, ``test_size`` is ignored.
By default ``None``.
n_jobs: Optional[int]
Number of jobs for parallel processing using joblib
via the "locky" backend.
At this moment, parallel processing is disabled.
If ``-1`` all CPUs are used.
If ``1`` is given, no parallel computing code is used at all,
which is useful for debugging.
For n_jobs below ``-1``, ``(n_cpus + 1 + n_jobs)`` are used.
None is a marker for `unset` that will be interpreted as ``n_jobs=1``
(sequential execution).
By default ``None``.
random_state: Optional[Union[int, RandomState]]
Pseudo random number generator state used for random uniform sampling
for evaluation quantiles and prediction sets.
Pass an int for reproducible output across multiple function calls.
By default ``None``.
verbose: int, optional
The verbosity level, used with joblib for multiprocessing.
At this moment, parallel processing is disabled.
The frequency of the messages increases with the verbosity level.
If it more than ``10``, all iterations are reported.
Above ``50``, the output is sent to stdout.
By default ``0``.
Attributes
----------
valid_methods: List[str]
List of all valid methods.
single_estimator_: sklearn.ClassifierMixin
Estimator fitted on the whole training set.
n_features_in_: int
Number of features passed to the fit method.
conformity_scores_: ArrayLike of shape (n_samples_train)
The conformity scores used to calibrate the prediction sets.
quantiles_: ArrayLike of shape (n_alpha)
The quantiles estimated from ``conformity_scores_`` and alpha values.
References
----------
[1] Mauricio Sadinle, Jing Lei, and Larry Wasserman.
"Least Ambiguous Set-Valued Classifiers with Bounded Error Levels.",
Journal of the American Statistical Association, 114, 2019.
[2] Yaniv Romano, Matteo Sesia and Emmanuel J. Candès.
"Classification with Valid and Adaptive Coverage."
NeurIPS 202 (spotlight) 2020.
[3] Anastasios Nikolas Angelopoulos, Stephen Bates, Michael Jordan
and Jitendra Malik.
"Uncertainty Sets for Image Classifiers using Conformal Prediction."
International Conference on Learning Representations 2021.
Examples
--------
>>> import numpy as np
>>> from sklearn.naive_bayes import GaussianNB
>>> from mapie.classification import MapieClassifier
>>> X_toy = np.arange(9).reshape(-1, 1)
>>> y_toy = np.stack([0, 0, 1, 0, 1, 2, 1, 2, 2])
>>> clf = GaussianNB().fit(X_toy, y_toy)
>>> mapie = MapieClassifier(estimator=clf, cv="prefit").fit(X_toy, y_toy)
>>> _, y_pi_mapie = mapie.predict(X_toy, alpha=0.2)
>>> print(y_pi_mapie[:, :, 0])
[[ True False False]
[ True False False]
[ True True False]
[ True True False]
[False True False]
[False True True]
[False True True]
[False False True]
[False False True]]
"""
raps_valid_cv_ = ["prefit", "split"]
valid_methods_ = [
"naive", "score", "lac", "cumulated_score", "aps", "top_k", "raps"
]
fit_attributes = [
"single_estimator_",
"estimators_",
"k_",
"n_features_in_",
"conformity_scores_",
"classes_",
"label_encoder_"
]
[docs] def __init__(
self,
estimator: Optional[ClassifierMixin] = None,
method: str = "lac",
cv: Optional[Union[int, str, BaseCrossValidator]] = None,
test_size: Optional[Union[int, float]] = None,
n_jobs: Optional[int] = None,
random_state: Optional[Union[int, np.random.RandomState]] = None,
verbose: int = 0
) -> None:
self.estimator = estimator
self.method = method
self.cv = cv
self.test_size = test_size
self.n_jobs = n_jobs
self.random_state = random_state
self.verbose = verbose
def _check_parameters(self) -> None:
"""
Perform several checks on input parameters.
Raises
------
ValueError
If parameters are not valid.
"""
if self.method not in self.valid_methods_:
raise ValueError(
"Invalid method. "
f"Allowed values are {self.valid_methods_}."
)
check_n_jobs(self.n_jobs)
check_verbose(self.verbose)
check_random_state(self.random_state)
self._check_depreciated()
self._check_raps()
def _check_depreciated(self) -> None:
"""
Check if the chosen method is outdated.
Raises
------
Warning
If method is ``"score"`` (not ``"lac"``) or
if method is ``"cumulated_score"`` (not ``"aps"``).
"""
if self.method == "score":
warnings.warn(
"WARNING: Deprecated method. "
+ "The method \"score\" is outdated. "
+ "Prefer to use \"lac\" instead to keep "
+ "the same behavior in the next release.",
DeprecationWarning
)
if self.method == "cumulated_score":
warnings.warn(
"WARNING: Deprecated method. "
+ "The method \"cumulated_score\" is outdated. "
+ "Prefer to use \"aps\" instead to keep "
+ "the same behavior in the next release.",
DeprecationWarning
)
def _check_target(self, y: ArrayLike) -> None:
"""
Check that if the type of target is binary,
(then the method have to be ``"lac"``), or multi-class.
Parameters
----------
y: NDArray of shape (n_samples,)
Training labels.
Raises
------
ValueError
If type of target is binary and method is not ``"lac"``
or ``"score"`` or if type of target is not multi-class.
"""
check_classification_targets(y)
if type_of_target(y) == "binary" and \
self.method not in ["score", "lac"]:
raise ValueError(
"Invalid method for binary target. "
"Your target is not of type multiclass and "
"allowed values for binary type are "
f"{['score', 'lac']}."
)
def _check_raps(self):
"""
Check that if the method used is ``"raps"``, then
the cross validation strategy is ``"prefit"``.
Raises
------
ValueError
If ``method`` is ``"raps"`` and ``cv`` is not ``"prefit"``.
"""
if (self.method == "raps") and (
(self.cv not in self.raps_valid_cv_)
or isinstance(self.cv, ShuffleSplit)
):
raise ValueError(
"RAPS method can only be used "
f"with cv in {self.raps_valid_cv_}."
)
def _check_include_last_label(
self,
include_last_label: Optional[Union[bool, str]]
) -> Optional[Union[bool, str]]:
"""
Check if ``include_last_label`` is a boolean or a string.
Else raise error.
Parameters
----------
include_last_label: Optional[Union[bool, str]]
Whether or not to include last label in
prediction sets for the ``"aps"`` method. Choose among:
- ``False``, does not include label whose cumulated score is just
over the quantile.
- ``True``, includes label whose cumulated score is just over the
quantile, unless there is only one label in the prediction set.
- ``"randomized"``, randomly includes label whose cumulated score
is just over the quantile based on the comparison of a uniform
number and the difference between the cumulated score of the last
label and the quantile.
Returns
-------
Optional[Union[bool, str]]
Raises
------
ValueError
"Invalid include_last_label argument. "
"Should be a boolean or 'randomized'."
"""
if (
(not isinstance(include_last_label, bool)) and
(not include_last_label == "randomized")
):
raise ValueError(
"Invalid include_last_label argument. "
"Should be a boolean or 'randomized'."
)
else:
return include_last_label
def _check_proba_normalized(
self,
y_pred_proba: ArrayLike,
axis: int = 1
) -> NDArray:
"""
Check if, for all the observations, the sum of
the probabilities is equal to one.
Parameters
----------
y_pred_proba: ArrayLike of shape
(n_samples, n_classes) or
(n_samples, n_train_samples, n_classes)
Softmax output of a model.
Returns
-------
ArrayLike of shape (n_samples, n_classes)
Softmax output of a model if the scores all sum
to one.
Raises
------
ValueError
If the sum of the scores is not equal to one.
"""
np.testing.assert_allclose(
np.sum(y_pred_proba, axis=axis),
1,
err_msg="The sum of the scores is not equal to one.",
rtol=1e-5
)
y_pred_proba = cast(NDArray, y_pred_proba).astype(np.float64)
return y_pred_proba
def _get_last_index_included(
self,
y_pred_proba_cumsum: NDArray,
threshold: NDArray,
include_last_label: Optional[Union[bool, str]]
) -> NDArray:
"""
Return the index of the last included sorted probability
depending if we included the first label over the quantile
or not.
Parameters
----------
y_pred_proba_cumsum: NDArray of shape (n_samples, n_classes)
Cumsumed probabilities in the original order.
threshold: NDArray of shape (n_alpha,) or shape (n_samples_train,)
Threshold to compare with y_proba_last_cumsum, can be either:
- the quantiles associated with alpha values when
``cv`` == "prefit", ``cv`` == "split"
or ``agg_scores`` is "mean"
- the conformity score from training samples otherwise
(i.e., when ``cv`` is a CV splitter and
``agg_scores`` is "crossval")
include_last_label: Union[bool, str]
Whether or not include the last label. If 'randomized',
the last label is included.
Returns
-------
NDArray of shape (n_samples, n_alpha)
Index of the last included sorted probability.
"""
if (
(include_last_label) or
(include_last_label == 'randomized')
):
y_pred_index_last = (
np.ma.masked_less(
y_pred_proba_cumsum
- threshold[np.newaxis, :],
-EPSILON
).argmin(axis=1)
)
elif (include_last_label is False):
max_threshold = np.maximum(
threshold[np.newaxis, :],
np.min(y_pred_proba_cumsum, axis=1)
)
y_pred_index_last = np.argmax(
np.ma.masked_greater(
y_pred_proba_cumsum - max_threshold[:, np.newaxis, :],
EPSILON
), axis=1
)
else:
raise ValueError(
"Invalid include_last_label argument. "
"Should be a boolean or 'randomized'."
)
return y_pred_index_last[:, np.newaxis, :]
def _add_random_tie_breaking(
self,
prediction_sets: NDArray,
y_pred_index_last: NDArray,
y_pred_proba_cumsum: NDArray,
y_pred_proba_last: NDArray,
threshold: NDArray,
lambda_star: Union[NDArray, float, None],
k_star: Union[NDArray, None]
) -> NDArray:
"""
Randomly remove last label from prediction set based on the
comparison between a random number and the difference between
cumulated score of the last included label and the quantile.
Parameters
----------
prediction_sets: NDArray of shape
(n_samples, n_classes, n_threshold)
Prediction set for each observation and each alpha.
y_pred_index_last: NDArray of shape (n_samples, threshold)
Index of the last included label.
y_pred_proba_cumsum: NDArray of shape (n_samples, n_classes)
Cumsumed probability of the model in the original order.
y_pred_proba_last: NDArray of shape (n_samples, 1, threshold)
Last included probability.
threshold: NDArray of shape (n_alpha,) or shape (n_samples_train,)
Threshold to compare with y_proba_last_cumsum, can be either:
- the quantiles associated with alpha values when
``cv`` == "prefit", ``cv`` == "split" or
``agg_scores`` is "mean"
- the conformity score from training samples otherwise
(i.e., when ``cv`` is a CV splitter and
``agg_scores`` is "crossval")
lambda_star: Union[NDArray, float, None] of shape (n_alpha):
Optimal value of the regulizer lambda.
k_star: Union[NDArray, None] of shape (n_alpha):
Optimal value of the regulizer k.
Returns
-------
NDArray of shape (n_samples, n_classes, n_alpha)
Updated version of prediction_sets with randomly removed
labels.
"""
# get cumsumed probabilities up to last retained label
y_proba_last_cumsumed = np.squeeze(
np.take_along_axis(
y_pred_proba_cumsum,
y_pred_index_last,
axis=1
), axis=1
)
if self.method in ["cumulated_score", "aps"]:
# compute V parameter from Romano+(2020)
vs = (
(y_proba_last_cumsumed - threshold.reshape(1, -1)) /
y_pred_proba_last[:, 0, :]
)
else:
# compute V parameter from Angelopoulos+(2020)
L = np.sum(prediction_sets, axis=1)
vs = (
(y_proba_last_cumsumed - threshold.reshape(1, -1)) /
(
y_pred_proba_last[:, 0, :] -
lambda_star * np.maximum(0, L - k_star) +
lambda_star * (L > k_star)
)
)
# get random numbers for each observation and alpha value
random_state = check_random_state(self.random_state)
us = random_state.uniform(size=(prediction_sets.shape[0], 1))
# remove last label from comparison between uniform number and V
vs_less_than_us = np.less_equal(vs - us, EPSILON)
np.put_along_axis(
prediction_sets,
y_pred_index_last,
vs_less_than_us[:, np.newaxis, :],
axis=1
)
return prediction_sets
def _predict_oof_model(
self,
estimator: ClassifierMixin,
X: ArrayLike,
) -> NDArray:
"""
Predict probabilities of a test set from a fitted estimator.
Parameters
----------
estimator: ClassifierMixin
Fitted estimator.
X: ArrayLike
Test set.
Returns
-------
ArrayLike
Predicted probabilities.
"""
y_pred_proba = estimator.predict_proba(X)
# we enforce y_pred_proba to contain all labels included in y
if len(estimator.classes_) != self.n_classes_:
y_pred_proba = fix_number_of_classes(
self.n_classes_,
estimator.classes_,
y_pred_proba
)
y_pred_proba = self._check_proba_normalized(y_pred_proba)
return y_pred_proba
def _fit_and_predict_oof_model(
self,
estimator: ClassifierMixin,
X: ArrayLike,
y: ArrayLike,
train_index: ArrayLike,
val_index: ArrayLike,
k: int,
sample_weight: Optional[ArrayLike] = None,
**fit_params,
) -> Tuple[ClassifierMixin, NDArray, NDArray, ArrayLike]:
"""
Fit a single out-of-fold model on a given training set and
perform predictions on a test set.
Parameters
----------
estimator: ClassifierMixin
Estimator to train.
X: ArrayLike of shape (n_samples, n_features)
Input data.
y: ArrayLike of shape (n_samples,)
Input labels.
train_index: np.ndarray of shape (n_samples_train)
Training data indices.
val_index: np.ndarray of shape (n_samples_val)
Validation data indices.
k: int
Split identification number.
sample_weight: Optional[ArrayLike] of shape (n_samples,)
Sample weights. If None, then samples are equally weighted.
By default None.
**fit_params : dict
Additional fit parameters.
Returns
-------
Tuple[ClassifierMixin, NDArray, NDArray, ArrayLike]
- [0]: ClassifierMixin, fitted estimator
- [1]: NDArray of shape (n_samples_val,),
Estimator predictions on the validation fold,
- [2]: NDArray of shape (n_samples_val,)
Identification number of the validation fold,
- [3]: ArrayLike of shape (n_samples_val,)
Validation data indices
"""
X_train = _safe_indexing(X, train_index)
y_train = _safe_indexing(y, train_index)
X_val = _safe_indexing(X, val_index)
y_val = _safe_indexing(y, val_index)
if sample_weight is None:
estimator = fit_estimator(
estimator, X_train, y_train, **fit_params
)
else:
sample_weight_train = _safe_indexing(sample_weight, train_index)
estimator = fit_estimator(
estimator, X_train, y_train, sample_weight_train, **fit_params
)
if _num_samples(X_val) > 0:
y_pred_proba = self._predict_oof_model(estimator, X_val)
else:
y_pred_proba = np.array([])
val_id = np.full_like(y_val, k, dtype=int)
return estimator, y_pred_proba, val_id, val_index
def _get_true_label_cumsum_proba(
self,
y: ArrayLike,
y_pred_proba: NDArray
) -> Tuple[NDArray, NDArray]:
"""
Compute the cumsumed probability of the true label.
Parameters
----------
y: NDArray of shape (n_samples, )
Array with the labels.
y_pred_proba: NDArray of shape (n_samples, n_classes)
Predictions of the model.
Returns
-------
Tuple[NDArray, NDArray] of shapes
(n_samples, 1) and (n_samples, ). The first element
is the cumsum probability of the true label. The second
is the sorted position of the true label.
"""
y_true = label_binarize(
y=y, classes=self.classes_
)
index_sorted = np.fliplr(np.argsort(y_pred_proba, axis=1))
y_pred_proba_sorted = np.take_along_axis(
y_pred_proba, index_sorted, axis=1
)
y_true_sorted = np.take_along_axis(y_true, index_sorted, axis=1)
y_pred_proba_sorted_cumsum = np.cumsum(y_pred_proba_sorted, axis=1)
cutoff = np.argmax(y_true_sorted, axis=1)
true_label_cumsum_proba = np.take_along_axis(
y_pred_proba_sorted_cumsum, cutoff.reshape(-1, 1), axis=1
)
return true_label_cumsum_proba, cutoff + 1
def _regularize_conformity_score(
self,
k_star: NDArray,
lambda_: Union[NDArray, float],
conf_score: NDArray,
cutoff: NDArray
) -> NDArray:
"""
Regularize the conformity scores with the ``"raps"``
method. See algo. 2 in [3].
Parameters
----------
k_star: NDArray of shape (n_alphas, )
Optimal value of k (called k_reg in the paper). There
is one value per alpha.
lambda_: Union[NDArray, float] of shape (n_alphas, )
One value of lambda for each alpha.
conf_score: NDArray of shape (n_samples, 1)
Conformity scores.
cutoff: NDArray of shape (n_samples, 1)
Position of the true label.
Returns
-------
NDArray of shape (n_samples, 1, n_alphas)
Regularized conformity scores. The regularization
depends on the value of alpha.
"""
conf_score = np.repeat(
conf_score[:, :, np.newaxis], len(k_star), axis=2
)
cutoff = np.repeat(
cutoff[:, np.newaxis], len(k_star), axis=1
)
conf_score += np.maximum(
np.expand_dims(
lambda_ * (cutoff - k_star),
axis=1
),
0
)
return conf_score
def _get_true_label_position(
self,
y_pred_proba: NDArray,
y: NDArray
) -> NDArray:
"""
Return the sorted position of the true label in the
prediction
Parameters
----------
y_pred_proba: NDArray of shape (n_samples, n_calsses)
Model prediction.
y: NDArray of shape (n_samples)
Labels.
Returns
-------
NDArray of shape (n_samples, 1)
Position of the true label in the prediction.
"""
index = np.argsort(
np.fliplr(np.argsort(y_pred_proba, axis=1))
)
position = np.take_along_axis(
index,
y.reshape(-1, 1),
axis=1
)
return position
def _get_last_included_proba(
self,
y_pred_proba: NDArray,
thresholds: NDArray,
include_last_label: Union[bool, str, None],
lambda_: Union[NDArray, float, None],
k_star: Union[NDArray, Any]
) -> Tuple[NDArray, NDArray, NDArray]:
"""
Function that returns the smallest score
among those which are included in the prediciton set.
Parameters
----------
y_pred_proba: NDArray of shape (n_samples, n_classes)
Predictions of the model.
thresholds: NDArray of shape (n_alphas, )
Quantiles that have been computed from the conformity
scores.
include_last_label: Union[bool, str, None]
Whether to include or not the label whose score
exceeds the threshold.
lambda_: Union[NDArray, float, None] of shape (n_alphas)
Values of lambda for the regularization.
k_star: Union[NDArray, Any]
Values of k for the regularization.
Returns
-------
Tuple[ArrayLike, ArrayLike, ArrayLike]
Arrays of shape (n_samples, n_classes, n_alphas),
(n_samples, 1, n_alphas) and (n_samples, 1, n_alphas).
They are respectively the cumsumed scores in the original
order which can be different according to the value of alpha
with the RAPS method, the index of the last included score
and the value of the last included score.
"""
index_sorted = np.flip(
np.argsort(y_pred_proba, axis=1), axis=1
)
# sort probabilities by decreasing order
y_pred_proba_sorted = np.take_along_axis(
y_pred_proba, index_sorted, axis=1
)
# get sorted cumulated score
y_pred_proba_sorted_cumsum = np.cumsum(
y_pred_proba_sorted, axis=1
)
if self.method == "raps":
y_pred_proba_sorted_cumsum += lambda_ * np.maximum(
0,
np.cumsum(
np.ones(y_pred_proba_sorted_cumsum.shape),
axis=1
) - k_star
)
# get cumulated score at their original position
y_pred_proba_cumsum = np.take_along_axis(
y_pred_proba_sorted_cumsum,
np.argsort(index_sorted, axis=1),
axis=1
)
# get index of the last included label
y_pred_index_last = self._get_last_index_included(
y_pred_proba_cumsum,
thresholds,
include_last_label
)
# get the probability of the last included label
y_pred_proba_last = np.take_along_axis(
y_pred_proba,
y_pred_index_last,
axis=1
)
zeros_scores_proba_last = (y_pred_proba_last <= EPSILON)
# If the last included proba is zero, change it to the
# smallest non-zero value to avoid inluding them in the
# prediction sets.
if np.sum(zeros_scores_proba_last) > 0:
y_pred_proba_last[zeros_scores_proba_last] = np.expand_dims(
np.min(
np.ma.masked_less(
y_pred_proba,
EPSILON
).filled(fill_value=np.inf),
axis=1
), axis=1
)[zeros_scores_proba_last]
return y_pred_proba_cumsum, y_pred_index_last, y_pred_proba_last
def _update_size_and_lambda(
self,
best_sizes: NDArray,
alpha_np: NDArray,
y_ps: NDArray,
lambda_: Union[NDArray, float],
lambda_star: NDArray
) -> Tuple[NDArray, NDArray]:
"""Update the values of the optimal lambda if the
average size of the prediction sets decreases with
this new value of lambda.
Parameters
----------
best_sizes: NDArray of shape (n_alphas, )
Smallest average prediciton set size before testing
for the new value of lambda_
alpha_np: NDArray of shape (n_alphas)
Level of confidences.
y_ps: NDArray of shape (n_samples, n_classes, n_alphas)
Prediction sets computed with the RAPS method and the
new value of lambda_
lambda_: NDArray of shape (n_alphas, )
New value of lambda_star to test
lambda_star: NDArray of shape (n_alphas, )
Actual optimal lambda values for each alpha.
Returns
-------
Tuple[NDArray, NDArray]
Arrays of shape (n_alphas, ) and (n_alpha, ) which
respectively represent the updated values of lambda_star
and the new best sizes.
"""
sizes = [
classification_mean_width_score(
y_ps[:, :, i]
) for i in range(len(alpha_np))
]
sizes_improve = (sizes < best_sizes - EPSILON)
lambda_star = (
sizes_improve * lambda_ + (1 - sizes_improve) * lambda_star
)
best_sizes = sizes_improve * sizes + (1 - sizes_improve) * best_sizes
return lambda_star, best_sizes
def _find_lambda_star(
self,
y_pred_proba_raps: NDArray,
alpha_np: NDArray,
include_last_label: Union[bool, str, None],
k_star: NDArray
) -> Union[NDArray, float]:
"""Find the optimal value of lambda for each alpha.
Parameters
----------
y_pred_proba_raps: NDArray of shape (n_samples, n_labels, n_alphas)
Predictions of the model repeated on the last axis as many times
as the number of alphas
alpha_np: NDArray of shape (n_alphas, )
Levels of confidences.
include_last_label: bool
Whether to include or not last label in
the prediction sets
k_star: NDArray of shape (n_alphas, )
Values of k for the regularization.
Returns
-------
ArrayLike of shape (n_alphas, )
Optimal values of lambda.
"""
lambda_star = np.zeros(len(alpha_np))
best_sizes = np.full(len(alpha_np), np.finfo(np.float64).max)
for lambda_ in [.001, .01, .1, .2, .5]: # values given in paper[3]
true_label_cumsum_proba, cutoff = (
self._get_true_label_cumsum_proba(
self.y_raps_no_enc,
y_pred_proba_raps[:, :, 0],
)
)
true_label_cumsum_proba_reg = self._regularize_conformity_score(
k_star,
lambda_,
true_label_cumsum_proba,
cutoff
)
quantiles_ = compute_quantiles(
true_label_cumsum_proba_reg,
alpha_np
)
_, _, y_pred_proba_last = self._get_last_included_proba(
y_pred_proba_raps,
quantiles_,
include_last_label,
lambda_,
k_star
)
y_ps = np.greater_equal(
y_pred_proba_raps - y_pred_proba_last, -EPSILON
)
lambda_star, best_sizes = self._update_size_and_lambda(
best_sizes, alpha_np, y_ps, lambda_, lambda_star
)
if len(lambda_star) == 1:
lambda_star = lambda_star[0]
return lambda_star
def _get_classes_info(
self, estimator: ClassifierMixin, y: NDArray
) -> Tuple[int, NDArray]:
"""
Compute the number of classes and the classes values
according to either the pre-trained model or to the
values in y.
Parameters
----------
estimator: ClassifierMixin
Estimator pre-fitted or not.
y: NDArray
Values to predict.
Returns
-------
Tuple[int, NDArray]
The number of unique classes and their unique
values.
Raises
------
ValueError
If `cv="prefit"` and that classes in `y` are not included into
`estimator.classes_`.
Warning
If number of calibration labels is lower than number of labels
for training (in prefit setting)
"""
n_unique_y_labels = len(np.unique(y))
if self.cv == "prefit":
classes = estimator.classes_
n_classes = len(np.unique(classes))
if not set(np.unique(y)).issubset(classes):
raise ValueError(
"Values in y do not matched values in estimator.classes_."
+ " Check that you are not adding any new label"
)
if n_classes > n_unique_y_labels:
warnings.warn(
"WARNING: your calibration dataset has less labels"
+ " than your training dataset (training"
+ f" has {n_classes} unique labels while"
+ f" calibration have {n_unique_y_labels} unique labels"
)
else:
n_classes = n_unique_y_labels
classes = np.unique(y)
return n_classes, classes
[docs] def fit(
self,
X: ArrayLike,
y: ArrayLike,
sample_weight: Optional[ArrayLike] = None,
size_raps: Optional[float] = .2,
groups: Optional[ArrayLike] = None,
**fit_params,
) -> MapieClassifier:
"""
Fit the base estimator or use the fitted base estimator.
Parameters
----------
X: ArrayLike of shape (n_samples, n_features)
Training data.
y: NDArray of shape (n_samples,)
Training labels.
sample_weight: Optional[ArrayLike] of shape (n_samples,)
Sample weights for fitting the out-of-fold models.
If None, then samples are equally weighted.
If some weights are null,
their corresponding observations are removed
before the fitting process and hence have no prediction sets.
By default ``None``.
size_raps: Optional[float]
Percentage of the data to be used for choosing lambda_star and
k_star for the RAPS method.
By default ``.2``.
groups: Optional[ArrayLike] of shape (n_samples,)
Group labels for the samples used while splitting the dataset into
train/test set.
By default ``None``.
**fit_params : dict
Additional fit parameters.
Returns
-------
MapieClassifier
The model itself.
"""
# Checks
self._check_parameters()
cv = check_cv(
self.cv, test_size=self.test_size, random_state=self.random_state
)
X, y = indexable(X, y)
y = _check_y(y)
sample_weight = cast(Optional[NDArray], sample_weight)
groups = cast(Optional[NDArray], groups)
sample_weight, X, y = check_null_weight(sample_weight, X, y)
y = cast(NDArray, y)
estimator = check_estimator_classification(
X,
y,
cv,
self.estimator
)
self.n_features_in_ = check_n_features_in(X, cv, estimator)
n_samples = _num_samples(y)
self.n_classes_, self.classes_ = self._get_classes_info(
estimator, y
)
enc = LabelEncoder()
enc.fit(self.classes_)
y_enc = enc.transform(y)
self.label_encoder_ = enc
self._check_target(y)
# Initialization
self.estimators_: List[ClassifierMixin] = []
self.k_ = np.empty_like(y, dtype=int)
self.n_samples_ = _num_samples(X)
if self.method == "raps":
raps_split = ShuffleSplit(
1, test_size=size_raps, random_state=self.random_state
)
train_raps_index, val_raps_index = next(raps_split.split(X))
X, self.X_raps, y_enc, self.y_raps = \
_safe_indexing(X, train_raps_index), \
_safe_indexing(X, val_raps_index), \
_safe_indexing(y_enc, train_raps_index), \
_safe_indexing(y_enc, val_raps_index)
self.y_raps_no_enc = self.label_encoder_.inverse_transform(
self.y_raps
)
y = self.label_encoder_.inverse_transform(y_enc)
y_enc = cast(NDArray, y_enc)
n_samples = _num_samples(y_enc)
if sample_weight is not None:
sample_weight = sample_weight[train_raps_index]
sample_weight = cast(NDArray, sample_weight)
if groups is not None:
groups = groups[train_raps_index]
groups = cast(NDArray, groups)
# Work
if cv == "prefit":
self.single_estimator_ = estimator
y_pred_proba = self.single_estimator_.predict_proba(X)
y_pred_proba = self._check_proba_normalized(y_pred_proba)
else:
cv = cast(BaseCrossValidator, cv)
self.single_estimator_ = fit_estimator(
clone(estimator), X, y, sample_weight, **fit_params
)
y_pred_proba = np.empty(
(n_samples, self.n_classes_),
dtype=float
)
outputs = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
delayed(self._fit_and_predict_oof_model)(
clone(estimator),
X,
y,
train_index,
val_index,
k,
sample_weight,
**fit_params,
)
for k, (train_index, val_index) in enumerate(
cv.split(X, y_enc, groups)
)
)
(
self.estimators_,
predictions_list,
val_ids_list,
val_indices_list
) = map(list, zip(*outputs))
predictions = np.concatenate(
cast(List[NDArray], predictions_list)
)
val_ids = np.concatenate(cast(List[NDArray], val_ids_list))
val_indices = np.concatenate(
cast(List[NDArray], val_indices_list)
)
self.k_[val_indices] = val_ids
y_pred_proba[val_indices] = predictions
if isinstance(cv, ShuffleSplit):
# Should delete values indices that
# are not used during calibration
self.k_ = self.k_[val_indices]
y_pred_proba = y_pred_proba[val_indices]
y_enc = y_enc[val_indices]
y = cast(NDArray, y)[val_indices]
# RAPS: compute y_pred and position on the RAPS validation dataset
if self.method == "raps":
self.y_pred_proba_raps = self.single_estimator_.predict_proba(
self.X_raps
)
self.position_raps = self._get_true_label_position(
self.y_pred_proba_raps,
self.y_raps
)
# Conformity scores
if self.method == "naive":
self.conformity_scores_ = np.empty(
y_pred_proba.shape,
dtype="float"
)
elif self.method in ["score", "lac"]:
self.conformity_scores_ = np.take_along_axis(
1 - y_pred_proba, y_enc.reshape(-1, 1), axis=1
)
elif self.method in ["cumulated_score", "aps", "raps"]:
self.conformity_scores_, self.cutoff = (
self._get_true_label_cumsum_proba(
y,
y_pred_proba
)
)
y_proba_true = np.take_along_axis(
y_pred_proba, y_enc.reshape(-1, 1), axis=1
)
random_state = check_random_state(self.random_state)
u = random_state.uniform(size=len(y_pred_proba)).reshape(-1, 1)
self.conformity_scores_ -= u * y_proba_true
elif self.method == "top_k":
# Here we reorder the labels by decreasing probability
# and get the position of each label from decreasing
# probability
self.conformity_scores_ = self._get_true_label_position(
y_pred_proba,
y_enc
)
else:
raise ValueError(
"Invalid method. "
f"Allowed values are {self.valid_methods_}."
)
if isinstance(cv, ShuffleSplit):
self.single_estimator_ = self.estimators_[0]
return self
[docs] def predict(
self,
X: ArrayLike,
alpha: Optional[Union[float, Iterable[float]]] = None,
include_last_label: Optional[Union[bool, str]] = True,
agg_scores: Optional[str] = "mean"
) -> Union[NDArray, Tuple[NDArray, NDArray]]:
"""
Prediction prediction sets on new samples based on target confidence
interval.
Prediction sets for a given ``alpha`` are deduced from:
- quantiles of softmax scores (``"lac"`` method)
- quantiles of cumulated scores (``"aps"`` method)
Parameters
----------
X: ArrayLike of shape (n_samples, n_features)
Test data.
alpha: Optional[Union[float, Iterable[float]]]
Can be a float, a list of floats, or a ``ArrayLike`` of floats.
Between 0 and 1, represent the uncertainty of the confidence
interval.
Lower ``alpha`` produce larger (more conservative) prediction
sets.
``alpha`` is the complement of the target coverage level.
By default ``None``.
include_last_label: Optional[Union[bool, str]]
Whether or not to include last label in
prediction sets for the "aps" method. Choose among:
- False, does not include label whose cumulated score is just over
the quantile.
- True, includes label whose cumulated score is just over the
quantile, unless there is only one label in the prediction set.
- "randomized", randomly includes label whose cumulated score is
just over the quantile based on the comparison of a uniform
number and the difference between the cumulated score of
the last label and the quantile.
When set to ``True`` or ``False``, it may result in a coverage
higher than ``1 - alpha`` (because contrary to the "randomized"
setting, none of this methods create empty prediction sets). See
[2] and [3] for more details.
By default ``True``.
agg_scores: Optional[str]
How to aggregate the scores output by the estimators on test data
if a cross-validation strategy is used. Choose among:
- "mean", take the mean of scores.
- "crossval", compare the scores between all training data and each
test point for each label to estimate if the label must be
included in the prediction set. Follows algorithm 2 of
Romano+2020.
By default "mean".
Returns
-------
Union[NDArray, Tuple[NDArray, NDArray]]
- NDArray of shape (n_samples,) if alpha is None.
- Tuple[NDArray, NDArray] of shapes
(n_samples,) and (n_samples, n_classes, n_alpha) if alpha is not None.
"""
if self.method == "top_k":
agg_scores = "mean"
# Checks
cv = check_cv(
self.cv, test_size=self.test_size, random_state=self.random_state
)
include_last_label = self._check_include_last_label(include_last_label)
alpha = cast(Optional[NDArray], check_alpha(alpha))
check_is_fitted(self, self.fit_attributes)
lambda_star, k_star = None, None
# Estimate prediction sets
y_pred = self.single_estimator_.predict(X)
if alpha is None:
return y_pred
n = len(self.conformity_scores_)
# Estimate of probabilities from estimator(s)
# In all cases: len(y_pred_proba.shape) == 3
# with (n_test, n_classes, n_alpha or n_train_samples)
alpha_np = cast(NDArray, alpha)
check_alpha_and_n_samples(alpha_np, n)
if cv == "prefit":
y_pred_proba = self.single_estimator_.predict_proba(X)
y_pred_proba = np.repeat(
y_pred_proba[:, :, np.newaxis], len(alpha_np), axis=2
)
else:
y_pred_proba_k = np.asarray(
Parallel(
n_jobs=self.n_jobs, verbose=self.verbose
)(
delayed(self._predict_oof_model)(estimator, X)
for estimator in self.estimators_
)
)
if agg_scores == "crossval":
y_pred_proba = np.moveaxis(y_pred_proba_k[self.k_], 0, 2)
elif agg_scores == "mean":
y_pred_proba = np.mean(y_pred_proba_k, axis=0)
y_pred_proba = np.repeat(
y_pred_proba[:, :, np.newaxis], len(alpha_np), axis=2
)
else:
raise ValueError("Invalid 'agg_scores' argument.")
# Check that sum of probas is equal to 1
y_pred_proba = self._check_proba_normalized(y_pred_proba, axis=1)
# Choice of the quantile
check_alpha_and_n_samples(alpha_np, n)
if self.method == "naive":
self.quantiles_ = 1 - alpha_np
else:
if (cv == "prefit") or (agg_scores in ["mean"]):
if self.method == "raps":
check_alpha_and_n_samples(alpha_np, len(self.X_raps))
k_star = compute_quantiles(
self.position_raps,
alpha_np
) + 1
y_pred_proba_raps = np.repeat(
self.y_pred_proba_raps[:, :, np.newaxis],
len(alpha_np),
axis=2
)
lambda_star = self._find_lambda_star(
y_pred_proba_raps,
alpha_np,
include_last_label,
k_star
)
self.conformity_scores_regularized = (
self._regularize_conformity_score(
k_star,
lambda_star,
self.conformity_scores_,
self.cutoff
)
)
self.quantiles_ = compute_quantiles(
self.conformity_scores_regularized,
alpha_np
)
else:
self.quantiles_ = compute_quantiles(
self.conformity_scores_,
alpha_np
)
else:
self.quantiles_ = (n + 1) * (1 - alpha_np)
# Build prediction sets
if self.method in ["score", "lac"]:
if (cv == "prefit") or (agg_scores == "mean"):
prediction_sets = np.greater_equal(
y_pred_proba - (1 - self.quantiles_), -EPSILON
)
else:
y_pred_included = np.less_equal(
(1 - y_pred_proba) - self.conformity_scores_.ravel(),
EPSILON
).sum(axis=2)
prediction_sets = np.stack(
[
np.greater_equal(
y_pred_included - _alpha * (n - 1), -EPSILON
)
for _alpha in alpha_np
], axis=2
)
elif self.method in ["naive", "cumulated_score", "aps", "raps"]:
# specify which thresholds will be used
if (cv == "prefit") or (agg_scores in ["mean"]):
thresholds = self.quantiles_
else:
thresholds = self.conformity_scores_.ravel()
# sort labels by decreasing probability
y_pred_proba_cumsum, y_pred_index_last, y_pred_proba_last = (
self._get_last_included_proba(
y_pred_proba,
thresholds,
include_last_label,
lambda_star,
k_star,
)
)
# get the prediction set by taking all probabilities
# above the last one
if (cv == "prefit") or (agg_scores in ["mean"]):
y_pred_included = np.greater_equal(
y_pred_proba - y_pred_proba_last, -EPSILON
)
else:
y_pred_included = np.less_equal(
y_pred_proba - y_pred_proba_last, EPSILON
)
# remove last label randomly
if include_last_label == "randomized":
y_pred_included = self._add_random_tie_breaking(
y_pred_included,
y_pred_index_last,
y_pred_proba_cumsum,
y_pred_proba_last,
thresholds,
lambda_star,
k_star
)
if (cv == "prefit") or (agg_scores in ["mean"]):
prediction_sets = y_pred_included
else:
# compute the number of times the inequality is verified
prediction_sets_summed = y_pred_included.sum(axis=2)
prediction_sets = np.less_equal(
prediction_sets_summed[:, :, np.newaxis]
- self.quantiles_[np.newaxis, np.newaxis, :],
EPSILON
)
elif self.method == "top_k":
y_pred_proba = y_pred_proba[:, :, 0]
index_sorted = np.fliplr(np.argsort(y_pred_proba, axis=1))
y_pred_index_last = np.stack(
[
index_sorted[:, quantile]
for quantile in self.quantiles_
], axis=1
)
y_pred_proba_last = np.stack(
[
np.take_along_axis(
y_pred_proba,
y_pred_index_last[:, iq].reshape(-1, 1),
axis=1
)
for iq, _ in enumerate(self.quantiles_)
], axis=2
)
prediction_sets = np.greater_equal(
y_pred_proba[:, :, np.newaxis]
- y_pred_proba_last,
-EPSILON
)
else:
raise ValueError(
"Invalid method. "
f"Allowed values are {self.valid_methods_}."
)
return y_pred, prediction_sets