# Source code for irspack.evaluation.evaluator

import warnings
from collections import OrderedDict
from enum import Enum, auto
from typing import TYPE_CHECKING, Dict, List, Optional, Union

import numpy as np
from scipy import sparse as sps

from .._threading import get_n_threads
from ..definitions import DenseScoreArray, InteractionMatrix
from ._core import EvaluatorCore, Metrics

if TYPE_CHECKING:
    from ..recommenders.base import BaseRecommender


class TargetMetric(Enum):
    """Names of the metrics that can be chosen as an optimization target.

    Members are looked up by name (``TargetMetric["ndcg"]``); the integer
    values carry no meaning beyond identifying the member.
    """

    # Values are spelled out explicitly; they equal what ``auto()`` would
    # generate for members declared in this order (1, 2, 3, ...).
    ndcg = 1
    recall = 2
    hit = 3
    map = 4
    precision = 5


# Keys read from each per-cutoff metric dictionary, in the order in which
# ``Evaluator.get_scores`` emits them as ``"<name>@<cutoff>"`` entries.
METRIC_NAMES = [
    # Names that also exist as ``TargetMetric`` members.
    "hit", "recall", "ndcg", "map", "precision",
    # Reported as well, but with no ``TargetMetric`` counterpart.
    "gini_index", "entropy", "appeared_item",
]


class Evaluator:
    r"""Evaluates recommenders' performance against validation set.

    Args:
        ground_truth (Union[scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]):
            The ground-truth.
        offset (int):
            Where the validation target user block begins.
            Often the validation set is defined for a subset of users.
            When offset is not 0, we assume that the users with validation
            ground truth correspond to ``X_train[offset:]`` where ``X_train``
            is the matrix fed into the recommender class. Defaults to 0.
        cutoff (int, optional):
            Controls the default number of recommendations.
            When the evaluator is used for parameter tuning, this cutoff value
            will be used. Defaults to 10.
        target_metric (str, optional):
            Specifies the target metric when this evaluator is used for
            parameter tuning. Must be the name of a ``TargetMetric`` member.
            Defaults to "ndcg".
        recommendable_items (Optional[List[int]], optional):
            Global recommendable items. Defaults to None.
            If this parameter is not None, evaluator will be concentrating on
            the recommender's score output for these recommendable_items,
            and compute the ranking performance within this subset.
        per_user_recommendable_items:
            Similar to `recommendable_items`, but this time the recommendable
            items can vary among users. If a sparse matrix is given, its
            nonzero indices are regarded as the list of recommendable items.
            This argument is ignored when ``recommendable_items`` is given.
            Defaults to `None`.
        masked_interactions:
            If set, this matrix masks the score output of recommender model
            where it is non-zero. If none, the mask will be the training
            matrix itself owned by the recommender.
        n_threads:
            Specifies the number of threads to sort scores and compute the
            evaluation metrics. If `None`, the environment variable
            ``"IRSPACK_NUM_THREADS_DEFAULT"`` will be looked up, and if the
            variable is not set, it will be set to ``os.cpu_count()``.
            Defaults to `None`.
        recall_with_cutoff (bool, optional):
            This affects the definition of recall.
            If ``True``, for each user, recall will be computed as

            .. math ::

                \frac{N_{\text{hit}}}{\min(\text{cutoff}, N_{\text{ground truth}})}

            If ``False``, this will be

            .. math ::

                \frac{N_{\text{hit}}}{N_{\text{ground truth}}}

        mb_size (int, optional):
            The rows of chunked user score. Defaults to 128.

    Raises:
        ValueError:
            If ``per_user_recommendable_items`` does not have one entry per
            row of ``ground_truth``, or if ``masked_interactions`` and
            ``ground_truth`` have different shapes.
        KeyError:
            If ``target_metric`` is not the name of a ``TargetMetric`` member.
    """

    n_users: int
    n_items: int
    masked_interactions: Optional[sps.csr_matrix]

    def __init__(
        self,
        ground_truth: InteractionMatrix,
        offset: int = 0,
        cutoff: int = 10,
        target_metric: str = "ndcg",
        recommendable_items: Optional[List[int]] = None,
        per_user_recommendable_items: Union[
            None, List[List[int]], InteractionMatrix
        ] = None,
        masked_interactions: Optional[InteractionMatrix] = None,
        n_threads: Optional[int] = None,
        recall_with_cutoff: bool = False,
        mb_size: int = 128,
    ) -> None:
        # Work on a float64 CSR copy with sorted column indices.
        ground_truth = ground_truth.tocsr().astype(np.float64)
        ground_truth.sort_indices()

        # The core receives either an empty list (no restriction), a
        # single-element list (one global restriction), or one list per user.
        recommendable_items_arg: List[List[int]]
        if recommendable_items is not None:
            # A global list takes precedence over the per-user specification.
            recommendable_items_arg = [recommendable_items]
        elif per_user_recommendable_items is None:
            recommendable_items_arg = []
        else:
            if sps.issparse(per_user_recommendable_items):
                per_user_as_csr = sps.csr_matrix(per_user_recommendable_items)
                # Column indices of the non-zero entries of each row.
                recommendable_items_arg = [
                    [int(j) for j in row.nonzero()[1]] for row in per_user_as_csr
                ]
            else:
                recommendable_items_arg = per_user_recommendable_items
            if len(recommendable_items_arg) != ground_truth.shape[0]:
                raise ValueError(
                    "ground_truth and per_user_recommendable_items have inconsistent shapes."
                )

        self.core = EvaluatorCore(ground_truth, recommendable_items_arg)
        self.offset = offset
        self.n_users = ground_truth.shape[0]
        self.n_items = ground_truth.shape[1]
        self.target_metric = TargetMetric[target_metric]
        self.cutoff = cutoff
        self.target_metric_name = f"{self.target_metric.name}@{self.cutoff}"
        self.n_threads = get_n_threads(n_threads)
        self.mb_size = mb_size

        if masked_interactions is None:
            self.masked_interactions = None
        else:
            if masked_interactions.shape != ground_truth.shape:
                raise ValueError(
                    "ground_truth and masked_interactions have different shapes. "
                )
            self.masked_interactions = sps.csr_matrix(masked_interactions)

        self.recall_with_cutoff = recall_with_cutoff

    def _get_metrics(
        self, scores: DenseScoreArray, cutoff: int, ground_truth_begin: int
    ) -> Metrics:
        """Compute the metrics of one score chunk via the dtype-matching core routine.

        Args:
            scores: Score chunk; its dtype must be float32 or float64.
            cutoff: Cutoff forwarded to the core.
            ground_truth_begin: Ground-truth row index that corresponds to the
                first row of ``scores``.

        Raises:
            ValueError: If ``scores`` is neither float32 nor float64.
        """
        if scores.dtype == np.float64:
            compute = self.core.get_metrics_f64
        elif scores.dtype == np.float32:
            compute = self.core.get_metrics_f32
        else:
            raise ValueError("score must be either float32 or float64.")
        return compute(
            scores,
            cutoff,
            ground_truth_begin,
            self.n_threads,
            self.recall_with_cutoff,
        )

    def get_target_score(self, model: "BaseRecommender") -> float:
        r"""Compute the optimization target score (self.target_metric) with the cutoff being ``self.cutoff``.

        Args:
            model: The evaluated model.

        Returns:
            The metric value.
        """
        return self.get_score(model)[self.target_metric.name]

    def get_score(self, model: "BaseRecommender") -> Dict[str, float]:
        r"""Compute the score with the cutoff being ``self.cutoff``.

        Args:
            model : The evaluated recommender.

        Returns:
            metric values.
        """
        return self._get_scores_as_list(model, [self.cutoff])[0]

    def get_scores(
        self, model: "BaseRecommender", cutoffs: List[int]
    ) -> Dict[str, float]:
        r"""Compute the score with the specified cutoffs.

        Args:
            model : The evaluated recommender.
            cutoffs : for each value in cutoff, the class computes the metric values.

        Returns:
            The Resulting metric values. This time, the result will look like ``{"ndcg@20": 0.35, "map@20": 0.2, ...}``.
        """
        result: Dict[str, float] = OrderedDict()
        scores = self._get_scores_as_list(model, cutoffs)
        for cutoff, score in zip(cutoffs, scores):
            for metric_name in METRIC_NAMES:
                result[f"{metric_name}@{cutoff}"] = score[metric_name]
        return result

    def _get_scores_as_list(
        self, model: "BaseRecommender", cutoffs: List[int]
    ) -> List[Dict[str, float]]:
        """Score the validation users chunk by chunk, one metric dict per cutoff.

        Args:
            model: The evaluated recommender.
            cutoffs: Cutoff values to evaluate.

        Returns:
            A list aligned with ``cutoffs``; each element is the ``as_dict()``
            of the metrics merged over all chunks.

        Raises:
            ValueError: If the evaluated user block does not fit into the
                model's users, or if the model and the evaluator disagree on
                the number of items.
        """
        if self.offset + self.n_users > model.n_users:
            raise ValueError("evaluator offset + n_users exceeds the model's n_users.")
        if self.n_items != model.n_items:
            raise ValueError("The model and evaluator assume different n_items.")

        n_items = self.n_items
        # One accumulator per requested cutoff.
        metrics: List[Metrics] = [Metrics(n_items) for _ in cutoffs]

        block_start = self.offset
        n_validated = self.n_users
        block_end = block_start + n_validated
        mb_size = self.mb_size

        for chunk_start in range(block_start, block_end, mb_size):
            chunk_end = min(chunk_start + mb_size, block_end)
            try:
                # try faster method
                scores = model.get_score_block(chunk_start, chunk_end)
            except NotImplementedError:
                # block-by-block
                scores = model.get_score(np.arange(chunk_start, chunk_end))

            if self.masked_interactions is None:
                # Rows of the model's own matrix are indexed by model user id.
                mask = model.X_train_all[chunk_start:chunk_end]
            else:
                # The explicit mask has the ground truth's shape, so its rows
                # are indexed relative to ``self.offset``.
                mask = self.masked_interactions[
                    chunk_start - self.offset : chunk_end - self.offset
                ]
            # Masked entries are overwritten in place with -inf.
            scores[mask.nonzero()] = -np.inf

            for i, c in enumerate(cutoffs):
                chunked_metric = self._get_metrics(
                    scores,
                    c,
                    chunk_start - self.offset,
                )
                metrics[i].merge(chunked_metric)

        return [item.as_dict() for item in metrics]


class EvaluatorWithColdUser(Evaluator):
    r"""Evaluates recommenders' performance against cold (unseen) users.

    The constructor takes no ``offset`` argument; the parent class is always
    initialized with ``offset=0``.

    Args:
        input_interaction (Union[scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]):
            The cold-users' known interaction with the items.
        ground_truth (Union[scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]):
            The held-out ground-truth.
        cutoff (int, optional):
            Controls the number of recommendation. Defaults to 10.
        target_metric (str, optional):
            Optimization target metric. Defaults to "ndcg".
        recommendable_items (Optional[List[int]], optional):
            Global recommendable items. Defaults to None.
            If this parameter is not None, evaluator will be concentrating on
            the recommender's score output for these recommendable_items,
            and compute the ranking performance within this subset.
        per_user_recommendable_items (Optional[List[List[int]]], optional):
            Similar to `recommendable_items`, but this time the recommendable
            items can vary among users. Defaults to None.
        masked_interactions (Optional[Union[scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]], optional):
            If set, this matrix masks the score output of recommender model
            where it is non-zero. If none, the mask will be the training
            matrix (``input_interaction``) itself.
        n_threads (int, optional):
            Specifies the number of threads to sort scores and compute the
            evaluation metrics. If ``None``, the environment variable
            ``"IRSPACK_NUM_THREADS_DEFAULT"`` will be looked up, and if the
            variable is not set, it will be set to ``os.cpu_count()``.
            Defaults to None.
        recall_with_cutoff (bool, optional):
            This affects the definition of recall.
            If ``True``, for each user, recall will be evaluated by

            .. math ::

                \frac{N_{\text{hit}}}{\min( \text{cutoff}, N_{\text{ground truth}} )}

            If ``False``, this will be

            .. math ::

                \frac{N_{\text{hit}}}{N_{\text{ground truth}}}

        mb_size (int, optional):
            The rows of chunked user score. Defaults to 1024.
    """

    def __init__(
        self,
        input_interaction: InteractionMatrix,
        ground_truth: InteractionMatrix,
        cutoff: int = 10,
        target_metric: str = "ndcg",
        recommendable_items: Optional[List[int]] = None,
        per_user_recommendable_items: Union[
            None, List[List[int]], InteractionMatrix
        ] = None,
        masked_interactions: Optional[InteractionMatrix] = None,
        n_threads: Optional[int] = None,
        recall_with_cutoff: bool = False,
        mb_size: int = 1024,
    ):
        # Cold users are addressed by their row in ``input_interaction``,
        # so the parent is always set up with a zero offset.
        super().__init__(
            ground_truth,
            offset=0,
            cutoff=cutoff,
            target_metric=target_metric,
            recommendable_items=recommendable_items,
            per_user_recommendable_items=per_user_recommendable_items,
            masked_interactions=masked_interactions,
            n_threads=n_threads,
            recall_with_cutoff=recall_with_cutoff,
            mb_size=mb_size,
        )
        self.input_interaction = input_interaction

    def _get_scores_as_list(
        self,
        model: "BaseRecommender",
        cutoffs: List[int],
    ) -> List[Dict[str, float]]:
        """Score the cold users chunk by chunk, one metric dict per cutoff.

        Args:
            model: The evaluated recommender; scores come from its
                ``get_score_cold_user`` applied to rows of
                ``self.input_interaction``.
            cutoffs: Cutoff values to evaluate.

        Returns:
            A list aligned with ``cutoffs``; each element is the ``as_dict()``
            of the metrics merged over all chunks.
        """
        n_items = model.n_items
        # One accumulator per requested cutoff.
        metrics: List[Metrics] = [Metrics(n_items) for _ in cutoffs]

        block_start = self.offset
        n_validated = self.n_users
        block_end = block_start + n_validated
        mb_size = self.mb_size

        for chunk_start in range(block_start, block_end, mb_size):
            chunk_end = min(chunk_start + mb_size, block_end)
            scores = model.get_score_cold_user(
                self.input_interaction[chunk_start:chunk_end]
            )

            if self.masked_interactions is None:
                mask = self.input_interaction[chunk_start:chunk_end]
            else:
                mask = self.masked_interactions[chunk_start:chunk_end]
            # Masked entries are overwritten in place with -inf.
            scores[mask.nonzero()] = -np.inf

            if not scores.flags.c_contiguous:
                warnings.warn(
                    "Found col-major(fortran-style) score values.\n"
                    "Transforming it to row-major score matrix."
                )
                # The conversion also casts the chunk to float64.
                scores = np.ascontiguousarray(scores, dtype=np.float64)

            for i, c in enumerate(cutoffs):
                chunked_metric = self._get_metrics(scores, c, chunk_start)
                metrics[i].merge(chunked_metric)

        return [item.as_dict() for item in metrics]