Source code for mapie.risk_control.risks

from __future__ import annotations

from typing import Callable, Tuple, cast

import numpy as np
from numpy.typing import NDArray
from sklearn.utils.validation import column_or_1d


def compute_risk_recall(lambdas: NDArray, y_pred_proba: NDArray, y: NDArray) -> NDArray:
    """
    In `PrecisionRecallController` when `metric_control=recall`,
    compute the recall per observation for each different
    thresholds lambdas.

    Parameters
    ----------
    y_pred_proba: NDArray of shape (n_samples, n_labels, 1)
        Predicted probabilities for each label and each observation.

    y: NDArray of shape (n_samples, n_labels)
        True labels.

    lambdas: NDArray of shape (n_lambdas, )
        Threshold that permit to compute recall.

    Returns
    -------
    NDArray of shape (n_samples, n_labels, n_lambdas)
        Risks for each observation and each value of lambda.
    """
    if y_pred_proba.ndim != 3:
        raise ValueError(
            "y_pred_proba should be a 3d array, got an array of shape "
            "{} instead.".format(y_pred_proba.shape)
        )
    if y.ndim != 2:
        raise ValueError(
            "y should be a 2d array, got an array of shape {} instead.".format(
                y_pred_proba.shape
            )
        )
    if not np.array_equal(y_pred_proba.shape[:-1], y.shape):
        raise ValueError("y and y_pred_proba could not be broadcast.")
    lambdas = cast(NDArray, column_or_1d(lambdas))

    n_lambdas = len(lambdas)
    y_pred_proba_repeat = np.repeat(y_pred_proba, n_lambdas, axis=2)
    y_pred_th = (y_pred_proba_repeat > lambdas).astype(int)

    y_repeat = np.repeat(y[..., np.newaxis], n_lambdas, axis=2)
    risks = 1 - (_true_positive(y_pred_th, y_repeat) / y.sum(axis=1)[:, np.newaxis])
    return risks


def compute_risk_precision(
    lambdas: NDArray, y_pred_proba: NDArray, y: NDArray
) -> NDArray:
    """
    In `PrecisionRecallController` when `metric_control=precision`,
    compute the precision per observation for each different
    thresholds lambdas.

    Parameters
    ----------
    y_pred_proba: NDArray of shape (n_samples, n_labels, 1)
        Predicted probabilities for each label and each observation.

    y: NDArray of shape (n_samples, n_labels)
        True labels.

    lambdas: NDArray of shape (n_lambdas, )
        Threshold that permit to compute precision score.

    Returns
    -------
    NDArray of shape (n_samples, n_labels, n_lambdas)
        Risks for each observation and each value of lambda.
    """
    if y_pred_proba.ndim != 3:
        raise ValueError(
            "y_pred_proba should be a 3d array, got an array of shape "
            "{} instead.".format(y_pred_proba.shape)
        )
    if y.ndim != 2:
        raise ValueError(
            "y should be a 2d array, got an array of shape {} instead.".format(
                y_pred_proba.shape
            )
        )
    if not np.array_equal(y_pred_proba.shape[:-1], y.shape):
        raise ValueError("y and y_pred_proba could not be broadcast.")
    lambdas = cast(NDArray, column_or_1d(lambdas))

    n_lambdas = len(lambdas)
    y_pred_proba_repeat = np.repeat(y_pred_proba, n_lambdas, axis=2)
    y_pred_th = (y_pred_proba_repeat > lambdas).astype(int)

    y_repeat = np.repeat(y[..., np.newaxis], n_lambdas, axis=2)
    with np.errstate(divide="ignore", invalid="ignore"):
        risks = 1 - _true_positive(y_pred_th, y_repeat) / y_pred_th.sum(axis=1)
    risks[np.isnan(risks)] = 1  # nan value indicate high risks.

    return risks


def _true_positive(y_pred_th: NDArray, y_repeat: NDArray) -> NDArray:
    """
    Compute the number of true positive.

    Parameters
    ----------
    y_pred_proba : NDArray of shape (n_samples, n_labels, 1)
        Predicted probabilities for each label and each observation.

    y: NDArray of shape (n_samples, n_labels)
        True labels.

    Returns
    -------
    tp: float
        The number of true positive.
    """
    tp = (y_pred_th * y_repeat).sum(axis=1)
    return tp


[docs] class BinaryClassificationRisk: """ Define a risk (or a performance metric) to be used with the BinaryClassificationController. Predefined instances are implemented, see :data:`mapie.risk_control.precision`, :data:`mapie.risk_control.recall`, :data:`mapie.risk_control.accuracy` and :data:`mapie.risk_control.false_positive_rate`. Here, a binary classification risk (or performance) is defined by an occurrence and a condition. Let's take the example of precision. Precision is the sum of true positives over the total number of predicted positives. In other words, precision is the average of correct predictions (occurrence) given that those predictions are positive (condition). Programmatically, ``precision = (sum(y_pred == y_true) if y_pred == 1)/sum(y_pred == 1)``. Because precision is a performance metric rather than a risk, `higher_is_better` must be set to `True`. See the implementation of `precision` in mapie.risk_control. Note: any risk or performance metric that can be defined as ``sum(occurrence if condition) / sum(condition)`` can be theoretically controlled with the BinaryClassificationController, thanks to the LearnThenTest framework [1] and the binary Hoeffding-Bentkus p-values implemented in MAPIE. Note: by definition, the value of the risk (or performance metric) here is always between 0 and 1. Parameters ---------- risk_occurrence : Callable[[int, int], bool] A function defining the occurrence of the risk for a given sample. Must take y_true and y_pred as input and return a boolean. risk_condition : Callable[[int, int], bool] A function defining the condition of the risk for a given sample, Must take y_true and y_pred as input and return a boolean. higher_is_better : bool Whether this BinaryClassificationRisk instance is a risk (higher_is_better=False) or a performance metric (higher_is_better=True). Attributes ---------- higher_is_better : bool See params. References ---------- [1] Angelopoulos, Anastasios N., Stephen, Bates, Emmanuel J. Candès, et al. "Learn Then Test: Calibrating Predictive Algorithms to Achieve Risk Control." (2022) """
[docs] def __init__( self, risk_occurrence: Callable[[int, int], bool], risk_condition: Callable[[int, int], bool], higher_is_better: bool, ): self._risk_occurrence = risk_occurrence self._risk_condition = risk_condition self.higher_is_better = higher_is_better
[docs] def get_value_and_effective_sample_size( self, y_true: NDArray, y_pred: NDArray, ) -> Tuple[float, int]: """ Computes the value of a risk given an array of ground truth labels and the corresponding predictions. Also returns the number of samples used to compute that value. That number can be different from the total number of samples. For example, in the case of precision, only the samples with positive predictions are used. In the case of a performance metric, this function returns 1 - perf_value. Parameters ---------- y_true : NDArray NDArray of ground truth labels, of shape (n_samples,), with values in {0, 1} y_pred : NDArray NDArray of predictions, of shape (n_samples,), with values in {0, 1} Returns ------- Tuple[float, int] A tuple containing the value of the risk between 0 and 1, and the number of effective samples used to compute that value (between 1 and n_samples). In the case of a performance metric, this function returns 1 - perf_value. If the risk is not defined (condition never met), the value is set to 1, and the number of effective samples is set to -1. """ risk_occurrences = np.array( [ self._risk_occurrence(y_true_i, y_pred_i) for y_true_i, y_pred_i in zip(y_true, y_pred) ] ) risk_conditions = np.array( [ self._risk_condition(y_true_i, y_pred_i) for y_true_i, y_pred_i in zip(y_true, y_pred) ] ) effective_sample_size = len(y_true) - np.sum(~risk_conditions) # Casting needed for MyPy with Python 3.9 effective_sample_size_int = cast(int, effective_sample_size) if effective_sample_size_int != 0: risk_sum: int = np.sum(risk_occurrences[risk_conditions]) risk_value = risk_sum / effective_sample_size_int else: # In this case, the corresponding lambda shouldn't be considered valid. # In the current LTT implementation, providing n_obs=-1 will result # in an infinite p_value, effectively invaliding the lambda risk_value, effective_sample_size_int = 1, -1 if self.higher_is_better: risk_value = 1 - risk_value return risk_value, effective_sample_size_int
precision = BinaryClassificationRisk( risk_occurrence=lambda y_true, y_pred: y_pred == y_true, risk_condition=lambda y_true, y_pred: y_pred == 1, higher_is_better=True, ) accuracy = BinaryClassificationRisk( risk_occurrence=lambda y_true, y_pred: y_pred == y_true, risk_condition=lambda y_true, y_pred: True, higher_is_better=True, ) recall = BinaryClassificationRisk( risk_occurrence=lambda y_true, y_pred: y_pred == y_true, risk_condition=lambda y_true, y_pred: y_true == 1, higher_is_better=True, ) false_positive_rate = BinaryClassificationRisk( risk_occurrence=lambda y_true, y_pred: y_pred == 1, risk_condition=lambda y_true, y_pred: y_true == 0, higher_is_better=False, )