import logging
from abc import ABCMeta, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
no_type_check,
)
import numpy as np
import pandas as pd
from pydantic import BaseModel
from scipy import sparse as sps
if TYPE_CHECKING:
from optuna import Study, Trial
from .. import evaluation
ParameterSuggestFunctionType = Callable[["Trial"], Dict[str, Any]]
from ..definitions import (
DenseMatrix,
DenseScoreArray,
InteractionMatrix,
UserIndexArray,
)
from ..optimization.parameter_range import ParameterRange
R = TypeVar("R", bound="BaseRecommender")
def _sparse_to_array(U: Any) -> np.ndarray:
res: np.ndarray
if sps.issparse(U):
res = U.toarray()
return res
else:
res = U
return res
class CallBeforeFitError(Exception):
pass
class RecommenderConfig(BaseModel):
class Config:
extra = "forbid"
class RecommenderMeta(ABCMeta):
recommender_name_vs_recommender_class: Dict[str, "RecommenderMeta"] = {}
@no_type_check
def __new__(
mcs,
name,
bases,
namespace,
register_class: bool = True,
**kwargs,
):
cls = super().__new__(mcs, name, bases, namespace, **kwargs)
if register_class:
mcs.recommender_name_vs_recommender_class[name] = cls
return cls
[docs]class BaseRecommender(object, metaclass=RecommenderMeta):
"""The base class for all (hot) recommenders.
Args:
X_train_all (csr_matrix|csc_matrix|np.ndarray): user/item interaction matrix.
each row correspods to a user's interaction with items.
"""
config_class: Type[RecommenderConfig]
default_tune_range: List[ParameterRange]
X_train_all: sps.csr_matrix
"""The matrix to feed into recommender."""
[docs] def __init__(self, X_train_all: InteractionMatrix, **kwargs: Any) -> None:
self.X_train_all: sps.csr_matrix = sps.csr_matrix(X_train_all).astype(
np.float64
)
self.n_users: int = self.X_train_all.shape[0]
self.n_items: int = self.X_train_all.shape[1]
self.X_train_all.sort_indices()
# this will store configurable parameters learnt during the training,
# e.g., the epoch with the best validation score.
self.learnt_config: Dict[str, Any] = dict()
@classmethod
def from_config(
cls: Type[R],
X_train_all: InteractionMatrix,
config: RecommenderConfig,
) -> R:
if not isinstance(config, cls.config_class):
raise ValueError(
f"Different config has been given. config must be {cls.config_class}"
)
return cls(X_train_all, **config.dict())
[docs] def learn(self: R) -> R:
"""Learns and returns itself.
Returns:
The model after fitting process.
"""
self._learn()
return self
@abstractmethod
def _learn(self) -> None:
raise NotImplementedError("_learn must be implemented.")
[docs] def learn_with_optimizer(
self,
evaluator: Optional["evaluation.Evaluator"],
trial: Optional["Trial"],
max_epoch: int = 128,
validate_epoch: int = 5,
score_degradation_max: int = 5,
) -> None:
r"""Learning procedures with early stopping and pruning.
Args:
evaluator : The evaluator to measure the score.
trial : The current optuna trial under the study (if any.)
max_epoch:
Maximal number of epochs.
If iterative learning procedure is not available, this parameter will be ignored.
Defaults to 128.
validate_epoch:
The frequency of validation score measurement.
If iterative learning procedure is not available, this parameter will be ignored.
Defaults to 5.
validate_epoch:
The frequency of validation score measurement.
If iterative learning procedure is not available, this parameter will be ignored.
Defaults to 5.
score_degradation_max:
Maximal number of allowed score degradation.
If iterative learning procedure is not available, this parameter will be ignored.
Defaults to 5.
"""
# by default, evaluator & trial does not play any role.
self.learn()
@classmethod
def default_suggest_parameter(
cls, trial: "Trial", fixed_params: Dict[str, Any]
) -> Dict[str, Any]:
return {
s.name: s.suggest(trial)
for s in cls.default_tune_range
if s.name not in fixed_params.keys()
}
[docs] @classmethod
def tune(
cls,
data: Union[InteractionMatrix, None],
evaluator: "evaluation.Evaluator",
n_trials: int = 20,
timeout: Optional[int] = None,
data_suggest_function: Optional[Callable[["Trial"], InteractionMatrix]] = None,
parameter_suggest_function: Optional[ParameterSuggestFunctionType] = None,
fixed_params: Dict[str, Any] = dict(),
random_seed: Optional[int] = None,
prunning_n_startup_trials: int = 10,
max_epoch: int = 128,
validate_epoch: int = 5,
score_degradation_max: int = 5,
logger: Optional[logging.Logger] = None,
) -> Tuple[Dict[str, Any], pd.DataFrame]:
r"""Perform the optimization step.
`optuna.Study` object is created inside this function.
Args:
data:
The training data.
You can also provide tunable parameter dependent training data by providing `data_suggest_function`.
In that case, data must be `None`.
evaluator:
The validation evaluator that measures the performance of the recommenders.
n_trials:
The number of expected trials (including pruned ones). Defaults to 20.
timeout:
If set to some value (in seconds), the study will exit after that time period.
Note that the running trials is not interrupted, though. Defaults to `None`.
data_suggest_function:
If not `None`, this must be a function which takes `optuna.Trial` as its argument and returns training data. Defaults to `None`.
parameter_suggest_function:
If not `None`, this must be a function which takes `optuna.Trial` as its argument and returns `Dict[str, Any]` (i.e., some keyword arguments of the recommender class).
If `None`, `cls.default_suggest_parameter` will be used for the parameter suggestion.
Defaults to `None`.
fixed_params:
Fixed parameters passed to recommenders during the optimization procedure.
This will replace the suggested parameter (either by `cls.default_suggest_parameter` or `parameter_suggest_function`).
Defaults to `dict()`.
random_seed:
The random seed to control `optuna.samplers.TPESampler`. Defaults to `None`.
prunning_n_startup_trials:
`n_startup_trials` argument passed to the constructor of `optuna.pruners.MedianPruner`.
max_epoch:
The maximal number of epochs for the training.
If iterative learning procedure is not available, this parameter will be ignored.
validate_epoch (int, optional):
The frequency of validation score measurement.
If iterative learning procedure is not available, this parameter will be ignored.
Defaults to 5.
score_degradation_max (int, optional):
Maximal number of allowed score degradation.
If iterative learning procedure is not available, this parameter will be ignored.
Defaults to 5. Defaults to 5.
Returns:
A tuple that consists of
1. A dict containing the best paramaters.
This dict can be passed to the recommender as ``**kwargs``.
2. A ``pandas.DataFrame`` that contains the history of optimization.
"""
from optuna import create_study, pruners, samplers
study = create_study(
sampler=samplers.TPESampler(seed=random_seed),
pruner=pruners.MedianPruner(n_startup_trials=prunning_n_startup_trials),
)
return cls.tune_with_study(
study,
data=data,
evaluator=evaluator,
n_trials=n_trials,
timeout=timeout,
data_suggest_function=data_suggest_function,
parameter_suggest_function=parameter_suggest_function,
fixed_params=fixed_params,
max_epoch=max_epoch,
validate_epoch=validate_epoch,
score_degradation_max=score_degradation_max,
logger=logger,
)
@classmethod
def tune_with_study(
cls,
study: "Study",
data: Union[InteractionMatrix, None],
evaluator: "evaluation.Evaluator",
n_trials: int = 20,
timeout: Optional[int] = None,
data_suggest_function: Optional[Callable[["Trial"], InteractionMatrix]] = None,
parameter_suggest_function: Optional[ParameterSuggestFunctionType] = None,
fixed_params: Dict[str, Any] = dict(),
max_epoch: int = 128,
validate_epoch: int = 5,
score_degradation_max: int = 5,
logger: Optional[logging.Logger] = None,
) -> Tuple[Dict[str, Any], pd.DataFrame]:
from ..optimization.optimizer import Optimizer
if data is None:
if data_suggest_function is None:
raise ValueError(
"Either `data` or `data_sugget_function` must be provided."
)
_data_suggest_function = data_suggest_function
else:
def _data_suggest_function(trial: "Trial") -> InteractionMatrix:
return data
if parameter_suggest_function is not None:
_parameter_suggest_function = parameter_suggest_function
else:
def _parameter_suggest_function(trial: "Trial") -> Dict[str, Any]:
return cls.default_suggest_parameter(trial, fixed_params)
optim = Optimizer(
_data_suggest_function,
_parameter_suggest_function,
fixed_params,
evaluator,
max_epoch=max_epoch,
validate_epoch=validate_epoch,
score_degradation_max=score_degradation_max,
logger=logger,
)
return optim.optimize_with_study(study, cls, n_trials=n_trials, timeout=timeout)
[docs] @abstractmethod
def get_score(self, user_indices: UserIndexArray) -> DenseScoreArray:
"""Compute the item recommendation score for a subset of users.
Args:
user_indices : The index defines the subset of users.
Returns:
The item scores. Its shape will be (len(user_indices), self.n_items)
"""
raise NotImplementedError("get_score must be implemented") # pragma: no cover
[docs] def get_score_block(self, begin: int, end: int) -> DenseScoreArray:
"""Compute the score for a block of the users.
Args:
begin (int): where the evaluated user block begins.
end (int): where the evaluated user block ends.
Returns:
The item scores. Its shape will be (end - begin, self.n_items)
"""
raise NotImplementedError("get_score_block not implemented!")
[docs] def get_score_remove_seen(self, user_indices: UserIndexArray) -> DenseScoreArray:
"""Compute the item score and mask the item in the training set. Masked items will have the score -inf.
Args:
user_indices : Specifies the subset of users.
Returns:
The masked item scores. Its shape will be (len(user_indices), self.n_items)
"""
scores = self.get_score(user_indices)
if sps.issparse(scores):
scores = sps.csr_matrix(scores).toarray()
m = self.X_train_all[user_indices].tocsr()
scores[m.nonzero()] = -np.inf
return scores
[docs] def get_score_remove_seen_block(self, begin: int, end: int) -> DenseScoreArray:
"""Compute the score for a block of the users, and mask the items in the training set. Masked items will have the score -inf.
Args:
begin (int): where the evaluated user block begins.
end (int): where the evaluated user block ends.
Returns:
The masked item scores. Its shape will be (end - begin, self.n_items)
"""
scores = _sparse_to_array(self.get_score_block(begin, end))
m = self.X_train_all[begin:end]
scores[m.nonzero()] = -np.inf
return scores
[docs] def get_score_cold_user(self, X: InteractionMatrix) -> DenseScoreArray:
"""Compute the item recommendation score for unseen users whose profiles are given as another user-item relation matrix.
Args:
X : The profile user-item relation matrix for unseen users.
Its number of rows is arbitrary, but the number of columns must be self.n_items.
Returns:
Computed item scores for users. Its shape is equal to X.
"""
raise NotImplementedError(
f"get_score_cold_user is not implemented for {self.__class__.__name__}!"
) # pragma: no cover
[docs] def get_score_cold_user_remove_seen(self, X: InteractionMatrix) -> DenseScoreArray:
"""Compute the item recommendation score for unseen users whose profiles are given as another user-item relation matrix. The score will then be masked by the input.
Args:
X : The profile user-item relation matrix for unseen users.
Its number of rows is arbitrary, but the number of columns must be self.n_items.
Returns:
Computed & masked item scores for users. Its shape is equal to X.
"""
score = self.get_score_cold_user(X)
score[X.nonzero()] = -np.inf
return score
class BaseSimilarityRecommender(BaseRecommender):
"""The computed item-item similarity. Might not be initialized before `learn()` is called."""
_W: Optional[Union[sps.csr_matrix, sps.csc_matrix, np.ndarray]]
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._W = None
@property
def W(self) -> Union[sps.csr_matrix, sps.csc_matrix, np.ndarray]:
"""The computed item-item similarity weight matrix."""
if self._W is None:
raise RuntimeError("W fetched before fit.")
return self._W
def get_score(self, user_indices: UserIndexArray) -> DenseScoreArray:
return _sparse_to_array(self.X_train_all[user_indices].dot(self.W))
def get_score_cold_user(self, X: InteractionMatrix) -> DenseScoreArray:
return _sparse_to_array(X.dot(self.W))
def get_score_block(self, begin: int, end: int) -> DenseScoreArray:
return _sparse_to_array(self.X_train_all[begin:end].dot(self.W))
class BaseUserSimilarityRecommender(BaseRecommender):
"""The computed user-user similarity. Might not be initialized before `learn()` is called."""
U_: Optional[Union[sps.csr_matrix, sps.csc_matrix, np.ndarray]]
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._X_csc: sps.csc_matrix = self.X_train_all.tocsc()
self.U_ = None
@property
def U(self) -> Union[sps.csr_matrix, sps.csc_matrix, np.ndarray]:
"""The computed user-user similarity weight matrix."""
if self.U_ is None:
raise RuntimeError("W fetched before fit.")
return self.U_
def get_score(self, user_indices: UserIndexArray) -> DenseScoreArray:
return _sparse_to_array(self.U[user_indices].dot(self._X_csc).toarray())
def get_score_block(self, begin: int, end: int) -> DenseScoreArray:
return _sparse_to_array(self.U[begin:end].dot(self._X_csc))
class BaseRecommenderWithUserEmbedding:
r"""Defines a recommender with user embedding (e.g., matrix factorization.)."""
@abstractmethod
def get_user_embedding(
self,
) -> DenseMatrix:
"""Get user embedding vectors.
Returns:
The latent vector representation of users.
Its number of rows is equal to the number of the users.
"""
@abstractmethod
def get_score_from_user_embedding(
self, user_embedding: DenseMatrix
) -> DenseScoreArray:
"""Compute the item score from user embedding. Mainly used for cold-start scenario.
Args:
user_embedding : Latent user representation obtained elsewhere.
Returns:
DenseScoreArray: The score array. Its shape will be ``(user_embedding.shape[0], self.n_items)``
"""
raise NotImplementedError("get_score_from_item_embedding must be implemtented.")
class BaseRecommenderWithItemEmbedding:
"""Defines a recommender with item embedding (e.g., matrix factorization.)."""
@abstractmethod
def get_item_embedding(
self,
) -> DenseMatrix:
"""Get item embedding vectors.
Returns:
The latent vector representation of items.
Its number of rows is equal to the number of the items.
"""
raise NotImplementedError(
"get_item_embedding must be implemented."
) # pragma: no cover
@abstractmethod
def get_score_from_item_embedding(
self, user_indices: UserIndexArray, item_embedding: DenseMatrix
) -> DenseScoreArray:
raise NotImplementedError("get_score_from_item_embedding must be implemented.")
[docs]def get_recommender_class(recommender_name: str) -> Type[BaseRecommender]:
r"""Get recommender class from its class name.
Args:
recommender_name: The class name of the recommender.
Returns:
The recommender class with its class name being `recommender_name`.
"""
result: Type[
BaseRecommender
] = RecommenderMeta.recommender_name_vs_recommender_class[recommender_name]
return result