Source code for mapie.regression.time_series_regression

from __future__ import annotations

from typing import Iterable, Optional, Tuple, Union, cast

import numpy as np
from sklearn.base import RegressorMixin
from sklearn.model_selection import BaseCrossValidator
from sklearn.utils.validation import check_is_fitted

from mapie._compatibility import np_nanquantile
from mapie._typing import ArrayLike, NDArray
from mapie.aggregation_functions import aggregate_all
from .regression import MapieRegressor
from mapie.utils import check_alpha, check_alpha_and_n_samples


[docs]class MapieTimeSeriesRegressor(MapieRegressor): """ Prediction intervals with out-of-fold residuals for time series. This class implements the EnbPI strategy for estimating prediction intervals on single-output time series. The only valid ``method`` is ``"enbpi"``. Actually, EnbPI only corresponds to ``MapieTimeSeriesRegressor`` if the ``cv`` argument is of type ``BlockBootstrap``. References ---------- Chen Xu, and Yao Xie. "Conformal prediction for dynamic time-series." https://arxiv.org/abs/2010.09107 """ cv_need_agg_function_ = MapieRegressor.cv_need_agg_function_ \ + ["BlockBootstrap"] valid_methods_ = ["enbpi"]
[docs] def __init__( self, estimator: Optional[RegressorMixin] = None, method: str = "enbpi", cv: Optional[Union[int, str, BaseCrossValidator]] = None, n_jobs: Optional[int] = None, agg_function: Optional[str] = "mean", verbose: int = 0, random_state: Optional[Union[int, np.random.RandomState]] = None, ) -> None: super().__init__( estimator=estimator, method=method, cv=cv, n_jobs=n_jobs, agg_function=agg_function, verbose=verbose, random_state=random_state )
def _relative_conformity_scores( self, X: ArrayLike, y: ArrayLike, ) -> NDArray: """ Compute the conformity scores on a data set. Parameters ---------- X : ArrayLike of shape (n_samples, n_features) Input data. y : ArrayLike of shape (n_samples,) Input labels. Returns ------- The conformity scores corresponding to the input data set. """ y_pred, _ = super().predict(X, alpha=0.5, ensemble=True) return np.asarray(y) - np.asarray(y_pred) def _beta_optimize( self, alpha: Union[float, NDArray], upper_bounds: NDArray, lower_bounds: NDArray, ) -> NDArray: """ Minimize the width of the PIs, for a given difference of quantiles. Parameters ---------- alpha: Union[float, NDArray] The quantiles to compute. upper_bounds: NDArray The array of upper values. lower_bounds: NDArray The array of lower values. Returns ------- NDArray Array of betas minimizing the differences ``(1-alpa+beta)-quantile - beta-quantile``. Raises ------ ValueError If lower and upper bounds arrays don't have the same shape. """ if lower_bounds.shape != upper_bounds.shape: raise ValueError( "Lower and upper bounds arrays should have the same shape." ) alpha = cast(NDArray, alpha) betas_0 = np.full( shape=(len(lower_bounds), len(alpha)), fill_value=np.nan, dtype=float, ) for ind_alpha, _alpha in enumerate(alpha): betas = np.linspace( _alpha / (len(lower_bounds) + 1), _alpha, num=len(lower_bounds), endpoint=True, ) one_alpha_beta = np_nanquantile( upper_bounds.astype(float), 1 - _alpha + betas, axis=1, method="higher", ) beta = np_nanquantile( lower_bounds.astype(float), betas, axis=1, method="lower", ) betas_0[:, ind_alpha] = betas[ np.argmin(one_alpha_beta - beta, axis=0) ] return betas_0
[docs] def fit( self, X: ArrayLike, y: ArrayLike, sample_weight: Optional[ArrayLike] = None, ) -> MapieTimeSeriesRegressor: """ Compared to the method ``fit`` of ``MapieRegressor``, the ``fit`` method of ``MapieTimeSeriesRegressor`` computes the ``conformity_scores_`` with relative values. Parameters ---------- X: ArrayLike of shape (n_samples, n_features) Training data. y: ArrayLike of shape (n_samples,) Training labels. sample_weight: Optional[ArrayLike] of shape (n_samples,) Sample weights for fitting the out-of-fold models. If ``None``, then samples are equally weighted. If some weights are null, their corresponding observations are removed before the fitting process and hence have no conformity scores. If weights are non-uniform, conformity scores are still uniformly weighted. By default ``None``. Returns ------- MapieTimeSeriesRegressor The model itself. """ self = super().fit(X=X, y=y, sample_weight=sample_weight) self.conformity_scores_ = self._relative_conformity_scores(X, y) return self
[docs] def partial_fit( self, X: ArrayLike, y: ArrayLike, ) -> MapieTimeSeriesRegressor: """ Update the ``conformity_scores_`` attribute when new data with known labels are available. Note: Don't use ``partial_fit`` with samples of the training set. Parameters ---------- X: ArrayLike of shape (n_samples_test, n_features) Input data. y: ArrayLike of shape (n_samples_test,) Input labels. Returns ------- MapieTimeSeriesRegressor The model itself. Raises ------ ValueError If the length of ``y`` is greater than the length of the training set. """ check_is_fitted(self, self.fit_attributes) X = cast(NDArray, X) y = cast(NDArray, y) n = len(self.conformity_scores_) if len(X) > n: raise ValueError( "The number of observations to update is higher than the" "number of training instances." ) new_conformity_scores_ = self._relative_conformity_scores(X, y) self.conformity_scores_ = np.roll( self.conformity_scores_, -len(new_conformity_scores_) ) self.conformity_scores_[ -len(new_conformity_scores_): ] = new_conformity_scores_ return self
[docs] def predict( self, X: ArrayLike, ensemble: bool = False, alpha: Optional[Union[float, Iterable[float]]] = None, optimize_beta: bool = True, ) -> Union[NDArray, Tuple[NDArray, NDArray]]: """ Correspond to 'Conformal prediction for dynamic time-series'. Parameters ---------- X: ArrayLike of shape (n_samples, n_features) Test data. ensemble: bool Boolean determining whether the predictions are ensembled or not. If ``False``, predictions are those of the model trained on the whole training set. If ``True``, predictions from perturbed models are aggregated by the aggregation function specified in the ``agg_function`` attribute. If ``cv`` is ``"prefit"`` or ``"split"``, ``ensemble`` is ignored. By default ``False``. alpha: Optional[Union[float, Iterable[float]]] Can be a float, a list of floats, or a ``ArrayLike`` of floats. Between ``0`` and ``1``, represents the uncertainty of the confidence interval. Lower ``alpha`` produce larger (more conservative) prediction intervals. ``alpha`` is the complement of the target coverage level. By default ``None``. optimize_beta: bool Whether to optimize the PIs' width or not. Returns ------- Union[NDArray, Tuple[NDArray, NDArray]] - NDArray of shape (n_samples,) if ``alpha`` is ``None``. - Tuple[NDArray, NDArray] of shapes (n_samples,) and (n_samples, 2, n_alpha) if ``alpha`` is not ``None``. - [:, 0, :]: Lower bound of the prediction interval. - [:, 1, :]: Upper bound of the prediction interval. """ # Checks check_is_fitted(self, self.fit_attributes) self._check_ensemble(ensemble) alpha = cast(Optional[NDArray], check_alpha(alpha)) y_pred = self.estimator_.single_estimator_.predict(X) n = len(self.conformity_scores_) if alpha is None: return np.array(y_pred) alpha_np = cast(NDArray, alpha) check_alpha_and_n_samples(alpha_np, n) if optimize_beta: betas_0 = self._beta_optimize( alpha_np, self.conformity_scores_.reshape(1, -1), self.conformity_scores_.reshape(1, -1), ) else: betas_0 = np.repeat(alpha[:, np.newaxis] / 2, n, axis=0) lower_quantiles = np_nanquantile( self.conformity_scores_.astype(float), betas_0[0, :], axis=0, method="lower", ).T higher_quantiles = np_nanquantile( self.conformity_scores_.astype(float), 1 - alpha_np + betas_0[0, :], axis=0, method="higher", ).T self.lower_quantiles_ = lower_quantiles self.higher_quantiles_ = higher_quantiles if self.method in self.no_agg_methods_ \ or self.cv in self.no_agg_cv_: y_pred_low = y_pred[:, np.newaxis] + lower_quantiles y_pred_up = y_pred[:, np.newaxis] + higher_quantiles else: y_pred_multi = self.estimator_._pred_multi(X) pred = aggregate_all(self.agg_function, y_pred_multi) lower_bounds, upper_bounds = pred, pred y_pred_low = lower_bounds.reshape(-1, 1) + lower_quantiles y_pred_up = upper_bounds.reshape(-1, 1) + higher_quantiles if ensemble: y_pred = aggregate_all(self.agg_function, y_pred_multi) return y_pred, np.stack([y_pred_low, y_pred_up], axis=1)
def _more_tags(self): return { "_xfail_checks": { "check_estimators_partial_fit_n_features": "partial_fit can only be called on fitted models" } }