Source code for score_analysis.scores

from __future__ import annotations

import math
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, List, Optional, Tuple, Union

import numpy as np

from . import utils
from .cm import ConfusionMatrix


[docs]class BinaryLabel(Enum): """ Simple enum for positive and negative classes. """ pos = "pos" neg = "neg" def __eq__(self, other): return self.value == BinaryLabel(other).value def __hash__(self): return hash(self.value)
SAMPLING_METHOD_REPLACEMENT = "replacement" SAMPLING_METHOD_SINGLE_PASS = "single_pass" SAMPLING_METHOD_DYNAMIC = "dynamic" SAMPLING_METHOD_PROPORTION = "proportion" CustomSamplingMethod = Callable[["Scores"], "Scores"] SamplingMethod = Union[str, CustomSamplingMethod]
[docs]@dataclass(frozen=True) class BootstrapConfig: """ Bootstrap configuration for creating bootstrap samples and computing CIs. Args: nb_samples: Number of samples to use for bootstrapping. bootstrap_method: Method to compute the CI from the bootstrap samples. Possible values are * "quantile" uses the alpha/2 and 1-alpha/2 quantiles of the empirical metric distribution. * "bc" applies bias correction to correct for the bias of the median of the empirical distribution * "bca" applies bias correction and acceleration to correct for non- constant standard error. See Ch. 11 of Computer Age Statistical Inference by Efron and Hastie for details. sampling_method: Sampling method to create bootstrap sample. Supported methods are * "replacement" creates a sample with the same number of positive and negative scores using sampling with replacement. * "single_pass" approximates replacement sampling using a Poisson distribution to determine how often each score would be selected in the bootstrap sample. This speeds up sampling by up to ~40% compared to replacement sampling since the sampled scores are already sorted. However, the method cannot guarantee that each bootstrap sample has the same number of scores. This should not matter if the number of scores is >100 (per group, if stratified sampling is used). * "dynamic" chooses between replacement and single pass sampling. It chooses single pass sampling, if >100 scores are present per group and smoothing is disabled and reverts to replacement sampling otherwise. The threshold can be changed by setting the variable ``SINGLE_PASS_SAMPLE_THRESHOLD``. * "proportion" creates a sample of size defined by ratio using sampling without replacement. This is similar to cross-validation, where a proportion of data is used in each iteration. * A callable with signature:: method(source: Scores, **kwargs) -> Scores creating one sample from a source Scores object. stratified_sampling: Stratified sampling is only supported for replacement sampling. Possible values are * ``None``. No stratification is used * "by_label". Sampling preserves the proportion of positive and negative samples as well as the proportion of easy positive and negative samples. * "by_group". Sampling preserves the proportion of samples in each group. Defaults to non-stratified sampling, if no groups are present. smoothing: Optional smoothing of sampled scores. ratio: Size of sample when using proportional sampling. In range (0, 1). """ nb_samples: int = 1_000 bootstrap_method: str = "bca" sampling_method: SamplingMethod = SAMPLING_METHOD_DYNAMIC stratified_sampling: Optional[str] = None smoothing: bool = False ratio: Optional[float] = None
SINGLE_PASS_SAMPLE_THRESHOLD = 100 DEFAULT_BOOTSTRAP_CONFIG = BootstrapConfig()
[docs]class Scores: def __init__( self, pos, neg, *, nb_easy_pos: int = 0, nb_easy_neg: int = 0, score_class: Union[BinaryLabel, str] = "pos", equal_class: Union[BinaryLabel, str] = "pos", is_sorted: bool = False, ): """ Args: pos: Scores for positive samples. neg: Scores for negative samples. nb_easy_pos: Number of positive samples that we assume are always correctly classified when computing metrics. These parameters when evaluating a highly accurate classifier on only the hardest samples to speed up evaluation. nb_easy_neg: Number of negative samples that we assume are always correctly classified. score_class: Do scores indicate membership of the positive or the negative class? equal_class: Do samples with score equal to the threshold get assigned to the positive or negative class? is_sorted: If True, we assume the scores are already sorted. Can be used to speed up Scores object creation. """ self.pos = np.asarray(pos) self.neg = np.asarray(neg) self.nb_easy_pos = nb_easy_pos self.nb_easy_neg = nb_easy_neg self.score_class = BinaryLabel(score_class) self.equal_class = BinaryLabel(equal_class) if not is_sorted: self.pos = np.sort(self.pos) self.neg = np.sort(self.neg) @property def hard_pos_ratio(self) -> float: if self.nb_easy_pos > 0: return len(self.pos) / (len(self.pos) + self.nb_easy_pos) else: # The default state is that all samples are hard return 1.0 @property def hard_neg_ratio(self) -> float: if self.nb_easy_neg > 0: return len(self.neg) / (len(self.neg) + self.nb_easy_neg) else: # The default state is that all samples are hard return 1.0 @property def easy_pos_ratio(self) -> float: return 1.0 - self.hard_pos_ratio @property def easy_neg_ratio(self) -> float: return 1.0 - self.hard_neg_ratio @property def nb_easy_samples(self) -> int: return self.nb_easy_pos + self.nb_easy_neg @property def nb_hard_pos(self) -> int: return len(self.pos) @property def nb_hard_neg(self) -> int: return len(self.neg) @property def nb_hard_samples(self) -> int: return len(self.pos) + len(self.neg) @property def nb_all_pos(self) -> int: return self.nb_easy_pos + len(self.pos) @property def nb_all_neg(self) -> int: return self.nb_easy_neg + len(self.neg) @property def nb_all_samples(self) -> int: return self.nb_easy_samples + self.nb_hard_samples @property def easy_ratio(self) -> float: if self.nb_easy_samples > 0: return self.nb_easy_samples / self.nb_all_samples else: # The default state is that all samples are hard return 0.0 @property def hard_ratio(self) -> float: return 1.0 - self.easy_ratio
[docs] @staticmethod def from_labels( labels, scores, *, pos_label: Any = 1, nb_easy_pos: int = 0, nb_easy_neg: int = 0, score_class: Union[BinaryLabel, str] = "pos", equal_class: Union[BinaryLabel, str] = "pos", is_sorted: bool = False, ) -> Scores: """ Args: labels: Array with sample labels. scores: Array with sample scores. pos_label: The label of the positive class. All other labels are treated as negative labels. nb_easy_pos: Number of positive samples that we assume are always correctly classified when computing metrics. These parameters when evaluating a highly accurate classifier on only the hardest samples to speed up evaluation. nb_easy_neg: Number of negative samples that we assume are always correctly classified. score_class: Do scores indicate membership of the positive or the negative class? equal_class: Do samples with score equal to the threshold get assigned to the positive or negative class? is_sorted: If True, we assume the scores are already sorted. Can be used to speed up Scores object creation. Returns: A Scores instance. """ labels = np.asarray(labels) scores = np.asarray(scores) pos = scores[labels == pos_label] neg = scores[labels != pos_label] return Scores( pos=pos, neg=neg, nb_easy_pos=nb_easy_pos, nb_easy_neg=nb_easy_neg, score_class=score_class, equal_class=equal_class, is_sorted=is_sorted, )
def __eq__(self, other: Scores) -> bool: """ Tests if two Scores objects are equal. Equality is tested exactly, rounding errors can lead to objects being not equal. Args: other: Object to test against. Returns: True, if objects are equal, false otherwise. """ equal = ( np.array_equal(self.pos, other.pos) and np.array_equal(self.neg, other.neg) and self.nb_easy_pos == other.nb_easy_pos and self.nb_easy_neg == other.nb_easy_neg and self.score_class == other.score_class and self.equal_class == other.equal_class ) return equal
[docs] def swap(self) -> Scores: """ Swaps positive and negative scores. Also reverses the decision logic, so that fpr of original scores equals fnr of reversed scores. Returns: Scores object with positive and negative scores reversed. """ return Scores( pos=self.neg, neg=self.pos, nb_easy_pos=self.nb_easy_neg, nb_easy_neg=self.nb_easy_pos, score_class="neg" if self.score_class == BinaryLabel.pos else "pos", equal_class="neg" if self.equal_class == BinaryLabel.pos else "pos", is_sorted=True, )
[docs] def cm(self, threshold) -> ConfusionMatrix: """ Computes confusion matrices at the given threshold(s). Args: threshold: Can be a scalar or array-like. Returns: A binary confusion matrix. """ threshold = np.asarray(threshold) if self.score_class == BinaryLabel.pos: side = "left" if self.equal_class == BinaryLabel.pos else "right" else: side = "right" if self.equal_class == BinaryLabel.pos else "left" pos_below = np.searchsorted(self.pos, threshold, side=side) neg_below = np.searchsorted(self.neg, threshold, side=side) pos_above = len(self.pos) - pos_below neg_above = len(self.neg) - neg_below if self.score_class == BinaryLabel.pos: tp, fn = pos_above, pos_below fp, tn = neg_above, neg_below else: tp, fn = pos_below, pos_above fp, tn = neg_below, neg_above # Account for easy samples tp += self.nb_easy_pos tn += self.nb_easy_neg matrix = np.empty((*threshold.shape, 2, 2), dtype=int) matrix[..., 0, 0] = tp matrix[..., 0, 1] = fn matrix[..., 1, 0] = fp matrix[..., 1, 1] = tn return ConfusionMatrix(matrix=matrix, binary=True)
confusion_matrix = cm
[docs] def tpr(self, threshold): """True Positive Rate at threshold(s).""" return self.cm(threshold).tpr()
[docs] def fnr(self, threshold): """False Negative Rate at threshold(s).""" return self.cm(threshold).fnr()
[docs] def tnr(self, threshold): """True Negative Rate at threshold(s).""" return self.cm(threshold).tnr()
[docs] def fpr(self, threshold): """False Positive Rate at threshold(s).""" return self.cm(threshold).fpr()
[docs] def topr(self, threshold): """Test Outcome Positive Rate at threshold(s).""" return self.cm(threshold).topr()
[docs] def tonr(self, threshold): """Test Outcome Negative Rate at threshold(s).""" return self.cm(threshold).tonr()
# Aliases.
[docs] def tar(self, threshold): """ True Acceptance Rate at threshold(s). Alias for :func:`~Scores.tpr`. """ return self.tpr(threshold)
[docs] def frr(self, threshold): """ False Rejection Rate at threshold(s). Alias for :func:`~Scores.fnr`. """ return self.fnr(threshold)
[docs] def trr(self, threshold): """ True Rejection Rate at threshold(s). Alias for :func:`~Scores.tnr`. """ return self.tnr(threshold)
[docs] def far(self, threshold): """ False Acceptance Rate at threshold(s). Alias for :func:`~Scores.fpr`. """ return self.fpr(threshold)
[docs] def acceptance_rate(self, threshold): """ Acceptance Rate at threshold(s). Alias for :func:`~Scores.topr`. """ return self.topr(threshold)
[docs] def rejection_rate(self, threshold): """ Rejection Rate at threshold(s). Alias for :func:`~Scores.tonr`. """ return self.tonr(threshold)
[docs] def threshold_at_tpr(self, tpr, *, method: str = "linear"): """ Set threshold at True Positive Rate. Args: tpr: TPR values at which to set threshold. method: Possible values are "linear", "lower", "higher". If "lower" or "higher", we return the closest score at which the metric is lower or higher that the target. If "linear", we apply linear interpolation between the lower and higher values. """ if len(self.pos) == 0: raise ValueError("Cannot set threshold at TPR with no positive values.") # Example: We want threshold at 70% TPR. If easy_pos_ratio=60%, then we want # the threshold at 25% TPR on the remaining 40% hard positives, since # 70% - 60% = 10% is 25% of the remaining 40% tpr = np.maximum(np.asarray(tpr) - self.easy_pos_ratio, 0.0) tpr = np.minimum(tpr / self.hard_pos_ratio, 1.0) return self._threshold_at_ratio(self.pos, tpr, False, BinaryLabel.pos, method)
[docs] def threshold_at_fnr(self, fnr, *, method: str = "linear"): """ Set threshold at False Negative Rate. Args: fnr: FNR values at which to set threshold. method: Possible values are "linear", "lower", "higher". If "lower" or "higher", we return the closest score at which the metric is lower or higher that the target. If "linear", we apply linear interpolation between the lower and higher values. """ if len(self.pos) == 0: raise ValueError("Cannot set threshold at FNR with no positive values.") # Example: We want the threshold at 5% FNR. If hard_pos_ratio=10%, then we want # the threshold at 5% / 0.1 = 50% of the available 10% of hard positives. fnr = np.minimum(np.asarray(fnr) / self.hard_pos_ratio, 1.0) return self._threshold_at_ratio(self.pos, fnr, True, BinaryLabel.pos, method)
[docs] def threshold_at_tnr(self, tnr, *, method: str = "linear"): """ Set threshold at True Negative Rate. Args: tnr: TNR values at which to set threshold. method: Possible values are "linear", "lower", "higher". If "lower" or "higher", we return the closest score at which the metric is lower or higher that the target. If "linear", we apply linear interpolation between the lower and higher values. """ if len(self.neg) == 0: raise ValueError("Cannot set threshold at TNR with no negative values.") # See explanation in threshold_at_tpr() tnr = np.maximum(np.asarray(tnr) - self.easy_neg_ratio, 0.0) tnr = np.minimum(tnr / self.hard_neg_ratio, 1.0) return self._threshold_at_ratio(self.neg, tnr, True, BinaryLabel.neg, method)
[docs] def threshold_at_fpr(self, fpr, *, method: str = "linear"): """ Set threshold at False Positive Rate. Args: fpr: FPR values at which to set threshold. method: Possible values are "linear", "lower", "higher". If "lower" or "higher", we return the closest score at which the metric is lower or higher that the target. If "linear", we apply linear interpolation between the lower and higher values. """ if len(self.neg) == 0: raise ValueError("Cannot set threshold at FPR with no negative values.") # See explanation at threshold_at_fnr() fpr = np.minimum(np.asarray(fpr) / self.hard_neg_ratio, 1.0) return self._threshold_at_ratio(self.neg, fpr, False, BinaryLabel.neg, method)
[docs] def threshold_at_topr(self, topr, *, method: str = "linear"): """ Set threshold at Test Outcome Positive Rate. This is the proportion of samples where the test outcome is positive, i.e. the test detects the condition. Args: topr: TOPR values at which to set threshold. method: Possible values are "linear", "lower", "higher". If "lower" or "higher", we return the closest score at which the metric is lower or higher that the target. If "linear", we apply linear interpolation between the lower and higher values. """ concat_scores = np.sort(np.concatenate([self.neg, self.pos])) if len(concat_scores) == 0: raise ValueError("Cannot set threshold at TOPR without any values.") # See explanation at threshold_at_tonr() easy_pos_to_total_ratio = self.nb_easy_pos / self.nb_all_samples topr = np.maximum(np.asarray(topr) - easy_pos_to_total_ratio, 0.0) topr = np.minimum(topr / self.hard_ratio, 1.0) return self._threshold_at_ratio( concat_scores, topr, False, BinaryLabel.pos, method )
[docs] def threshold_at_tonr(self, tonr, *, method: str = "linear"): """ Set threshold at Test Outcome Negative Rate. This is the proportion of samples where the test outcome is negative, i.e. the test does not detect the condition. Args: tonr: TONR values at which to set threshold. method: Possible values are "linear", "lower", "higher". If "lower" or "higher", we return the closest score at which the metric is lower or higher that the target. If "linear", we apply linear interpolation between the lower and higher values. """ concat_scores = np.sort(np.concatenate([self.neg, self.pos])) if len(concat_scores) == 0: raise ValueError("Cannot set threshold at TONR without any values.") # Example: Imagine we want a threshold at 85% TONR and 80% of our data are # easy negatives and 10% of our data are easy positives. Then, we want the # threshold at 50% TONR on the 10% of data for which we have scores, since # 85% - 80% = 5% is 50% of the 10% data with scores (5% / 10%). easy_neg_to_total_ratio = self.nb_easy_neg / self.nb_all_samples tonr = np.maximum(np.asarray(tonr) - easy_neg_to_total_ratio, 0.0) tonr = np.minimum(tonr / self.hard_ratio, 1.0) return self._threshold_at_ratio( concat_scores, tonr, True, BinaryLabel.neg, method )
def _threshold_at_ratio( self, scores, target_ratio, increasing: bool, ratio_class: BinaryLabel, method: str, ): """ Helper function to set the threshold at a specific metric, for metrics that are defined as ratios, such as TPR, FPR, TNR and FNR. The following table relates the parameters ``increasing`` and ``ratio_class`` to common metrics, e.g., TPR, etc. increasing ratio_class TPR False pos FNR True pos TNR True neg FPR False neg We compute a threshold, such that P(score < threshold) = target_ratio, if metric is left continuous P(score <= threshold) = target_ratio, if metric is right continuous Args: scores: Array of scores to threshold target_ratio: The threshold is set such that the target_ratio of scores will be above or below the threshold. This can be an array. increasing: States, if the metric is increasing or decreasing, when score_class = "pos". Note that this is a property of the metric. ratio_class: States, if the metric is calculated using positive or negative scores. method: Possible values are "linear", "lower", "higher". """ if method not in {"lower", "higher", "linear"}: raise ValueError(f"Unknown interpolation method: {method}.") # Dictionary for reversing the interpolation method. reverse_method = {"lower": "higher", "higher": "lower", "linear": "linear"} isscalar = np.isscalar(target_ratio) target_ratio = np.asarray(target_ratio) # The standard case is an increasing metric based on positive scores, i.e., FNR, # with equal_class and score_class equal to pos. This case is left-continuous. # Metrics based on neg, reverse the continuity behaviour. left_continuous = ratio_class == BinaryLabel.pos # The equality class also determines the continuity if self.equal_class != BinaryLabel.pos: left_continuous = not left_continuous # We transform non-increasing metrics into increasing ones. Note that this does # not change continuity. if not increasing: target_ratio = 1.0 - target_ratio method = reverse_method[method] # And we transform the score direction as well. This does change continuity. if self.score_class != BinaryLabel.pos: target_ratio = 1.0 - target_ratio left_continuous = not left_continuous method = reverse_method[method] # From here on, we can pretend to be in the standard case, i.e., calculate # thresholds based on an increasing metric. threshold = self._invert_increasing_function( scores, target_ratio, left_continuous, method ) if isscalar: threshold = threshold.item() return threshold @staticmethod def _invert_increasing_function( scores, target_ratio, left_continuous: bool, method: str ): scores = scores.astype(float) # Otherwise we can get problems with nextafter if not left_continuous: min_ratio = 1.0 / len(scores) target_ratio = target_ratio - min_ratio target = target_ratio * len(scores) left_idx = np.floor(target) # Element to left of threshold right_idx = np.ceil(target) # Element to right of threshold la = right_idx - target # Coefficient in convex sum # Ensure we don't exceed array limits left_idx = np.maximum(np.minimum(left_idx.astype(int), len(scores) - 1), 0) right_idx = np.maximum(np.minimum(right_idx.astype(int), len(scores) - 1), 0) if method == "linear": threshold = la * scores[left_idx] + (1 - la) * scores[right_idx] elif method == "lower": threshold = scores[left_idx] else: # "higher" threshold = scores[right_idx] threshold = np.asarray(threshold) # We need this when target_ratio is scalar # Special cases of TPR <= 0. and TPR >= 1. threshold[target_ratio <= 0.0] = np.nextafter(scores[0], -np.inf) threshold[target_ratio >= 1.0] = np.nextafter(scores[-1], np.inf) return threshold # Aliases.
[docs] def threshold_at_tar(self, tar, *, method: str = "linear"): """ Set threshold at True Acceptance Rate Alias for :func:`~Scores.threshold_at_tpr`. """ return self.threshold_at_tpr(tpr=tar, method=method)
[docs] def threshold_at_frr(self, frr, *, method: str = "linear"): """ Set threshold at False Rejection Rate Alias for :func:`~Scores.threshold_at_fnr`. """ return self.threshold_at_fnr(fnr=frr, method=method)
[docs] def threshold_at_trr(self, trr, *, method: str = "linear"): """ Set threshold at True Rejection Rate Alias for :func:`~Scores.threshold_at_tnr`. """ return self.threshold_at_tnr(tnr=trr, method=method)
[docs] def threshold_at_far(self, far, *, method: str = "linear"): """ Set threshold at False Acceptance Rate Alias for :func:`~Scores.threshold_at_fpr`. """ return self.threshold_at_fpr(fpr=far, method=method)
[docs] def threshold_at_acceptance_rate(self, acceptance_rate, *, method: str = "linear"): """ Set threshold at Acceptance Rate Alias for :func:`~Scores.threshold_at_topr`. """ return self.threshold_at_topr(topr=acceptance_rate, method=method)
[docs] def threshold_at_rejection_rate(self, rejection_rate, *, method: str = "linear"): """ Set threshold at Rejection Rate Alias for :func:`~Scores.threshold_at_tonr`. """ return self.threshold_at_tonr(tonr=rejection_rate, method=method)
[docs] def threshold_at_metric( self, target, metric: Union[str, Callable], points: Optional[Union[int, np.ndarray]] = None, ) -> List[np.ndarray]: """ General function for setting thresholds at arbitrary metrics. No assumption is made about the metric being monotone or the threshold being unique. Given a metric function and a target value, the function will find all values for the threshold such that ``metric(threshold) = target``. If ``N = len(pos) + len(neg)`` is the number of scores and ``T = len(target)`` is the number of thresholds we want to set, this function has complexity O(N*T), because it searches over the whole score space to find all solutions. We can speed up the function by considering only a subset of points. Args: target: Target points at which to set the threshold. metric: Can be a string indicating a member function of the Scores class or a callable with signature:: metric(sample: Scores, threshold: np.ndarray) -> np.ndarray points: If a scalar, we use this many linearly spaced scores between ``min(pos, neg)`` and ``max(pos, neg)``. If given an array, we evaluate the metric at exactly these points. Returns: A list of thresholds of the same length as ``target``, such that ``threshold[j]`` is a strictly increasing array containing all solutions of the equation ``metric(theta) = target[j]``. """ if isinstance(metric, str): metric = getattr(type(self), metric) if points is None: points = np.sort(np.concatenate([self.pos, self.neg])) if len(points) < 2: raise ValueError("At least two values are required to set thresholds.") elif isinstance(points, int): min_score = min( self.pos[0] if len(self.pos) > 0 else np.inf, self.neg[0] if len(self.neg) > 0 else np.inf, ) max_score = max( self.pos[-1] if len(self.pos) > 0 else -np.inf, self.neg[-1] if len(self.neg) > 0 else -np.inf, ) if min_score >= max_score: raise ValueError("At least two values are required to set thresholds.") points = np.linspace(min_score, max_score, points, endpoint=True) threshold = utils.invert_pl_function(x=points, y=metric(self, points), t=target) return threshold
[docs] def eer(self) -> Tuple[float, float]: """ Calculates Equal Error Rate, i.e., where FPR = FNR (or, equivalently, where FAR = FRR). Returns: Tuple ``(threshold, eer)``, consisting of the threshold at which EER is achieved and the EER value itself. """ # We treat the case of perfect separation separately if self.pos[0] >= self.neg[-1] and self.score_class == BinaryLabel.pos: return (self.pos[0] + self.neg[-1]) / 2, 0.0 if self.pos[-1] <= self.neg[0] and self.score_class == BinaryLabel.neg: return (self.pos[-1] + self.neg[0]) / 2, 0.0 sign = -(self.threshold_at_fpr(0.0) - self.threshold_at_fnr(0.0)) # We consider the inverse functions, i.e., the function that map fpr/fnr to # the threshold and find the cross-over point using the bisection method. def f(x): y = self.threshold_at_fpr(x) - self.threshold_at_fnr(x) y = sign * y # Normalize function, such that f(0) <= 0. return y # EER cannot be larger than this: FPR cannot be larger than hard_pos_ratio, # since all other positives are easy and thus always correctly classified; # similarly, FNR cannot be larger than hard_neg_ratio max_eer = min(self.hard_pos_ratio, self.hard_neg_ratio) # This is the case of a very bad classifier, i.e., without easy samples this # happens only if all samples are incorrectly classified, i.e., EER=1.0. With # easy samples, we have to interpolate at the edge of where hard samples stop. if f(max_eer) < 0: if np.isclose(self.hard_pos_ratio, self.hard_neg_ratio): threshold = ( self.threshold_at_fpr(max_eer) + self.threshold_at_fnr(max_eer) ) / 2 return threshold, max_eer elif self.hard_pos_ratio < self.hard_neg_ratio: threshold = self.threshold_at_fpr(self.hard_pos_ratio) return threshold, self.hard_pos_ratio else: threshold = self.threshold_at_fnr(self.hard_neg_ratio) return threshold, self.hard_neg_ratio # The function f is increasing, but not strictly, i.e., it can have flat spots, # so we find the left-most root, the right-most root and then take the average. left = self._find_root(f, 0.0, max_eer, find_first=True) right = self._find_root(f, 0.0, max_eer, find_first=False) eer = (left + right) / 2 threshold = self.threshold_at_fpr(eer) return threshold, eer
[docs] def auc( self, lower: float = 0.0, upper: float = 1.0, *, x_axis: str = "fpr", y_axis: str = "tpr", ): """ Computes the (partial) AUC for the given Scores object using the trapezoid integration rule. Args: lower: Lower limit of integration. upper: Upper limit of integration. x_axis: Metric to plot on x-axis. Defaults to FPR. y_axis: Metric to plot on y-axis. Defaults to TPR. """ # We add +/-eps to each score to deal with continuity properties of the ROC # curve. AUC is invariant to left-/right-continuity, but the metrics have # varying continuity behaviour at the score values. Duplicating the scores # makes the function slower, but it means that we don't have to figure out # continuity properties by hand. points = np.nextafter( np.concatenate([self.pos, self.neg]), [[-np.inf], [np.inf]] ) points = np.sort(points.flatten()) x = getattr(self, x_axis)(points) y = getattr(self, y_axis)(points) if x[-1] < x[0]: x = x[::-1] y = y[::-1] left = np.searchsorted(x, lower, side="left") # x[l - 1] < lower <= x[l] right = np.searchsorted(x, upper, side="right") # x[r - 1] <= upper < x[r] # Ensure indices are in range left = np.minimum(left, len(y) - 1) right = np.maximum(right, 1) x = np.concatenate([[lower], x[left:right], [upper]]) y = np.concatenate([[y[left]], y[left:right], [y[right - 1]]]) try: trapezoid = np.trapezoid # Only available in Numpy 2.x except AttributeError: # pragma: no cover trapezoid = np.trapz # Deprecated # We use abs, so we don't have to worry about increasing/decreasing values. return np.abs(trapezoid(y, x))
@staticmethod def _find_root(f, xa, xe, find_first, xtol=1e-10) -> float: """Finds first or last root of a monotone function on interval (xa, xe).""" if not (f(xa) <= 0 <= f(xe)): raise ValueError(f"f({xa}) <= 0 <= f({xe}) not satisfied.") while not np.abs(xa - xe) < xtol: xm = (xa + xe) / 2 if f(xm) < 0: xa = xm elif f(xm) > 0: xe = xm else: if find_first: # If we care about the first root, we move the end of # the interval to the left. xe = xm else: xa = xm return (xa + xe) / 2 def _sampling_method(self, config: BootstrapConfig) -> SamplingMethod: """ If sampling method is "dynamic", we select the appropriate method between replacement and single pass, depending on the number of scores and whether smoothing is enabled. """ if config.sampling_method != SAMPLING_METHOD_DYNAMIC: return config.sampling_method # Nothing to choose here. if ( self.nb_hard_pos < SINGLE_PASS_SAMPLE_THRESHOLD or self.nb_hard_neg < SINGLE_PASS_SAMPLE_THRESHOLD or config.smoothing ): return SAMPLING_METHOD_REPLACEMENT else: return SAMPLING_METHOD_SINGLE_PASS def _sample_indices(self, by_label: bool = False, single_pass: bool = False): """ Returns the indices of positive and negative scores that we would include in the bootstrap sample. """ if by_label: # Stratified sampling does not change easy : hard and pos : neg ratios. nb_easy_pos = self.nb_easy_pos nb_easy_neg = self.nb_easy_neg nb_hard_pos = self.nb_hard_pos nb_hard_neg = self.nb_hard_neg else: # Non-stratified sampling also takes into account uncertainty about the # pos : neg ratio in the bootstrapped sample. pos_neg_ratio = ( self.nb_all_pos / self.nb_all_samples if self.nb_all_samples > 0 else 0.0 ) nb_pos = np.random.binomial(self.nb_all_samples, pos_neg_ratio) nb_neg = self.nb_all_samples - nb_pos # Try to have at least one positive and one negative sample. if nb_pos == 0 and self.nb_all_pos > 0: nb_pos, nb_neg = 1, self.nb_all_samples - 1 if nb_neg == 0 and self.nb_all_neg > 0: nb_pos, nb_neg = self.nb_all_samples - 1, 1 # Find out how many easy and hard samples there will be. nb_easy_pos = np.random.binomial(nb_pos, self.easy_pos_ratio) nb_easy_neg = np.random.binomial(nb_neg, self.easy_neg_ratio) nb_hard_pos = nb_pos - nb_easy_pos nb_hard_neg = nb_neg - nb_easy_neg # Try to have at least one hard positive and negative sample. if nb_hard_pos == 0 and self.nb_hard_pos > 0: nb_hard_pos, nb_easy_pos = 1, nb_pos - 1 if nb_hard_neg == 0 and self.nb_hard_neg > 0: nb_hard_neg, nb_easy_neg = 1, nb_neg - 1 if single_pass: def _single_pass_sampling(size, n, p): # This threshold is toggling between using the binomial distribution # to choose how often each score will be included in the bootstrap # sample and approximating the binomial distribution with the Poisson # distribution. The Poisson distribution is a sufficiently good # approximation, if n > 100 and n*p < 20 (in our case, n*p is ~1). # # This is different from SINGLE_PASS_SAMPLE_THRESHOLD, which toggles # between single pass and replacement sampling. This is all single # pass sampling. if n < 100: return np.random.binomial(size=size, n=n, p=p) else: return np.random.poisson(size=size, lam=n * p) nb_pos_selected = _single_pass_sampling( size=self.nb_hard_pos, n=nb_hard_pos, p=1.0 / self.nb_hard_pos ) nb_neg_selected = _single_pass_sampling( size=self.nb_hard_neg, n=nb_hard_neg, p=1.0 / self.nb_hard_neg ) pos_idx = np.repeat(np.arange(self.nb_hard_pos), nb_pos_selected) neg_idx = np.repeat(np.arange(self.nb_hard_neg), nb_neg_selected) else: # Sampling hard samples with replacement pos_idx = np.random.choice(self.nb_hard_pos, size=nb_hard_pos, replace=True) neg_idx = np.random.choice(self.nb_hard_neg, size=nb_hard_neg, replace=True) return pos_idx, neg_idx, nb_easy_pos, nb_easy_neg
[docs] def bootstrap_sample( self, config: BootstrapConfig = DEFAULT_BOOTSTRAP_CONFIG, ) -> Scores: """ Creates one bootstrap sample by sampling with the specified configuration. Args: config: Bootstrap configuration. Returns: Scores object with the sampled scores. """ # Resolve "dynamic" sampling to either replacement or single pass sampling. sampling_method = self._sampling_method(config) if sampling_method == SAMPLING_METHOD_REPLACEMENT: pos_idx, neg_idx, nb_easy_pos, nb_easy_neg = self._sample_indices( by_label=config.stratified_sampling == "by_label", single_pass=False ) pos = self.pos[pos_idx] neg = self.neg[neg_idx] if config.smoothing: def _estimate_bandwidth(x: np.ndarray) -> float: # The rule for the bandwidth estimation is taken from the wiki page # https://en.wikipedia.org/wiki/Kernel_density_estimation iqr = np.quantile(x, 0.75) - np.quantile(x, 0.25) h = 0.9 * min(x.std(), iqr / 1.34) * math.pow(len(x), -0.2) return h h_pos = _estimate_bandwidth(pos) h_neg = _estimate_bandwidth(neg) pos = pos + np.random.normal(loc=0.0, scale=h_pos, size=pos.shape) neg = neg + np.random.normal(loc=0.0, scale=h_neg, size=neg.shape) scores = Scores( pos=pos, neg=neg, nb_easy_pos=nb_easy_pos, nb_easy_neg=nb_easy_neg, score_class=self.score_class, equal_class=self.equal_class, ) elif sampling_method == SAMPLING_METHOD_SINGLE_PASS: pos_idx, neg_idx, nb_easy_pos, nb_easy_neg = self._sample_indices( by_label=config.stratified_sampling == "by_label", single_pass=True ) pos = self.pos[pos_idx] neg = self.neg[neg_idx] if config.smoothing: # We don't support smoothing, because after smoothing the scores would # not be sorted anymore, which would remove the main source of speedup # for the method. raise ValueError("Smoothing is not supported for single pass sampling.") scores = Scores( pos=pos, neg=neg, nb_easy_pos=nb_easy_pos, nb_easy_neg=nb_easy_neg, score_class=self.score_class, equal_class=self.equal_class, is_sorted=True, # Single-pass sampling returns already sorted scores. ) elif sampling_method == SAMPLING_METHOD_PROPORTION: if config.ratio is None: raise ValueError("For proportional sampling, ratio has to be defined.") # Sampling a sample defined by ratio, without replacement nb_pos = max(int(config.ratio * self.pos.size), 1) nb_neg = max(int(config.ratio * self.neg.size), 1) pos = np.random.choice(self.pos, size=nb_pos, replace=False) neg = np.random.choice(self.neg, size=nb_neg, replace=False) # We also "sample" a proportional sample of easy sample nb_easy_pos = int(config.ratio * self.nb_easy_pos) nb_easy_neg = int(config.ratio * self.nb_easy_neg) scores = Scores( pos=pos, neg=neg, nb_easy_pos=nb_easy_pos, nb_easy_neg=nb_easy_neg, score_class=self.score_class, equal_class=self.equal_class, ) elif isinstance(sampling_method, str): raise ValueError(f"Unsupported sampling method {config.sampling_method}.") elif callable(sampling_method): scores = sampling_method(self) # Custom sampling method else: raise ValueError("Sampling method must be a string or a callable.") return scores
[docs] def bootstrap_metric( self, metric: Union[str, Callable], config: BootstrapConfig = DEFAULT_BOOTSTRAP_CONFIG, **kwargs, ) -> np.ndarray: """ Calculates nb_samples samples of metric using bootstrapping. Args: metric: Can be a string indicating a member function of the Scores class or a callable with signature:: metric(sample: Scores, **kwargs) -> np.ndarray config: Bootstrap config. **kwargs: Arguments that are passed to the metric function. Returns: Array of samples from metric. If metric returns arrays of shape (X,), the function will return an array of shape (nb_samples, X). """ if isinstance(metric, str): # getattr(self) would resolve the method from Scores, while type(self) will # return the method from the subclass, e.g., from GroupScores. metric = getattr(type(self), metric) m = np.asarray(metric(self, **kwargs)) res = np.empty(shape=(config.nb_samples, *m.shape), dtype=m.dtype) for j in range(config.nb_samples): sample = self.bootstrap_sample(config=config) res[j] = metric(sample, **kwargs) return res
[docs] def bootstrap_ci( self, metric: Union[str, Callable], alpha: float = 0.05, config: BootstrapConfig = DEFAULT_BOOTSTRAP_CONFIG, **kwargs, ) -> np.ndarray: """ Calculates the confidence interval with approximate coverage 1-alpha for metric by bootstrapping nb_samples from the positive and negative scores. Args: metric: Can be a string indicating a member function of the Scores class or a callable with signature:: metric(sample: Scores, **kwargs) -> Union[float, np.ndarray] alpha: Significance level. In range (0, 1). config: Bootstrap config. **kwargs: Arguments that are passed to the metric function. Returns: Returns an array of shape (Y, 2) with lower and upper bounds of the CI, for a metric returning shape (Y,). """ if isinstance(metric, str): metric = getattr(type(self), metric) samples = self.bootstrap_metric(metric, config=config, **kwargs) # (N, Y) ci = utils.bootstrap_ci( theta=samples, theta_hat=metric(self, **kwargs), alpha=alpha, method=config.bootstrap_method, ) return ci
def pointwise_cm( labels, scores, threshold, *, pos_label: Any = 1, score_class: Union[BinaryLabel, str] = "pos", equal_class: Union[BinaryLabel, str] = "pos", ) -> np.ndarray: """ Returns a boolean array, stating for each score, in which field of the confusion matrix it belongs at a given threshold. If scores have shape (X,), threshold has shape (Y,), then we return an array of shape (X, Y, 2, 2). Args: labels: Array with sample labels scores: Array with sample scores threshold: Thresholds at which to classify scores pos_label: The label of the positive class. All other labels are treated as negative labels. score_class: Do scores indicate membership of the positive or the negative class? equal_class: Do samples with score equal to the threshold get assigned to the positive or negative class? Returns: Boolean array of shape (X, Y, 2, 2) specifying whether a given sample at a given threshold belongs in a given cell of the confusion matrix. """ labels = np.asarray(labels) scores = np.asarray(scores) threshold = np.asarray(threshold) score_class = BinaryLabel(score_class) equal_class = BinaryLabel(equal_class) # Save the shape and flatten objects scores_shape = scores.shape labels = np.reshape(labels, -1) labels = labels[:, np.newaxis] scores = np.reshape(scores, -1) scores = scores[:, np.newaxis] threshold_shape = threshold.shape threshold = np.reshape(threshold, -1) threshold = threshold[np.newaxis, :] # True labels pos = labels == pos_label neg = labels != pos_label # Predicted labels if score_class == BinaryLabel.pos and equal_class == BinaryLabel.pos: top = scores >= threshold # Test Outcome Positive ton = scores < threshold # Test Outcome Negative elif score_class == BinaryLabel.pos and equal_class == BinaryLabel.neg: top = scores > threshold ton = scores <= threshold elif score_class == BinaryLabel.neg and equal_class == BinaryLabel.pos: top = scores <= threshold ton = scores > threshold else: # score_class == BinaryLabel.neg and equal_class == BinaryLabel.neg top = scores < threshold ton = scores >= threshold # The confusion matrix cm = np.empty((scores.size, threshold.size, 2, 2), dtype=bool) cm[..., 0, 0] = pos & top cm[..., 0, 1] = pos & ton cm[..., 1, 0] = neg & top cm[..., 1, 1] = neg & ton # Restore shapes cm = np.reshape(cm, (-1, *threshold_shape, 2, 2)) cm = np.reshape(cm, (*scores_shape, *threshold_shape, 2, 2)) return cm