from __future__ import annotations
import warnings
from typing import Iterable, List, Optional, Tuple, Union, cast
import numpy as np
from sklearn.base import RegressorMixin, clone
from sklearn.linear_model import QuantileRegressor
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.utils import check_random_state
from sklearn.utils.validation import (_check_y, _num_samples, check_is_fitted,
indexable)
from mapie._compatibility import np_quantile
from mapie._typing import ArrayLike, NDArray
from mapie.utils import (check_alpha_and_n_samples,
check_defined_variables_predict_cqr,
check_estimator_fit_predict, check_lower_upper_bounds,
check_null_weight, fit_estimator)
from .regression import MapieRegressor
class MapieQuantileRegressor(MapieRegressor):
"""
This class implements the conformalized quantile regression strategy
as proposed by Romano et al. (2019) to make conformal predictions.
    The only valid ``method`` is ``"quantile"`` and the only valid
    ``cv`` choices are ``"split"`` and ``"prefit"``.
Parameters
----------
estimator : Optional[RegressorMixin]
Any regressor with scikit-learn API
(i.e. with ``fit`` and ``predict`` methods).
If ``None``, estimator defaults to a ``QuantileRegressor`` instance.
By default ``"None"``.
method: str
Method to choose for prediction, in this case, the only valid method
is the ``"quantile"`` method.
By default ``"quantile"``.
    cv: Optional[str]
        The cross-validation strategy for computing conformity scores.
        Only a split strategy is available, since separate training and
        calibration sets are required. If ``None``, defaults to
        ``"split"``.
        By default ``None``.
alpha: float
Between ``0.0`` and ``1.0``, represents the risk level of the
confidence interval.
        Lower ``alpha`` produces larger (more conservative) prediction
        intervals.
``alpha`` is the complement of the target coverage level.
By default ``0.1``.
Attributes
----------
valid_methods_: List[str]
List of all valid methods.
    single_estimator_: RegressorMixin
        Estimator fitted for the median (quantile 0.5) on the training set.
estimators_: List[RegressorMixin]
- [0]: Estimator with quantile value of alpha/2
- [1]: Estimator with quantile value of 1 - alpha/2
- [2]: Estimator with quantile value of 0.5
    conformity_scores_: NDArray of shape (3, n_calib_samples)
        Conformity scores between ``y_calib`` and ``y_pred``.
        - [0]: for ``y_calib`` coming from prediction estimator
        with quantile of alpha/2
        - [1]: for ``y_calib`` coming from prediction estimator
        with quantile of 1 - alpha/2
        - [2]: maximum of those first two scores
n_calib_samples: int
Number of samples in the calibration dataset.
References
----------
Yaniv Romano, Evan Patterson and Emmanuel J. Candès.
"Conformalized Quantile Regression"
Advances in neural information processing systems 32 (2019).
Examples
--------
>>> import numpy as np
>>> from mapie.regression import MapieQuantileRegressor
>>> X_train = np.array([[0], [1], [2], [3], [4], [5]])
>>> y_train = np.array([5, 7.5, 9.5, 10.5, 12.5, 15])
>>> X_calib = np.array([[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]])
>>> y_calib = np.array([5, 7, 9, 4, 8, 1, 5, 7.5, 9.5, 12])
>>> mapie_reg = MapieQuantileRegressor().fit(
... X_train,
... y_train,
... X_calib=X_calib,
... y_calib=y_calib
... )
>>> y_pred, y_pis = mapie_reg.predict(X_train)
>>> print(y_pis[:, :, 0])
[[-8.16666667 19. ]
[-6.33333333 20.83333333]
[-4.5 22.66666667]
[-2.66666667 24.5 ]
[-0.83333333 26.33333333]
[ 1. 28.16666667]]
>>> print(y_pred)
[ 5. 7. 9. 11. 13. 15.]
"""
valid_methods_ = ["quantile"]
fit_attributes = [
"estimators_",
"conformity_scores_",
"n_calib_samples",
]
quantile_estimator_params = {
"GradientBoostingRegressor": {
"loss_name": "loss",
"alpha_name": "alpha"
},
"QuantileRegressor": {
"loss_name": "quantile",
"alpha_name": "quantile"
},
"HistGradientBoostingRegressor": {
"loss_name": "loss",
"alpha_name": "quantile"
},
"LGBMRegressor": {
"loss_name": "objective",
"alpha_name": "alpha"
},
}
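    # For example, a LightGBM model must be configured for the quantile
    # objective before being passed in (a sketch, assuming lightgbm is
    # installed):
    #
    #     from lightgbm import LGBMRegressor
    #     mapie_reg = MapieQuantileRegressor(
    #         estimator=LGBMRegressor(objective="quantile", alpha=0.5),
    #         alpha=0.1,
    #     )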
    def __init__(
self,
estimator: Optional[
Union[
RegressorMixin,
Pipeline,
List[Union[RegressorMixin, Pipeline]]
]
] = None,
method: str = "quantile",
cv: Optional[str] = None,
alpha: float = 0.1,
) -> None:
super().__init__(
estimator=estimator,
method=method,
)
self.cv = cv
self.alpha = alpha
def _check_alpha(
self,
alpha: float = 0.1,
) -> NDArray:
"""
        Perform several checks on the alpha value and convert it from
        a float to an NDArray of three quantile levels.
Parameters
----------
alpha : float
            Can only be a float value between ``0.0`` and ``1.0``.
            Represents the risk level of the confidence interval.
            Lower alpha produces larger (more conservative) prediction
            intervals. Alpha is the complement of the target coverage level.
            By default ``0.1``.
Returns
-------
        NDArray
            An NDArray of three alpha values:
            - [0]: alpha value of alpha/2
            - [1]: alpha value of 1 - alpha/2
- [2]: alpha value of 0.5
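            For example, ``alpha=0.2`` is converted to
            ``np.array([0.1, 0.9, 0.5])``.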
Raises
------
ValueError
If alpha is not a float.
ValueError
If the value of ``alpha`` is not between ``0.0`` and ``1.0``.
"""
if self.cv == "prefit":
            warnings.warn(
                "WARNING: The alpha that is set needs to be the same"
                " as the alphas of your prefitted models, in the"
                " following order [alpha/2, 1 - alpha/2, 0.5]."
            )
        if not isinstance(alpha, float):
            raise ValueError(
                "Invalid alpha. Allowed values are float."
            )
        if alpha <= 0 or alpha >= 1.0:
            raise ValueError(
                "Invalid alpha. Allowed values are between 0.0 and 1.0."
            )
        alpha_np = np.array([alpha / 2, 1 - alpha / 2, 0.5])
        return alpha_np
def _check_estimator(
self,
estimator: Optional[Union[RegressorMixin, Pipeline]] = None,
) -> Union[RegressorMixin, Pipeline]:
"""
Perform several checks on the estimator to check if it has
all the required specifications to be used with this methodology.
        The estimators that can be used in MapieQuantileRegressor need to
        have ``fit`` and ``predict`` methods, and must also support a
        quantile loss, and hence a settable quantile level. The
        ``quantile_estimator_params`` dictionary lists, for each known
        estimator, the parameter names controlling the loss and the
        quantile level.
Parameters
----------
        estimator : Optional[RegressorMixin]
            Estimator to check. By default ``None``.
Returns
-------
        RegressorMixin
            The estimator itself or a default ``QuantileRegressor`` instance
            with ``solver`` set to ``"highs-ds"``.
Raises
------
        ValueError
            If the estimator does not implement the ``fit`` and ``predict``
            methods.
        ValueError
            If the estimator is not among the known quantile regressors
            listed in ``quantile_estimator_params``. This dictionary needs
            to be updated with newly available estimators.
        ValueError
            If the estimator does not expose the expected loss parameter
            (``"loss_name"``) and therefore cannot be used.
        ValueError
            If the loss parameter is not set to a quantile loss, in which
            case the estimator cannot be used with
            ``MapieQuantileRegressor``.
        ValueError
            If the estimator has no parameter to set the quantile level
            and therefore cannot be used.
"""
if estimator is None:
return QuantileRegressor(
solver="highs-ds",
alpha=0.0,
)
check_estimator_fit_predict(estimator)
if isinstance(estimator, Pipeline):
self._check_estimator(estimator[-1])
return estimator
else:
name_estimator = estimator.__class__.__name__
if name_estimator == "QuantileRegressor":
return estimator
else:
if name_estimator in self.quantile_estimator_params:
param_estimator = estimator.get_params()
loss_name, alpha_name = self.quantile_estimator_params[
name_estimator
].values()
if loss_name in param_estimator:
if param_estimator[loss_name] != "quantile":
raise ValueError(
"You need to set the loss/objective argument"
+ " of your base model to ``quantile``."
)
else:
if alpha_name in param_estimator:
return estimator
else:
raise ValueError(
"The matching parameter `alpha_name` for"
" estimator does not exist. "
"Make sure you set it when initializing "
"your estimator."
)
else:
raise ValueError(
"The matching parameter `loss_name` for"
+ " estimator does not exist."
)
else:
                    raise ValueError(
                        "The base model does not seem to be accepted by"
                        " MapieQuantileRegressor.\n"
                        "Give a base model among:\n"
                        f"{list(self.quantile_estimator_params.keys())}\n"
                        "Or, add your base model to"
                        " ``quantile_estimator_params``."
                    )
def _check_cv(
self,
cv: Optional[str] = None
) -> str:
"""
Check if cv argument is ``None``, ``"split"`` or ``"prefit"``.
Parameters
----------
        cv : Optional[str]
            cv to check. By default ``None``.
Returns
-------
str
cv itself or a default ``"split"``.
Raises
------
        ValueError
            If ``cv`` is anything other than ``None``, ``"split"``
            or ``"prefit"``.
"""
if cv is None:
return "split"
if cv in ("split", "prefit"):
return cv
else:
            raise ValueError(
                "Invalid cv method, only valid methods are"
                " ``split`` and ``prefit``."
            )
def _check_calib_set(
self,
X: ArrayLike,
y: ArrayLike,
sample_weight: Optional[ArrayLike] = None,
X_calib: Optional[ArrayLike] = None,
y_calib: Optional[ArrayLike] = None,
calib_size: Optional[float] = 0.3,
        random_state: Optional[Union[int, np.random.RandomState]] = None,
shuffle: Optional[bool] = True,
stratify: Optional[ArrayLike] = None,
) -> Tuple[
ArrayLike, ArrayLike, ArrayLike, ArrayLike, Optional[ArrayLike]
]:
"""
        Check if a calibration set has already been defined; if not,
        create one using ``train_test_split``.
Parameters
----------
Same definition of parameters as for the ``fit`` method.
Returns
-------
        Tuple[ArrayLike, ArrayLike, ArrayLike, ArrayLike, Optional[ArrayLike]]
- [0]: ArrayLike of shape (n_samples_*(1-calib_size), n_features)
X_train
- [1]: ArrayLike of shape (n_samples_*(1-calib_size),)
y_train
- [2]: ArrayLike of shape (n_samples_*calib_size, n_features)
X_calib
- [3]: ArrayLike of shape (n_samples_*calib_size,)
y_calib
            - [4]: Optional[ArrayLike] of shape (n_samples_train,)
            sample_weight_train
"""
if X_calib is None or y_calib is None:
if sample_weight is None:
X_train, X_calib, y_train, y_calib = train_test_split(
X,
y,
test_size=calib_size,
random_state=random_state,
shuffle=shuffle,
stratify=stratify
)
sample_weight_train = sample_weight
else:
(
X_train,
X_calib,
y_train,
y_calib,
sample_weight_train,
_,
) = train_test_split(
X,
y,
sample_weight,
test_size=calib_size,
random_state=random_state,
shuffle=shuffle,
stratify=stratify
)
else:
X_train, y_train, sample_weight_train = X, y, sample_weight
X_train, X_calib = cast(ArrayLike, X_train), cast(ArrayLike, X_calib)
y_train, y_calib = cast(ArrayLike, y_train), cast(ArrayLike, y_calib)
sample_weight_train = cast(ArrayLike, sample_weight_train)
return X_train, y_train, X_calib, y_calib, sample_weight_train
def _check_prefit_params(
self,
estimator: List[Union[RegressorMixin, Pipeline]],
) -> None:
"""
Check the parameters set for the specific case of prefit
estimators.
Parameters
----------
estimator : List[Union[RegressorMixin, Pipeline]]
List of three prefitted estimators that should have
pre-defined quantile levels of alpha/2, 1 - alpha/2 and 0.5.
Raises
------
ValueError
If a non-iterable variable is provided for estimator.
ValueError
            If the number of estimators provided is not exactly three.
Warning
If the alpha is defined, warns the user that it must be set
accordingly with the prefit estimators.
"""
        if not isinstance(estimator, Iterable):
            raise ValueError(
                "Estimator for prefit must be an iterable object."
            )
        if len(estimator) != 3:
            raise ValueError(
                "You need to provide 3 different estimators, preset"
                " with alpha values in the following order"
                " [alpha/2, 1 - alpha/2, 0.5]."
            )
        for est in estimator:
            check_estimator_fit_predict(est)
            check_is_fitted(est)
    def fit(
self,
X: ArrayLike,
y: ArrayLike,
sample_weight: Optional[ArrayLike] = None,
groups: Optional[ArrayLike] = None,
X_calib: Optional[ArrayLike] = None,
y_calib: Optional[ArrayLike] = None,
calib_size: Optional[float] = 0.3,
random_state: Optional[Union[int, np.random.RandomState]] = None,
shuffle: Optional[bool] = True,
stratify: Optional[ArrayLike] = None,
**fit_params,
) -> MapieQuantileRegressor:
"""
Fit estimator and compute residuals used for prediction intervals.
All the clones of the estimators for different quantile values are
stored in order alpha/2, 1 - alpha/2, 0.5 in the ``estimators_``
        attribute. Residuals for the first two estimators, and their
        elementwise maximum, are stored in the ``conformity_scores_``
        attribute.
Parameters
----------
X: ArrayLike of shape (n_samples, n_features)
Training data.
y: ArrayLike of shape (n_samples,)
Training labels.
sample_weight: Optional[ArrayLike] of shape (n_samples,)
Sample weights for fitting the out-of-fold models.
If ``None``, then samples are equally weighted.
If some weights are null,
their corresponding observations are removed
before the fitting process and hence have no residuals.
If weights are non-uniform, residuals are still uniformly weighted.
            Note that the sample weights defined apply only to the training,
            not to the calibration procedure.
By default ``None``.
groups: Optional[ArrayLike] of shape (n_samples,)
Always ignored, exists for compatibility.
X_calib: Optional[ArrayLike] of shape (n_calib_samples, n_features)
Calibration data.
y_calib: Optional[ArrayLike] of shape (n_calib_samples,)
Calibration labels.
calib_size: Optional[float]
If ``X_calib`` and ``y_calib`` are not defined,
then the calibration dataset is created with the split
defined by ``calib_size``.
        random_state: Optional[Union[int, np.random.RandomState]]
            As in ``sklearn.model_selection.train_test_split``.
Controls the shuffling applied to the data before applying the
split.
Pass an int for reproducible output across multiple function calls.
See :term:`Glossary <random_state>`.
By default ``None``.
        shuffle: Optional[bool]
            As in ``sklearn.model_selection.train_test_split``.
Whether or not to shuffle the data before splitting.
If ``shuffle=False`` then stratify must be None.
By default ``True``.
        stratify: Optional[ArrayLike]
            As in ``sklearn.model_selection.train_test_split``.
If not ``None``, data is split in a stratified fashion, using this
as the class labels.
Read more in the :ref:`User Guide <stratification>`.
By default ``None``.
**fit_params : dict
Additional fit parameters.
Returns
-------
MapieQuantileRegressor
The model itself.
"""
self.cv = self._check_cv(cast(str, self.cv))
# Initialization
self.estimators_: List[RegressorMixin] = []
if self.cv == "prefit":
estimator = cast(List, self.estimator)
alpha = self._check_alpha(self.alpha)
self._check_prefit_params(estimator)
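            # In prefit mode, (X, y) are used directly as the
            # calibration set.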
X_calib, y_calib = indexable(X, y)
self.n_calib_samples = _num_samples(y_calib)
y_calib_preds = np.full(
shape=(3, self.n_calib_samples),
fill_value=np.nan
)
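            # Gather calibration predictions from each prefitted
            # quantile estimator.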
for i, est in enumerate(estimator):
self.estimators_.append(est)
y_calib_preds[i] = est.predict(X_calib).ravel()
self.single_estimator_ = self.estimators_[2]
else:
# Checks
self._check_parameters()
checked_estimator = self._check_estimator(self.estimator)
alpha = self._check_alpha(self.alpha)
X, y = indexable(X, y)
random_state = check_random_state(random_state)
results = self._check_calib_set(
X,
y,
sample_weight,
X_calib,
y_calib,
calib_size,
random_state,
shuffle,
stratify,
)
X_train, y_train, X_calib, y_calib, sample_weight_train = results
X_train, y_train = indexable(X_train, y_train)
X_calib, y_calib = indexable(X_calib, y_calib)
y_train, y_calib = _check_y(y_train), _check_y(y_calib)
self.n_calib_samples = _num_samples(y_calib)
check_alpha_and_n_samples(self.alpha, self.n_calib_samples)
sample_weight_train, X_train, y_train = check_null_weight(
sample_weight_train,
X_train,
y_train
)
y_train = cast(NDArray, y_train)
y_calib_preds = np.full(
shape=(3, self.n_calib_samples),
fill_value=np.nan
)
if isinstance(checked_estimator, Pipeline):
estimator = checked_estimator[-1]
else:
estimator = checked_estimator
name_estimator = estimator.__class__.__name__
alpha_name = self.quantile_estimator_params[
name_estimator
]["alpha_name"]
for i, alpha_ in enumerate(alpha):
cloned_estimator_ = clone(checked_estimator)
params = {alpha_name: alpha_}
if isinstance(checked_estimator, Pipeline):
cloned_estimator_[-1].set_params(**params)
else:
cloned_estimator_.set_params(**params)
self.estimators_.append(fit_estimator(
cloned_estimator_,
X_train,
y_train,
sample_weight_train,
**fit_params,
)
)
y_calib_preds[i] = self.estimators_[-1].predict(X_calib)
self.single_estimator_ = self.estimators_[2]
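        # Conformity scores as in Romano et al. (2019):
        # E_low = q_low(X) - y, E_up = y - q_up(X), and their elementwise
        # maximum, which is used when intervals are symmetric.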
self.conformity_scores_ = np.full(
shape=(3, self.n_calib_samples),
fill_value=np.nan
)
self.conformity_scores_[0] = y_calib_preds[0] - y_calib
self.conformity_scores_[1] = y_calib - y_calib_preds[1]
self.conformity_scores_[2] = np.max(
[
self.conformity_scores_[0],
self.conformity_scores_[1]
], axis=0
)
return self
    def predict(
self,
X: ArrayLike,
ensemble: bool = False,
alpha: Optional[Union[float, Iterable[float]]] = None,
optimize_beta: bool = False,
allow_infinite_bounds: bool = False,
symmetry: Optional[bool] = True,
**predict_params,
) -> Union[NDArray, Tuple[NDArray, NDArray]]:
"""
Predict target on new samples with confidence intervals.
        Residuals from the calibration set and predictions from the fitted
        quantile estimators are central to the computation.
        Prediction intervals for a given ``alpha`` are deduced from the
        quantile regressions at the alpha values alpha/2 and 1 - alpha/2,
        shifted by a constant based upon the calibration residuals.
Parameters
----------
X: ArrayLike of shape (n_samples, n_features)
Test data.
        ensemble: bool
            Ignored by this method; it exists for compatibility with
            ``MapieRegressor.predict`` and has no effect here.
        alpha: Optional[Union[float, Iterable[float]]]
            Has no effect here: for ``MapieQuantileRegressor``, alpha has
            to be defined directly in the initial arguments of the class.
        symmetry: Optional[bool]
            Whether to compute a single quantile from the maximum of the
            lower and upper conformity scores (symmetric intervals), or a
            separate quantile for each set of scores.
            By default ``True``.
        **predict_params : dict
            Additional predict parameters.
Returns
-------
Union[NDArray, Tuple[NDArray, NDArray]]
- NDArray of shape (n_samples,) if ``alpha`` is ``None``.
- Tuple[NDArray, NDArray] of shapes (n_samples,) and
(n_samples, 2, n_alpha) if ``alpha`` is not ``None``.
- [:, 0, :]: Lower bound of the prediction interval.
- [:, 1, :]: Upper bound of the prediction interval.
"""
check_is_fitted(self, self.fit_attributes)
check_defined_variables_predict_cqr(ensemble, alpha)
        alpha = self.alpha if symmetry else self.alpha / 2
check_alpha_and_n_samples(alpha, self.n_calib_samples)
n = self.n_calib_samples
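        # Finite-sample-corrected quantile level of the conformity scores,
        # (1 - alpha) * (1 + 1/n), as in Romano et al. (2019).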
        q = (1 - alpha) * (1 + 1 / n)
y_preds = np.full(
shape=(3, _num_samples(X)),
fill_value=np.nan,
dtype=float,
)
for i, est in enumerate(self.estimators_):
y_preds[i] = est.predict(X, **predict_params)
check_lower_upper_bounds(y_preds[0], y_preds[1], y_preds[2])
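        # With symmetry, a single margin computed from the maximum scores
        # widens both bounds equally; otherwise each bound gets its own
        # margin from its own scores (each at level alpha/2).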
if symmetry:
quantile = np.full(
2,
np_quantile(
self.conformity_scores_[2], q, method="higher"
)
)
else:
quantile = np.array(
[
np_quantile(
self.conformity_scores_[0], q, method="higher"
),
np_quantile(
self.conformity_scores_[1], q, method="higher"
)
]
)
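        # Conformalize: shift the lower and upper quantile predictions
        # outward by the calibrated margins.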
y_pred_low = y_preds[0][:, np.newaxis] - quantile[0]
y_pred_up = y_preds[1][:, np.newaxis] + quantile[1]
check_lower_upper_bounds(y_pred_low, y_pred_up, y_preds[2])
return y_preds[2], np.stack([y_pred_low, y_pred_up], axis=1)
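

# A minimal sketch of the ``cv="prefit"`` workflow, with hypothetical
# ``X_train``, ``y_train``, ``X_calib``, ``y_calib`` and ``X_test``
# arrays; the three prefitted estimators must target the quantile
# levels [alpha/2, 1 - alpha/2, 0.5], in that order:
#
#     from sklearn.ensemble import GradientBoostingRegressor
#     estimators = [
#         GradientBoostingRegressor(loss="quantile", alpha=q).fit(
#             X_train, y_train
#         )
#         for q in (0.05, 0.95, 0.5)
#     ]
#     mapie_reg = MapieQuantileRegressor(
#         estimator=estimators, cv="prefit", alpha=0.1
#     ).fit(X_calib, y_calib)
#     y_pred, y_pis = mapie_reg.predict(X_test)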