Coverage Validity with MAPIE for a Regression Task

This example verifies that the coverage guarantees of conformal prediction (CP) hold in the MAPIE package when using the prefit/split CP methods.

This notebook is inspired by the notebook used for the episode “Uncertainty Quantification: Avoid these Missteps in Validating Your Conformal Claims!” (link to the [original notebook](https://github.com/mtorabirad/MLBoost)).

For more details on theoretical guarantees:

[1] Vovk, Vladimir, Alexander Gammerman, and Glenn Shafer. “Algorithmic Learning in a Random World.” Springer Nature, 2022.

[2] Angelopoulos, Anastasios N., and Stephen Bates. “Conformal prediction: A gentle introduction.” Foundations and Trends® in Machine Learning 16.4 (2023): 494-591.

import numpy as np
import matplotlib.pyplot as plt

from sklearn.tree import DecisionTreeRegressor
from sklearn.datasets import make_regression
from sklearn.model_selection import ShuffleSplit, train_test_split

from mapie.regression import MapieRegressor
from mapie.conformity_scores import AbsoluteConformityScore
from mapie.metrics import regression_coverage_score_v2

from joblib import Parallel, delayed

import warnings
warnings.filterwarnings("ignore")
warnings.simplefilter("ignore", RuntimeWarning)
warnings.simplefilter("ignore", UserWarning)

Section 1: Comparison with the split conformalizer method (light version)

We implement here a lighter version of split CP that computes the quantile with the small finite-sample correction given in [1]. We set up the fit/calibration/test routine so as to compute the average coverage over several simulations.
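As a reminder of the claim we are checking: for a calibration set of size $n$ and a confidence level $\delta$, split CP with the corrected quantile (the $\lceil(n+1)\delta\rceil$-th smallest non-conformity score) satisfies, following [1] and [2],

$$\delta \leq \mathbb{P}\left(Y_{test} \in \hat{C}(X_{test})\right) \leq \delta + \frac{1}{n+1},$$

and, when the scores are almost surely distinct, the coverage equals $\lceil(n+1)\delta\rceil/(n+1)$ exactly (the "exact coverage" plotted below).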

# Conformalizer Class
class StandardConformalizer():
    def __init__(
        self,
        pre_trained_model,
        non_conformity_func,
        delta
    ):
        # Initialize the conformalizer with required parameters
        self.estimator = pre_trained_model
        self.non_conformity_func = non_conformity_func
        self.delta = delta

    def _calculate_quantile(self, scores_calib):
        # Corrected quantile level: ceil((n+1)*delta)/n. With method='lower',
        # np.quantile then returns the ceil((n+1)*delta)-th smallest score.
        self.delta_cor = np.ceil(self.delta*(self.n_calib+1))/self.n_calib
        return np.quantile(scores_calib, self.delta_cor, method='lower')

    def _calibrate(self, X_calib, y_calib):
        # Calibrate the conformalizer to calculate q_hat
        y_calib_pred = self.estimator.predict(X_calib)
        scores_calib = self.non_conformity_func(y_calib_pred, y_calib)
        self.q_hat = self._calculate_quantile(scores_calib)

    def fit(self, X, y):
        # Fit the conformalizer to the data and calculate q_hat
        self.n_calib = X.shape[0]
        self._calibrate(X, y)
        return self

    def predict(self, X, alpha=None):
        # Return the point prediction and the prediction interval
        # (alpha is ignored here: the confidence level delta is fixed at init)
        y_pred = self.estimator.predict(X)
        y_lower, y_upper = y_pred - self.q_hat, y_pred + self.q_hat
        y_pis = np.expand_dims(np.stack([y_lower, y_upper], axis=1), axis=-1)
        return y_pred, y_pis


def non_conformity_func(y, y_hat):
    return np.abs(y - y_hat)
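
As a quick, illustrative sanity check of the corrected quantile (not part of the experiment itself): with four scores and delta=0.8, the corrected level is ceil(5 * 0.8) / 4 = 1.0, so the quantile is the 4th smallest score.

# Sanity check: the quantile is the ceil((n+1)*delta)-th smallest score
_check = StandardConformalizer(None, non_conformity_func, delta=0.8)
_check.n_calib = 4
print(_check._calculate_quantile(np.array([1., 3., 2., 4.])))  # 4.0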


def get_coverage_prefit(
    conformalizer, data, target, delta, n_calib, random_state=None
):
    """
    Calculate the fraction of test samples within the predicted intervals.

    This function splits the data into a training set and a test set. If the
    cross-validation strategy of the mapie regressor is a ShuffleSplit, it fits
    the regressor to the entire training set. Otherwise, it further splits the
    training set into a calibration set and a training set, and fits the
    regressor to the calibration set. It then predicts intervals for the test
    set and calculates the fraction of test samples within these intervals.

    Parameters:
    -----------
    conformalizer: object
        A mapie regressor object.

    data: array-like of shape (n_samples, n_features)
        The data to be split into a training set and a test set.

    target: array-like of shape (n_samples,)
        The target values for the data.

    delta: float
        The level of confidence for the predicted intervals.

    Returns:
    --------
    fraction_within_bounds: float
        The fraction of test samples within the predicted intervals.
    """
    # Split data step
    X_cal, X_test, y_cal, y_test = train_test_split(
        data, target, train_size=n_calib, random_state=random_state
    )
    # Calibration step
    conformalizer.fit(X_cal, y_cal)
    # Prediction step
    _, y_pis = conformalizer.predict(X_test, alpha=1-delta)
    # Coverage step
    coverage = regression_coverage_score_v2(y_test, y_pis)

    return coverage


def cumulative_average(arr):
    """
    Calculate the cumulative average of a list of numbers.

    This function computes the cumulative average of a list of numbers by
    calculating the cumulative sum of the numbers and dividing it by the
    index of the current number.

    Parameters:
    -----------
    arr: List[float]
        The input list of numbers.

    Returns:
    --------
    running_avg: List[float]
        The cumulative average of the input list.
    """
    cumsum = np.cumsum(arr)
    indices = np.arange(1, len(arr) + 1)
    cumulative_avg = cumsum / indices
    return cumulative_avg
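
For instance, on four coverage indicators:

print(cumulative_average([1.0, 0.0, 1.0, 1.0]))  # [1.0, 0.5, 0.667, 0.75]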

Experiment 1: Coverage validity for a given delta and n_calib

To begin, we use delta=0.8 and n_calib=6 and compare the coverage validity claim of the MAPIE class against the reference class.

# Parameters of the experiment
delta = 0.8
n_calib = 6

n_train = 1000
n_test = 1000
num_splits = 1000

# Generate toy data
n_all = n_train + n_calib + n_test
data, target = make_regression(n_all, random_state=1)

# Split the dataset into a training set and a calibration/test pool
X_train, X_cal_test, y_train, y_cal_test = train_test_split(
    data, target, train_size=n_train, random_state=1
)

# Create a regression model and fit it to the training data
model = DecisionTreeRegressor()
model.fit(X_train, y_train)

# Compute theoretical bounds and the exact coverage to expect
lower_bound = delta
upper_bound = delta + 1/(n_calib+1)
upper_bound_2 = delta + 1/(n_calib/2+1)  # looser bound for asymmetric scores
exact_cov = np.ceil((n_calib+1)*delta)/(n_calib+1)
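
With these parameters the quantities evaluate as follows (exact coverage $\lceil 7 \times 0.8 \rceil / 7 = 6/7$):

print(f"lower = {lower_bound:.3f}, exact = {exact_cov:.3f}, "
      f"upper = {upper_bound:.3f}")
# lower = 0.800, exact = 0.857, upper = 0.943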

# Run the experiment
empirical_coverages_ref = []
empirical_coverages_mapie = []

for i in range(num_splits):
    # Compute empirical coverage for each trial with StandardConformalizer
    conformalizer = StandardConformalizer(model, non_conformity_func, delta)
    coverage = get_coverage_prefit(
        conformalizer, X_cal_test, y_cal_test, delta, n_calib, random_state=i
    )
    empirical_coverages_ref.append(coverage)

    # Compute empirical coverage for each trial with MapieRegressor
    conformalizer = MapieRegressor(estimator=model, cv="prefit")
    coverage = get_coverage_prefit(
        conformalizer, X_cal_test, y_cal_test, delta, n_calib, random_state=i
    )
    empirical_coverages_mapie.append(coverage)

cumulative_averages_ref = cumulative_average(empirical_coverages_ref)
cumulative_averages_mapie = cumulative_average(empirical_coverages_mapie)

# Plot the results
fig, ax = plt.subplots()
plt.plot(cumulative_averages_ref, alpha=0.5, label='SplitCP', color='r')
plt.plot(cumulative_averages_mapie, alpha=0.5, label='MAPIE', color='g')

plt.hlines(exact_cov, 0, num_splits, color='r', ls='--', label='Exact Cov.')
plt.hlines(lower_bound, 0, num_splits, color='k', label='Lower Bound')
plt.hlines(upper_bound, 0, num_splits, color='b', label='Upper Bound')

plt.xlabel(r'Split Number')
plt.ylabel(r'$\overline{\mathbb{C}}$')
plt.title(r'$|D_{cal}| = $' + str(n_calib) + r' and $\delta = $' + str(delta))

plt.legend(loc="upper right", ncol=2)
plt.ylim(0.7, 1)
plt.tight_layout()
plt.show()
[Figure: running average coverage per split with theoretical bounds; title: $|D_{cal}| = 6$ and $\delta = 0.8$]

The two curves overlap, showing that both methods produce the same intervals. Their effective coverage stabilizes between the theoretical bounds, always above the target coverage, and converges towards the exact coverage expected from the theory.

Experiment 2: The same, but without fixing random_state

We reproduce the previous experiment without fixing random_state. The methods therefore follow different trajectories but still achieve the expected coverage.

# Run the experiment
empirical_coverages_ref = []
empirical_coverages_mapie = []

for i in range(num_splits):
    # Compute empirical coverage for each trial with StandardConformalizer
    conformalizer = StandardConformalizer(model, non_conformity_func, delta)
    coverage = get_coverage_prefit(
        conformalizer, X_cal_test, y_cal_test, delta, n_calib
    )
    empirical_coverages_ref.append(coverage)

    # Compute empirical coverage for each trial with MapieRegressor
    conformalizer = MapieRegressor(estimator=model, cv="prefit")
    coverage = get_coverage_prefit(
        conformalizer, X_cal_test, y_cal_test, delta, n_calib
    )
    empirical_coverages_mapie.append(coverage)

cumulative_averages_ref = cumulative_average(empirical_coverages_ref)
cumulative_averages_mapie = cumulative_average(empirical_coverages_mapie)

# Plot the results
fig, ax = plt.subplots()
plt.plot(cumulative_averages_ref, alpha=0.5, label='SplitCP', color='r')
plt.plot(cumulative_averages_mapie, alpha=0.5, label='MAPIE', color='g')

plt.hlines(exact_cov, 0, num_splits, color='r', ls='--', label='Exact Cov.')
plt.hlines(lower_bound, 0, num_splits, color='k', label='Lower Bound')
plt.hlines(upper_bound, 0, num_splits, color='b', label='Upper Bound')

plt.xlabel(r'Split Number')
plt.ylabel(r'$\overline{\mathbb{C}}$')
plt.title(r'$|D_{cal}| = $' + str(n_calib) + r' and $\delta = $' + str(delta))

plt.legend(loc="upper right", ncol=2)
plt.ylim(0.7, 1)
plt.tight_layout()
plt.show()
[Figure: running average coverage per split with theoretical bounds; title: $|D_{cal}| = 6$ and $\delta = 0.8$]

Section 2: Comparison with different MAPIE CP methods

We now reproduce the previous experiment with different methods from the MAPIE package: prefit, prefit with asymmetric non-conformity scores, and split (a sketch for adding the split entry is given after the STRATEGIES dictionary below).

def get_coverage_split(conformalizer, data, target, delta, random_state=None):
    """
    Calculate the fraction of test samples within the predicted intervals.

    This function splits the data into a training set and a test set. If the
    cross-validation strategy of the mapie regressor is a ShuffleSplit, it fits
    the regressor to the entire training set. Otherwise, it further splits the
    training set into a calibration set and a training set, and fits the
    regressor to the calibration set. It then predicts intervals for the test
    set and calculates the fraction of test samples within these intervals.

    Parameters:
    -----------
    conformalizer: object
        A mapie regressor object.

    data: array-like of shape (n_samples, n_features)
        The data to be split into a training set and a test set.

    target: array-like of shape (n_samples,)
        The target values for the data.

    delta: float
        The level of confidence for the predicted intervals.

    Returns:
    --------
    fraction_within_bounds: float
        The fraction of test samples within the predicted intervals.
    """
    # Split data step
    X_train_cal, X_test, y_train_cal, y_test = train_test_split(
        data, target, test_size=n_test, random_state=random_state
    )

    # Calibration step
    if isinstance(conformalizer, MapieRegressor) and \
            isinstance(conformalizer.cv, ShuffleSplit):
        conformalizer.fit(X_train_cal, y_train_cal)
    else:
        _, X_cal, _, y_cal = train_test_split(
            X_train_cal, y_train_cal,
            test_size=n_calib, random_state=random_state
        )
        conformalizer.fit(X_cal, y_cal)

    # Prediction step
    if isinstance(conformalizer, StandardConformalizer):
        _, y_pis = conformalizer.predict(X_test)
    else:
        _, y_pis = conformalizer.predict(X_test, alpha=1-delta)

    # Coverage step
    fraction_within_bounds = regression_coverage_score_v2(y_test, y_pis)

    return fraction_within_bounds


def run_get_coverage_split(model, params, n_calib, data, target, delta):
    # Run one trial; return NaN if the MAPIE strategy fails for these params
    if not params:
        ref_reg = StandardConformalizer(model, non_conformity_func, delta)
        return get_coverage_split(ref_reg, data, target, delta)
    try:
        mapie_reg = MapieRegressor(estimator=model, **params(n_calib))
        coverage = get_coverage_split(mapie_reg, data, target, delta)
    except Exception:
        coverage = np.nan
    return coverage


STRATEGIES = {
    "reference": None,
    "prefit": lambda n: dict(
        method="base",
        cv="prefit",
        conformity_score=AbsoluteConformityScore(sym=True)
    ),
    "prefit_asym": lambda n: dict(
        method="base",
        cv="prefit",
        conformity_score=AbsoluteConformityScore(sym=False)
    ),
}
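
The split method mentioned above does not appear in the dictionary. Since get_coverage_split already handles a MapieRegressor whose cv is a ShuffleSplit, a minimal sketch for such an entry would be the following (a hypothetical addition, not run here; uncomment to include it in the comparison):

# Hypothetical "split" entry: MAPIE performs the train/calibration split
# itself through a single ShuffleSplit drawing n calibration samples.
# STRATEGIES["split"] = lambda n: dict(
#     method="base",
#     cv=ShuffleSplit(n_splits=1, test_size=n),
# )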

Experiment 3: The same, but with different MAPIE CP methods

The methods follow different trajectories but all achieve the expected coverage. Since one strategy uses asymmetric scores, the theoretical limits are not exactly the same for every method; they should be computed differently in the asymmetric case, but this does not change our conclusion.

# Parameters of the experiment
delta = 0.8
n_calib = 12  # must be even for asymmetric non-conformity scores
num_splits = 1000

# Update the bounds and exact coverage (computed above for n_calib = 6)
upper_bound = delta + 1/(n_calib+1)
exact_cov = np.ceil((n_calib+1)*delta)/(n_calib+1)
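
As noted above, the bounds should be adapted for the asymmetric strategy. A rough version, mirroring upper_bound_2 from Experiment 1 (each tail being calibrated on about n_calib/2 scores; Experiment 4 below uses a slightly different variant):

upper_bound_asym = delta + 1/(n_calib/2+1)  # ~0.943 here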

# Run the experiment
cumulative_averages_dict = dict()

for method, params in STRATEGIES.items():
    run_params = model, params, n_calib, data, target, delta
    coverages_list = Parallel(n_jobs=-1)(
        delayed(run_get_coverage_split)(*run_params)
        for _ in range(num_splits)
    )
    cumulative_averages_dict[method] = cumulative_average(coverages_list)

# Plot the results
fig, ax = plt.subplots()
for method in STRATEGIES:
    plt.plot(cumulative_averages_dict[method], alpha=0.5, label=method)

plt.hlines(exact_cov, 0, num_splits, color='r', ls='--', label='Exact Cov.')
plt.hlines(lower_bound, 0, num_splits, color='k', label='Lower Bound')
plt.hlines(upper_bound, 0, num_splits, color='b', label='Upper Bound')

plt.xlabel(r'Split Number')
plt.ylabel(r'$\overline{\mathbb{C}}$')
plt.title(r'$|D_{cal}| = $' + str(n_calib) + r' and $\delta = $' + str(delta))

plt.legend(loc="upper right", ncol=2)
plt.ylim(0.7, 1)
plt.tight_layout()
plt.show()
[Figure: running average coverage per split for each strategy; title: $|D_{cal}| = 12$ and $\delta = 0.8$]

Experiment 4: Extensive experiments on different delta and n_calib

Here we extend the experiment to different calibration set sizes (the loops are also written to handle several target coverages, although we keep delta=0.8). We show the influence of the calibration size on the effective coverage: the exact coverage oscillates between the theoretical bounds as n_calib varies, while still converging towards the target coverage. All methods follow this trajectory and maintain coverage validity.

num_splits = 100

nc_min, nc_max = 10, 30
n_calib_array = np.arange(nc_min, nc_max+1, 2)
delta = 0.8
delta_array = [delta]

final_coverage_dict = {
    method: {delta: [] for delta in delta_array}
    for method in STRATEGIES
}

# Run experiment
for method, params in STRATEGIES.items():
    for n_calib in n_calib_array:
        run_params = model, params, n_calib, data, target, delta
        coverages_list = Parallel(n_jobs=-1)(
            delayed(run_get_coverage_split)(*run_params)
            for _ in range(num_splits)
        )
        coverages_list = np.array(coverages_list)
        final_coverage = cumulative_average(coverages_list)[-1]
        final_coverage_dict[method][delta].append(final_coverage)


# Theoretical bounds and exact coverage to expect
def lower_bound_fct(delta):
    return delta * np.ones_like(n_calib_array)


def upper_bound_fct(delta):
    # Same bound as in Experiment 1: delta + 1/(n_calib + 1)
    return delta + 1/(n_calib_array+1)


def upper_bound_asym_fct(delta):
    return delta + 1/(n_calib_array//2)


def exact_coverage_fct(delta):
    return np.ceil((n_calib_array+1)*delta)/(n_calib_array+1)


def exact_coverage_asym_fct(delta):
    new_n = n_calib_array//2-1
    return np.ceil((new_n+1)*delta)/(new_n+1)
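
To make the oscillation concrete, the exact coverage for a few calibration sizes at delta=0.8:

# ceil((n+1)*delta)/(n+1) for n = 10, 20, 30:
# 9/11 ~ 0.818, 17/21 ~ 0.810, 25/31 ~ 0.806
for n in (10, 20, 30):
    print(n, np.ceil((n+1)*delta)/(n+1))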


# Plot the results
n_strat = len(final_coverage_dict)
nrows, ncols = n_strat, 1

fig, ax = plt.subplots(nrows=nrows, ncols=ncols)

for i, method in enumerate(final_coverage_dict):
    # Compute the different bounds, target
    cov = final_coverage_dict[method][delta]
    ub = upper_bound_fct(delta)
    lb = lower_bound_fct(delta)
    exact_cov = exact_coverage_fct(delta)
    if 'asym' in method:
        ub = upper_bound_asym_fct(delta)
        exact_cov = exact_coverage_asym_fct(delta)
    ub = np.clip(ub, a_min=0, a_max=1)
    lb = np.clip(lb, a_min=0, a_max=1)

    # Plot the results
    ax[i].plot(n_calib_array, cov, alpha=0.5, label=method, color='g')
    ax[i].plot(n_calib_array, lb, color='k', label='Lower Bound')
    ax[i].plot(n_calib_array, ub, color='b', label='Upper Bound')
    ax[i].plot(n_calib_array, exact_cov, color='g', ls='--', label='Exact Cov')
    ax[i].hlines(delta, nc_min, nc_max, color='r', ls='--', label='Target Cov')

    ax[i].legend(loc="upper right", ncol=2)
    ax[i].set_ylim(np.min(lb) - 0.05, 1.0)
    ax[i].set_xlabel(r'$n_{calib}$')
    ax[i].set_ylabel(r'$\overline{\mathbb{C}}$')

fig.suptitle(r'$\delta = $' + str(delta))
plt.show()
[Figure: final average coverage vs $n_{calib}$ for each strategy, with bounds and target; title: $\delta = 0.8$]
