# Source code for irspack.recommenders.ials

import enum
import logging
import pickle
from typing import IO, TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import scipy.sparse as sps
from optuna import Study
from typing_extensions import Literal  # pragma: no cover, type: ignore

from .. import evaluation
from .._threading import get_n_threads
from ..definitions import (
    DenseMatrix,
    DenseScoreArray,
    InteractionMatrix,
    UserIndexArray,
)
from ..evaluation.evaluator import Evaluator
from ..optimization.parameter_range import (
    LogUniformFloatRange,
    ParameterRange,
    UniformIntegerRange,
)
from ._ials import IALSModelConfigBuilder, IALSSolverConfigBuilder
from ._ials import IALSTrainer as CoreTrainer
from ._ials import LossType, SolverType
from .base import (
    BaseRecommenderWithItemEmbedding,
    BaseRecommenderWithUserEmbedding,
    ParameterSuggestFunctionType,
)
from .base_earlystop import (
    BaseEarlyStoppingRecommenderConfig,
    BaseRecommenderWithEarlyStopping,
    TrainerBase,
)

if TYPE_CHECKING:
    from optuna import Study, Trial
    from optuna.storages import RDBStorage


def str_to_solver_type(t: str) -> SolverType:
    """Convert a solver name into the corresponding ``SolverType`` member.

    The lookup is case-insensitive: ``t`` is upper-cased and resolved as an
    attribute of ``SolverType``.

    Args:
        t: Name of the solver, e.g. ``"CG"``, ``"CHOLESKY"`` or ``"IALSPP"``.

    Returns:
        The matching ``SolverType`` member.

    Raises:
        AttributeError: If ``SolverType`` has no attribute named ``t.upper()``.
        ValueError: If the resolved attribute is not one of the supported
            solver types.
    """
    result: SolverType = getattr(SolverType, t.upper())
    # Explicit check rather than ``assert`` so that the validation is not
    # stripped when Python runs with ``-O``.
    if result not in {SolverType.CG, SolverType.CHOLESKY, SolverType.IALSPP}:
        raise ValueError(f"Unsupported solver type: {t!r}")
    return result


def str_to_loss_type(t: str) -> LossType:
    """Convert a loss name into the corresponding ``LossType`` member.

    The lookup is case-insensitive: ``t`` is upper-cased and resolved as an
    attribute of ``LossType``.

    Args:
        t: Name of the loss, e.g. ``"IALSPP"`` or ``"ORIGINAL"``.

    Returns:
        The matching ``LossType`` member.

    Raises:
        AttributeError: If ``LossType`` has no attribute named ``t.upper()``.
        ValueError: If the resolved attribute is not one of the supported
            loss types.
    """
    result: LossType = getattr(LossType, t.upper())
    # Explicit check rather than ``assert`` so that the validation is not
    # stripped when Python runs with ``-O``.
    if result not in {LossType.ORIGINAL, LossType.IALSPP}:
        raise ValueError(f"Unsupported loss type: {t!r}")
    return result


class IALSTrainer(TrainerBase):
    """Adapter exposing the compiled ``CoreTrainer`` through ``TrainerBase``.

    Two solver configurations are kept side by side:

    * ``solver_config`` is used by ``run_epoch``, ``compute_loss`` and
      ``user_scores``.
    * ``prediction_time_solver_config`` is used by ``transform_user`` and
      ``transform_item``.
    """

    def __init__(
        self,
        X: InteractionMatrix,
        n_components: int,
        alpha0: float,
        reg: float,
        nu: float,
        init_std: float,
        solver_type: SolverType,
        max_cg_steps: int,
        ialspp_subspace_dimension: int,
        loss_type: LossType,
        random_seed: int,
        n_threads: int,
        prediction_time_max_cg_steps: int,
        prediction_time_ialspp_iteration: int,
    ):
        """Build the model/solver configurations and the core trainer.

        ``X`` is cast to ``float32`` before being handed to ``CoreTrainer``.
        The remaining arguments are forwarded to the model or solver config
        builders.
        """
        interactions_f32 = X.astype(np.float32)

        # Model hyper-parameters.
        model_builder = IALSModelConfigBuilder()
        model_builder = model_builder.set_K(n_components)
        model_builder = model_builder.set_init_stdev(init_std)
        model_builder = model_builder.set_alpha0(alpha0)
        model_builder = model_builder.set_reg(reg)
        model_builder = model_builder.set_nu(nu)
        model_builder = model_builder.set_loss_type(loss_type)
        model_builder = model_builder.set_random_seed(random_seed)
        model_config = model_builder.build()

        # Solver settings for training; the iALS++ iteration count is fixed to 1.
        train_builder = IALSSolverConfigBuilder()
        train_builder = train_builder.set_n_threads(n_threads)
        train_builder = train_builder.set_solver_type(solver_type)
        train_builder = train_builder.set_max_cg_steps(max_cg_steps)
        train_builder = train_builder.set_ialspp_iteration(1)
        train_builder = train_builder.set_ialspp_subspace_dimension(
            ialspp_subspace_dimension
        )
        training_solver_config = train_builder.build()

        self.core_trainer = CoreTrainer(model_config, interactions_f32)
        self.solver_config = training_solver_config

        # Solver settings for transform_user / transform_item, with their own
        # CG step limit and iALS++ iteration count.
        predict_builder = IALSSolverConfigBuilder()
        predict_builder = predict_builder.set_n_threads(n_threads)
        predict_builder = predict_builder.set_solver_type(solver_type)
        predict_builder = predict_builder.set_max_cg_steps(
            prediction_time_max_cg_steps
        )
        predict_builder = predict_builder.set_ialspp_subspace_dimension(
            ialspp_subspace_dimension
        )
        predict_builder = predict_builder.set_ialspp_iteration(
            prediction_time_ialspp_iteration
        )
        self.prediction_time_solver_config = predict_builder.build()

    def run_epoch(self) -> None:
        """Advance the core trainer by one step using ``solver_config``."""
        self.core_trainer.step(self.solver_config)

    def compute_loss(self) -> float:
        """Return the loss reported by the core trainer for ``solver_config``."""
        return self.core_trainer.compute_loss(self.solver_config)

    def save_state(self, ofs: IO) -> None:
        """Pickle the core trainer's ``user`` and ``item`` attributes to ``ofs``."""
        state = {"user": self.core_trainer.user, "item": self.core_trainer.item}
        pickle.dump(state, ofs, protocol=pickle.HIGHEST_PROTOCOL)

    def load_state(self, ifs: IO) -> None:
        """Restore ``user`` and ``item`` from a stream written by ``save_state``."""
        # NOTE(review): ``pickle.load`` can execute arbitrary code; ``ifs`` must
        # come from a trusted source.
        state = pickle.load(ifs)
        self.core_trainer.user = state["user"]
        self.core_trainer.item = state["item"]

    def user_scores(self, begin: int, end: int) -> DenseScoreArray:
        """Return the core trainer's scores for the user range ``begin``..``end``."""
        return self.core_trainer.user_scores(begin, end, self.solver_config)

    def transform_user(self, X: InteractionMatrix) -> DenseMatrix:
        """Delegate to ``CoreTrainer.transform_user`` with the prediction-time config."""
        return self.core_trainer.transform_user(X, self.prediction_time_solver_config)

    def transform_item(self, X: InteractionMatrix) -> DenseMatrix:
        """Delegate to ``CoreTrainer.transform_item`` with the prediction-time config."""
        return self.core_trainer.transform_item(X, self.prediction_time_solver_config)


class IALSConfigScaling(enum.Enum):
    """Schemes for scaling the stored values of the interaction matrix.

    Members are looked up by name from the ``confidence_scaling`` string
    (``IALSConfigScaling[confidence_scaling]``).
    """

    # Values are spelled out; they are the same ones ``enum.auto()`` assigns.
    none = 1  # leave the matrix untouched
    log = 2  # replace each stored value x by log(1 + x / epsilon)


class IALSConfig(BaseEarlyStoppingRecommenderConfig):
    """Configuration fields for ``IALSRecommender``.

    Field names match the keyword arguments of ``IALSRecommender.__init__``.
    The ``"IALSPP"`` solver and its two related options are included so that
    every option the constructor accepts can also be expressed in a config.
    """

    n_components: int = 20
    # NOTE(review): ``IALSRecommender.__init__`` in this file defaults
    # ``alpha0`` to 0.0 while this config uses 1.0 - confirm which is intended.
    alpha0: float = 1.0
    reg: float = 1e-3
    nu: float = 1.0
    confidence_scaling: str = "none"
    epsilon: float = 1.0
    init_std: float = 0.1
    solver_type: Literal["CG", "CHOLESKY", "IALSPP"] = "CG"
    max_cg_steps: int = 3
    # Same default as ``IALSRecommender.__init__``.
    ialspp_subspace_dimension: int = 64
    loss_type: Literal["IALSPP", "ORIGINAL"] = "IALSPP"
    nu_star: Optional[float] = None
    random_seed: int = 42
    n_threads: Optional[int] = None
    train_epochs: int = 16
    prediction_time_max_cg_steps: int = 5
    # Same default as ``IALSRecommender.__init__``.
    prediction_time_ialspp_iteration: int = 7


def compute_reg_scale(X: sps.csr_matrix, alpha0: float, nu: float) -> float:
    r"""Compute the total frequency-dependent regularization scale.

    With :math:`U, I` the number of rows and columns of ``X`` and
    :math:`N_u, N_i` the number of stored entries in row ``u`` / column ``i``,
    this returns

    .. math ::

        \sum_u (N_u + \alpha_0 I) ^ \nu + \sum_i (N_i + \alpha_0 U) ^ \nu

    The input matrix is never modified. ``tocsr()`` may return ``X`` itself,
    so no in-place operation such as ``sort_indices`` is applied to it.

    Args:
        X: Sparse interaction matrix (anything providing ``tocsr()``).
        alpha0: The "unobserved" weight.
        nu: Exponent applied to each per-row / per-column term.

    Returns:
        The sum of the row term and the column term as a Python ``float``.
    """
    n_rows, n_cols = X.shape
    X_csr: sps.csr_matrix = X.tocsr()
    # Stored entries per row come straight from the CSR row pointer.
    nnz_row: np.ndarray = np.diff(X_csr.indptr)
    # Stored entries per column: count the occurrences of each column index.
    # This avoids building a CSC copy of the matrix.
    nnz_col: np.ndarray = np.bincount(X_csr.indices, minlength=n_cols)
    row_term = float(((nnz_row + alpha0 * n_cols) ** nu).sum())
    col_term = float(((nnz_col + alpha0 * n_rows) ** nu).sum())
    return row_term + col_term


class IALSRecommender(
    BaseRecommenderWithEarlyStopping,
    BaseRecommenderWithUserEmbedding,
    BaseRecommenderWithItemEmbedding,
):
    r"""Implementation of implicit Alternating Least Squares (iALS) or Weighted Matrix Factorization (WMF).

    By default, it tries to minimize the following loss:

    .. math ::

        \frac{1}{2} \sum _{u, i \in S} c_{ui} (\mathbf{u}_u \cdot \mathbf{v}_i - 1) ^ 2
        + \frac{\alpha_0}{2} \sum_{u, i} (\mathbf{u}_u \cdot \mathbf{v}_i) ^ 2
        + \frac{\text{reg}}{2} \left( \sum_u (\alpha_0 I + N_u) ^ \nu || \mathbf{u}_u || ^2 + \sum_i (\alpha_0 U + N_i) ^ \nu || \mathbf{v}_i || ^2 \right)

    where :math:`S` denotes the set of all pairs where :math:`X_{ui}` is non-zero.

    See the seminal paper:

        - `Collaborative filtering for implicit feedback datasets
          <http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.167.5120&rep=rep1&type=pdf>`_

    By default it uses a conjugate gradient descent version:

        - `Applications of the conjugate gradient method for implicit feedback collaborative filtering
          <https://dl.acm.org/doi/abs/10.1145/2043932.2043987>`_

    The loss above is slightly different from the original version. See the following paper for the loss used here

        - `Revisiting the Performance of iALS on Item Recommendation Benchmarks
          <https://arxiv.org/abs/2110.14037>`_

    Args:
        X_train_all (Union[scipy.sparse.csr_matrix, scipy.sparse.csc_matrix]):
            Input interaction matrix.
        n_components (int, optional):
            The dimension for latent factor. Defaults to 20.
        alpha0 (float, optional):
            The "unobserved" weight. Defaults to 0.0.
        reg (float, optional) :
            Regularization coefficient for both user & item factors. Defaults to 1e-3.
        nu (float, optional) :
            Controls frequency regularization introduced in the paper,
            "Revisiting the Performance of iALS on Item Recommendation Benchmarks".
            Defaults to 1.0.
        confidence_scaling (str, optional) :
            Specifies how to scale confidence scaling :math:`c_{ui}`. Must be either "none" or "log".
            If "none", the non-zero (not-necessarily 1) :math:`X_{ui}` yields

            .. math ::
                c_{ui} = A + X_{ui}

            If "log",

            .. math ::
                c_{ui} = A + \log (1 + X_{ui} / \epsilon )

            The constant :math:`A` above will be 0 if ``loss_type`` is ``"IALSPP"``,
            :math:`\alpha_0` if ``loss_type`` is ``"ORIGINAL"``.

            Defaults to "none".
        epsilon (float, optional):
            The :math:`\epsilon` parameter for log-scaling described above.
            Will not have any effect if `confidence_scaling` is "none".
            Defaults to 1.0.
        init_std (float, optional):
            Standard deviation for initialization normal distribution.
            The actual std for each user/item vector components are scaled by `1 / n_components ** .5`.
            Defaults to 0.1.
        solver_type ( "CHOLESKY" | "CG" | "IALSPP", optional):
            Which solver to use. Defaults to "CG".
        max_cg_steps (int, optional):
            Maximal number of conjugate gradient descent steps during the training time.
            Defaults to 3. Used only when ``solver_type=="CG"``.
            By increasing this parameter, the result will be closer to
            Cholesky decomposition method (i.e., when ``solver_type == "CHOLESKY"``),
            but it will take longer time.
        ialspp_subspace_dimension (int, optional):
            The subspace dimension of iALS++ (ignored if the ``solver_type`` is not "IALSPP").
            If this value is 1, specialized implementation described in
            `Fast Matrix Factorization for Online Recommendation with Implicit Feedback
            <https://arxiv.org/abs/1708.05024>`_ will be used instead.
            Defaults to 64.
        loss_type ( Literal["IALSPP", "ORIGINAL"], optional):
            Specifies the subtle difference between iALS++ vs Original Loss.
            Defaults to "IALSPP".
        nu_star (Optional[float], optional):
            If not `None`, used as the reference scale for nu described in the
            "Revisiting..." paper. Defaults to None.
        random_seed (int, optional):
            The random seed to initialize the parameters. Defaults to 42.
        n_threads (Optional[int], optional):
            Specifies the number of threads to use for the computation.
            If ``None``, the environment variable ``"IRSPACK_NUM_THREADS_DEFAULT"`` will be looked up,
            and if the variable is not set, it will be set to ``os.cpu_count()``. Defaults to None.
        train_epochs (int, optional):
            Maximal number of epochs. Defaults to 16.
        prediction_time_max_cg_steps (int, optional):
            Maximal number of conjugate gradient descent steps during the prediction time,
            i.e., the case when a user unseen at the training time is given as a history matrix.
            Defaults to 5.
        prediction_time_ialspp_iteration (int, optional):
            The iALS++ iteration count set on the solver configuration that is used
            at the prediction time. Defaults to 7.

    Examples:

        >>> from irspack import IALSRecommender, rowwise_train_test_split, Evaluator
        >>> from irspack.utils.sample_data import mf_example_data
        >>> X = mf_example_data(100, 30, random_state=1)
        >>> X_train, X_test = rowwise_train_test_split(X, random_state=0)
        >>> rec = IALSRecommender(X_train)
        >>> rec.learn()
        >>> evaluator=Evaluator(X_test)
        >>> print(evaluator.get_scores(rec, [20]))
        OrderedDict([('hit@20', 1.0), ('recall@20', 0.9003412698412698), ('ndcg@20', 0.6175493479217139), ('map@20', 0.3848785870622406), ('precision@20', 0.3385), ('gini_index@20', 0.0814), ('entropy@20', 3.382497875272383), ('appeared_item@20', 30.0)])
    """

    config_class = IALSConfig

    default_tune_range: List[ParameterRange] = [
        UniformIntegerRange("n_components", 4, 300),
        LogUniformFloatRange("alpha0", 3e-3, 1),
        LogUniformFloatRange("reg", 1e-4, 1e-1),
    ]

    def __init__(
        self,
        X_train_all: InteractionMatrix,
        n_components: int = 20,
        alpha0: float = 0.0,
        reg: float = 1e-3,
        nu: float = 1.0,
        confidence_scaling: str = "none",
        epsilon: float = 1.0,
        init_std: float = 0.1,
        solver_type: Literal["CG", "CHOLESKY", "IALSPP"] = "CG",
        max_cg_steps: int = 3,
        ialspp_subspace_dimension: int = 64,
        loss_type: Literal["IALSPP", "ORIGINAL"] = "IALSPP",
        nu_star: Optional[float] = None,
        random_seed: int = 42,
        n_threads: Optional[int] = None,
        train_epochs: int = 16,
        prediction_time_max_cg_steps: int = 5,
        prediction_time_ialspp_iteration: int = 7,
    ) -> None:
        """Store the hyper-parameters; see the class docstring for their meaning.

        Raises:
            KeyError: If ``confidence_scaling`` is not the name of an
                ``IALSConfigScaling`` member.
        """
        super().__init__(
            X_train_all,
            train_epochs=train_epochs,
        )

        self.n_components = n_components
        self.alpha0 = alpha0
        self.reg = reg
        self.nu = nu
        self.confidence_scaling = IALSConfigScaling[confidence_scaling]
        self.epsilon = epsilon
        self.init_std = init_std
        self.solver_type = str_to_solver_type(solver_type)
        self.max_cg_steps = max_cg_steps
        self.ialspp_subspace_dimension = ialspp_subspace_dimension
        self.random_seed = random_seed
        self.n_threads = get_n_threads(n_threads)
        self.loss_type = str_to_loss_type(loss_type)
        self.nu_star = nu_star

        # When ``nu_star`` is given, ``reg`` is rescaled by the ratio of the
        # regularization scales computed with ``nu_star`` and with ``nu``.
        self.scaled_reg = self.reg
        if self.nu_star is not None:
            self.scaled_reg = (
                self.reg
                * compute_reg_scale(self.X_train_all, alpha0, self.nu_star)
                / compute_reg_scale(self.X_train_all, alpha0, nu)
            )

        self.prediction_time_max_cg_steps = prediction_time_max_cg_steps
        self.prediction_time_ialspp_iteration = prediction_time_ialspp_iteration

        self.trainer: Optional[IALSTrainer] = None

    @classmethod
    def _scale_X(
        cls, X: sps.csr_matrix, scheme: IALSConfigScaling, epsilon: float
    ) -> sps.csr_matrix:
        """Apply the confidence scaling ``scheme`` to the stored values of ``X``.

        For ``IALSConfigScaling.none`` the very same object ``X`` is returned.
        Otherwise a copy is returned whose stored values ``x`` are replaced by
        ``log(1 + x / epsilon)``.
        """
        if scheme is IALSConfigScaling.none:
            return X
        else:
            X_ret: sps.csr_matrix = X.copy()
            X_ret.data = np.log(1 + X_ret.data / epsilon)
            return X_ret

    def _create_trainer(self) -> TrainerBase:
        """Create an ``IALSTrainer`` from the stored hyper-parameters.

        The training matrix is confidence-scaled first, and ``scaled_reg``
        (not the raw ``reg``) is passed as the regularization coefficient.
        """
        return IALSTrainer(
            X=self._scale_X(self.X_train_all, self.confidence_scaling, self.epsilon),
            n_components=self.n_components,
            alpha0=self.alpha0,
            reg=self.scaled_reg,
            nu=self.nu,
            init_std=self.init_std,
            solver_type=self.solver_type,
            max_cg_steps=self.max_cg_steps,
            ialspp_subspace_dimension=self.ialspp_subspace_dimension,
            loss_type=self.loss_type,
            random_seed=self.random_seed,
            n_threads=self.n_threads,
            prediction_time_max_cg_steps=self.prediction_time_max_cg_steps,
            prediction_time_ialspp_iteration=self.prediction_time_ialspp_iteration,
        )

    @property
    def trainer_as_ials(self) -> IALSTrainer:
        """The current trainer.

        Raises:
            RuntimeError: If ``self.trainer`` is still ``None``.
        """
        if self.trainer is None:
            raise RuntimeError("tried to fetch trainer before the training.")
        return self.trainer

    def get_score(self, user_indices: UserIndexArray) -> DenseScoreArray:
        """Return the dot products of the selected user factors with all item factors."""
        res: DenseScoreArray = self.trainer_as_ials.core_trainer.user[user_indices].dot(
            self.get_item_embedding().T
        )
        return res

    def get_score_block(self, begin: int, end: int) -> DenseScoreArray:
        """Return the trainer's scores for the user range given by ``begin`` and ``end``."""
        return self.trainer_as_ials.user_scores(begin, end)

    def get_score_cold_user(self, X: InteractionMatrix) -> DenseScoreArray:
        """Score users given only by their interaction history ``X``.

        The user factors are computed with ``compute_user_embedding`` and then
        scored against the item factors.
        """
        user_vector = self.compute_user_embedding(X)
        return self.get_score_from_user_embedding(user_vector)

    def get_user_embedding(self) -> DenseMatrix:
        """Return the ``user`` attribute of the core trainer."""
        return self.trainer_as_ials.core_trainer.user

    def get_score_from_user_embedding(
        self, user_embedding: DenseMatrix
    ) -> DenseScoreArray:
        """Return the dot products of ``user_embedding`` with all item factors."""
        res: DenseScoreArray = user_embedding.dot(self.get_item_embedding().T)
        return res

    def get_item_embedding(self) -> DenseMatrix:
        """Return the ``item`` attribute of the core trainer."""
        return self.trainer_as_ials.core_trainer.item

    def compute_user_embedding(self, X: InteractionMatrix) -> DenseMatrix:
        r"""Given an unknown users' interaction with known items,
        computes the latent factors of the users by least square (fixing item embeddings).

        Args:
            X:
                The interaction history of the new users.
                ``X.shape[1]`` must be equal to ``self.n_items``.
        """
        return self.trainer_as_ials.transform_user(
            self._scale_X(
                sps.csr_matrix(X).astype(np.float32),
                self.confidence_scaling,
                self.epsilon,
            )
        )

    def compute_item_embedding(self, X: InteractionMatrix) -> DenseMatrix:
        r"""Given an unknown items' interaction with known user,
        computes the latent factors of the items by least square (fixing user embeddings).

        Args:
            X:
                The interaction history of the new items.
                ``X.shape[0]`` must be equal to ``self.n_users``.
        """
        return self.trainer_as_ials.transform_item(
            self._scale_X(
                sps.csr_matrix(X).astype(np.float32),
                self.confidence_scaling,
                self.epsilon,
            )
        )

    def get_score_from_item_embedding(
        self, user_indices: UserIndexArray, item_embedding: DenseMatrix
    ) -> DenseScoreArray:
        """Return the dot products of the selected user factors with ``item_embedding``."""
        res: DenseScoreArray = self.trainer_as_ials.core_trainer.user[user_indices].dot(
            item_embedding.T
        )
        return res

    @classmethod
    def tune_doubling_dimension(
        cls,
        data: InteractionMatrix,
        evaluator: Evaluator,
        initial_dimension: int,
        maximal_dimension: int,
        storage: Optional["RDBStorage"] = None,
        study_name_prefix: Optional[str] = None,
        n_trials_initial: int = 40,
        n_trials_following: int = 20,
        n_startup_trials_initial: int = 10,
        n_startup_trials_following: int = 5,
        max_epoch: int = 16,
        validate_epoch: int = 1,
        score_degradation_max: int = 3,
        neighborhood_scale: float = 3.0,
        suggest_function_initial: Optional[ParameterSuggestFunctionType] = None,
        random_seed: Optional[int] = None,
    ) -> Tuple[Dict[str, Any], pd.DataFrame]:
        r"""Perform tuning gradually doubling `n_components`.

        Typically, with the initial `n_components`, the search will be more exhaustive,
        and with larger `n_components`, less exploration will be done around previously found parameters.
        This strategy is described in `Revisiting the Performance of iALS on Item Recommendation Benchmarks <https://arxiv.org/abs/2110.14037>`_.

        Args:
            data:
                The interaction matrix, passed on to ``tune_with_study``.
            evaluator:
                The evaluator, passed on to ``tune_with_study``.
            initial_dimension:
                The initial dimension.
            maximal_dimension:
                The maximal (inclusive) dimension to be tried.
            storage:
                The storage where multiple `optuna.Study` will be created corresponding to the various dimensions.
                If `None`, all `Study` will be created in-memory.
            study_name_prefix:
                The prefix for the names of `optuna.Study`. For dimension `d`, the full name of the `Study` will be `"{study_name_prefix}_{d}"`.
                If `None`, we will use a random string for this prefix.
            n_trials_initial:
                The number of trials for the initial dimension.
            n_trials_following:
                The number of trials for the following dimensions.
            n_startup_trials_initial:
                Passed on to `n_startup_trials` argument of `optuna.pruners.MedianPruner` in the initial `optuna.Study`.
                Defaults to `10`.
            n_startup_trials_following:
                Passed on to `n_startup_trials` argument of `optuna.pruners.MedianPruner` in the following `optuna.Study`.
                Defaults to `5`.
            max_epoch:
                Passed on to ``tune_with_study``. Defaults to `16`.
            validate_epoch:
                Passed on to ``tune_with_study``. Defaults to `1`.
            score_degradation_max:
                Passed on to ``tune_with_study``. Defaults to `3`.
            neighborhood_scale:
                The float-valued best parameters of the previous dimension (e.g. `alpha0` and `reg`)
                will be searched within the log-uniform range
                [`previous_dimension_result / neighborhood_scale`, `previous_dimension_result * neighborhood_scale`].
                Defaults to `3.0`
            suggest_function_initial:
                The parameter suggest function used in the initial `optuna.Study`.
                Defaults to `None`.
            random_seed:
                The random seed to control ``optuna.samplers.TPESampler``.
                It is incremented by one before each study is created. Defaults to `None`.

        Returns:
            A tuple that consists of

                1. A dict containing the best parameters.
                   This dict can be passed to the recommender as ``**kwargs``.
                2. A ``pandas.DataFrame`` that contains the history of optimization for all dimensions.

        Raises:
            ValueError: If ``initial_dimension`` exceeds ``maximal_dimension``,
                so that there would be no dimension to try.
        """
        from uuid import uuid1

        import optuna

        if initial_dimension > maximal_dimension:
            # Fail early with a clear message; otherwise ``pd.concat`` below
            # would raise an opaque ValueError on an empty list.
            raise ValueError(
                "initial_dimension must not be greater than maximal_dimension."
            )

        if study_name_prefix is None:
            study_name_prefix = str(uuid1())

        dimension = initial_dimension
        prev_params: Optional[Dict[str, Any]] = None
        results: List[Tuple[float, Dict[str, Any], pd.DataFrame]] = []
        while dimension <= maximal_dimension:
            _suggest: Optional[ParameterSuggestFunctionType]

            if random_seed is not None:
                random_seed += 1
            if prev_params is None:
                _suggest = suggest_function_initial
                n_trials = n_trials_initial
                n_startup_trials = n_startup_trials_initial
            else:
                n_trials = n_trials_following
                n_startup_trials = n_startup_trials_following
                _prev_params_copied = prev_params.copy()

                def _suggest(trial: "Trial") -> Dict[str, Any]:
                    # Search every float-valued parameter in a log-uniform
                    # neighborhood of the previous dimension's best value.
                    result: Dict[str, Any] = {}
                    for key, value in _prev_params_copied.items():
                        if isinstance(value, float):
                            result[key] = trial.suggest_float(
                                key,
                                value / neighborhood_scale,
                                value * neighborhood_scale,
                                log=True,
                            )
                    return result

            study = optuna.create_study(
                storage=storage,
                sampler=optuna.samplers.TPESampler(seed=random_seed),
                pruner=optuna.pruners.MedianPruner(n_startup_trials=n_startup_trials),
                study_name=f"{study_name_prefix}_{dimension}",
            )

            bp, df_ = cls.tune_with_study(
                study,
                data,
                evaluator,
                n_trials,
                max_epoch=max_epoch,
                validate_epoch=validate_epoch,
                score_degradation_max=score_degradation_max,
                parameter_suggest_function=_suggest,
                fixed_params=dict(n_components=dimension),
            )
            df_["n_components"] = dimension
            results.append((float(df_["value"].min()), bp, df_))
            prev_params = study.best_params.copy()
            dimension *= 2

        final_result_df = pd.concat([x[2] for x in results])
        # Compare on the objective value only. Sorting the whole tuples would
        # fall through to comparing the parameter dicts when two dimensions tie
        # on the value, which raises ``TypeError``.
        final_bp = min(results, key=lambda x: x[0])[1]
        return final_bp, final_result_df

    @classmethod
    def tune_with_study(
        cls,
        study: "Study",
        data: Union[InteractionMatrix, None],
        evaluator: "evaluation.Evaluator",
        n_trials: int = 20,
        timeout: Optional[int] = None,
        data_suggest_function: Optional[Callable[["Trial"], InteractionMatrix]] = None,
        parameter_suggest_function: Optional[ParameterSuggestFunctionType] = None,
        fixed_params: Dict[str, Any] = dict(),
        max_epoch: int = 16,
        validate_epoch: int = 1,
        score_degradation_max: int = 3,
        logger: Optional[logging.Logger] = None,
    ) -> Tuple[Dict[str, Any], pd.DataFrame]:
        """Forward all arguments unchanged to the parent ``tune_with_study``.

        Returns:
            Whatever the parent implementation returns: a tuple of the best
            parameter dict and the optimization history data frame.
        """
        # NOTE: ``fixed_params`` has a shared mutable default; this method only
        # forwards it and never mutates it.
        return super().tune_with_study(
            study,
            data,
            evaluator,
            n_trials,
            timeout,
            data_suggest_function,
            parameter_suggest_function,
            fixed_params,
            max_epoch,
            validate_epoch,
            score_degradation_max,
            logger,
        )

    @classmethod
    def tune(
        cls,
        data: Union[InteractionMatrix, None],
        evaluator: "evaluation.Evaluator",
        n_trials: int = 20,
        timeout: Optional[int] = None,
        data_suggest_function: Optional[Callable[["Trial"], InteractionMatrix]] = None,
        parameter_suggest_function: Optional[ParameterSuggestFunctionType] = None,
        fixed_params: Dict[str, Any] = dict(),
        random_seed: Optional[int] = None,
        prunning_n_startup_trials: int = 10,
        max_epoch: int = 16,
        validate_epoch: int = 1,
        score_degradation_max: int = 3,
        logger: Optional[logging.Logger] = None,
    ) -> Tuple[Dict[str, Any], pd.DataFrame]:
        """Forward all arguments unchanged to the parent ``tune``.

        Returns:
            Whatever the parent implementation returns: a tuple of the best
            parameter dict and the optimization history data frame.
        """
        # NOTE: ``fixed_params`` has a shared mutable default; this method only
        # forwards it and never mutates it.
        return super().tune(
            data,
            evaluator,
            n_trials,
            timeout,
            data_suggest_function,
            parameter_suggest_function,
            fixed_params,
            random_seed,
            prunning_n_startup_trials,
            max_epoch,
            validate_epoch,
            score_degradation_max,
            logger,
        )