RecSim + Catalyst

We will build a recommender bot based on neural networks and train it with RL methods.

Abstract

We propose RecSim, a configurable platform for authoring simulation environments for recommender systems (RSs) that naturally supports sequential interaction with users. RecSim allows the creation of new environments that reflect particular aspects of user behavior and item structure at a level of abstraction well-suited to pushing the limits of current reinforcement learning (RL) and RS techniques in sequential interactive recommendation problems. Environments can be easily configured that vary assumptions about: user preferences and item familiarity; user latent state and its dynamics; and choice models and other user response behavior. We outline how RecSim offers value to RL and RS researchers and practitioners, and how it can serve as a vehicle for academic-industrial collaboration.

https://arxiv.org/abs/1909.04847

GitHub, Video, Medium

For a detailed description of the RecSim architecture, please read Ie et al., and please cite the paper if you use code from this repository in your work.

RecSim simulates a recommender agent’s interaction with an environment in which the agent makes recommendations to users. Both the users and the items being recommended are simulated, based on traits such as popularity, interests, demographics and consumption frequency. When the RL agent recommends something to a user, some of these traits are scored higher or lower depending on whether the user accepts. So far this sounds like a typical recommendation system; however, with RecSim a developer can author these traits, and the features of the user choice model can be customised while the agent is rewarded for making the right recommendations.

Green and blue boxes show the environment. We need to implement two special classes, User and Document. Our bot ("Agent") has to choose the document most relevant to the user from several candidates. The user moves to the offered document if they accept it, moves to a random document otherwise, or stays on the current one.

RecSim is a configurable simulation platform for recommender systems made by Google, which uses the document and user databases directly. We can break RecSim into two parts:

  • The environment consists of a user model, a document (item) model and a user-choice model. The user model samples users from a prior distribution of observable and latent user features; the document model samples items from a prior over observable and latent document features; and the user-choice model determines the user’s response, which is dependent on observable document features, observable and latent user features.

  • The SlateQ Simulation Environment, which uses the SlateQ Algorithm to return a slate of items back to the simulation environment.

Unlike Virtual Taobao, RecSim has a concrete representation of items, and the actions returned by the reinforcement learning agent can be associated with items directly. However, the user and item models of RecSim are quite simple, and without sufficient data the prior distributions used to generate simulated users and virtual items are hard to make accurate.

Setup

!pip install -Uq catalyst gym recsim
from collections import deque, namedtuple
import random
import numpy as np
import gym

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

from catalyst import dl, utils

from gym import spaces

from recsim import document, user
from recsim.choice_model import AbstractChoiceModel
from recsim.simulator import recsim_gym, environment

Params

device = utils.get_device()
utils.set_global_seed(42)

DOC_NUM = 10
EMB_SIZE = 4
P_EXIT_ACCEPTED = 0.1
P_EXIT_NOT_ACCEPTED = 0.2

# let's define a matrix W for simulation of users' responses
# (based on section 7.3 of the paper https://arxiv.org/pdf/1512.07679.pdf)
# W_ij defines the probability that a user will accept recommendation j
# given that they are consuming item i at the moment

W = (np.ones((DOC_NUM, DOC_NUM)) - np.eye(DOC_NUM)) * \
     np.random.uniform(0.0, P_EXIT_NOT_ACCEPTED, (DOC_NUM, DOC_NUM)) + \
     np.diag(np.random.uniform(1.0 - P_EXIT_ACCEPTED, 1.0, DOC_NUM))
W = W[:, np.random.permutation(DOC_NUM)]
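
As a quick illustration of how W drives the simulated dynamics (a hypothetical helper, mirroring the user model implemented below): the user accepts recommendation j with probability W[current, j]; on acceptance they move to j and keep the session alive with probability 1 - P_EXIT_ACCEPTED, otherwise they jump to a random document and keep the session alive with probability 1 - P_EXIT_NOT_ACCEPTED.

# hypothetical helper, not used later: one simulated user step driven by W
def simulate_user_step(current, recommended):
    accepted = np.random.random() < W[current, recommended]
    if accepted:
        next_doc = recommended
        active = np.random.random() < 1 - P_EXIT_ACCEPTED
    else:
        next_doc = np.random.randint(DOC_NUM)
        active = np.random.random() < 1 - P_EXIT_NOT_ACCEPTED
    return next_doc, accepted, active

print(simulate_user_step(current=0, recommended=int(W[0].argmax())))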

Document Model

class Document(document.AbstractDocument):

    def __init__(self, doc_id):
        super().__init__(doc_id)

    def create_observation(self):
        return (self._doc_id,)

    @staticmethod
    def observation_space():
        return spaces.Discrete(DOC_NUM)

    def __str__(self):
        return "Document #{}".format(self._doc_id)


class DocumentSampler(document.AbstractDocumentSampler):

    def __init__(self, doc_ctor=Document):
        super().__init__(doc_ctor)
        self._doc_count = 0

    def sample_document(self):
        doc = self._doc_ctor(self._doc_count % DOC_NUM)
        self._doc_count += 1
        return doc
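
To see what the sampler produces, a quick (hypothetical) usage:

sampler = DocumentSampler()
print([str(sampler.sample_document()) for _ in range(3)])  # Document #0, #1, #2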

User Model

class UserState(user.AbstractUserState):
    def __init__(self, user_id, current, active_session=True):
        self.user_id = user_id
        self.current = current
        self.active_session = active_session

    def create_observation(self):
        return (self.current,)

    def __str__(self):
        return "User #{}".format(self.user_id)

    @staticmethod
    def observation_space():
        return spaces.Discrete(DOC_NUM)

    def score_document(self, doc_obs):
        return W[self.current, doc_obs[0]]


class StaticUserSampler(user.AbstractUserSampler):
    def __init__(self, user_ctor=UserState):
        super().__init__(user_ctor)
        self.user_count = 0

    def sample_user(self):
        self.user_count += 1
        sampled_user = self._user_ctor(
            self.user_count, np.random.randint(DOC_NUM))
        return sampled_user


class Response(user.AbstractResponse):
    def __init__(self, accept=False):
        self.accept = accept

    def create_observation(self):
        return (int(self.accept),)

    @classmethod
    def response_space(cls):
        return spaces.Discrete(2)


class UserChoiceModel(AbstractChoiceModel):
    def __init__(self):
        super().__init__()
        self._score_no_click = P_EXIT_ACCEPTED

    def score_documents(self, user_state, doc_obs):
        if len(doc_obs) != 1:
            raise ValueError(
                "Expecting single document, but got: {}".format(doc_obs))
        self._scores = np.array(
            [user_state.score_document(doc) for doc in doc_obs])

    def choose_item(self):
        if np.random.random() < self.scores[0]:
            return 0


class UserModel(user.AbstractUserModel):
    def __init__(self):
        super().__init__(Response, StaticUserSampler(), 1)
        self.choice_model = UserChoiceModel()

    def simulate_response(self, slate_documents):
        if len(slate_documents) != 1:
            raise ValueError("Expecting single document, but got: {}".format(
                slate_documents))

        responses = [self._response_model_ctor() for _ in slate_documents]

        self.choice_model.score_documents(
            self._user_state,
            [doc.create_observation() for doc in slate_documents]
        )
        selected_index = self.choice_model.choose_item()

        if selected_index is not None:
            responses[selected_index].accept = True

        return responses

    def update_state(self, slate_documents, responses):
        if len(slate_documents) != 1:
            raise ValueError(
                f"Expecting single document, but got: {slate_documents}"
            )

        response = responses[0]
        doc = slate_documents[0]
        if response.accept:
            self._user_state.current = doc.doc_id()
            self._user_state.active_session = bool(
                np.random.binomial(1, 1 - P_EXIT_ACCEPTED))
        else:
            self._user_state.current = np.random.choice(DOC_NUM)
            self._user_state.active_session = bool(
                np.random.binomial(1, 1 - P_EXIT_NOT_ACCEPTED))

    def is_terminal(self):
        """Returns a boolean indicating if the session is over."""
        return not self._user_state.active_session


def clicked_reward(responses):
    reward = 0.0
    for response in responses:
        if response.accept:
            reward += 1
    return reward

RecSim Environment

def make_env():
    env = recsim_gym.RecSimGymEnv(
        environment.Environment(
            UserModel(), 
            DocumentSampler(), 
            DOC_NUM, 
            1, 
            resample_documents=False
        ),
        clicked_reward
    )
    return env
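
A quick smoke test of the environment (hypothetical usage): the observation is a dict with "user", "doc" and "response" entries, and an action is a slate of document indices, here of size one.

# hypothetical smoke test of the RecSim gym environment
env = make_env()
observation = env.reset()
print(observation["user"])                       # the user's current document
observation, reward, done, info = env.step([0])  # recommend document 0
print(reward, done, observation["response"])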

Actor-Critic Policy

The actor is a simple neural network that generates an action embedding vector from the current state. The critic model is more involved: since each action is picking a document, we need an embedding vector per document, and these embeddings are trained together with the critic. We also have to implement the selection process: find the top-k documents whose embeddings are closest to the actor's proto-action and pick the one with the highest Q-value.

from catalyst.contrib.nn import Normalize


inner_fn = utils.get_optimal_inner_init(nn.ReLU)
outer_fn = utils.outer_init


class ActorModel(nn.Module):
    def __init__(self, hidden=64, doc_num=10, doc_emb_size=4):
        super().__init__()
        
        self.actor = nn.Sequential(
            nn.Linear(doc_num, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
        )
        self.head = nn.Sequential(
            nn.Linear(hidden, doc_emb_size),
            Normalize()
        )
        
        self.actor.apply(inner_fn)
        self.head.apply(outer_fn)
        
        self.doc_num = doc_num
        self.doc_emb_size = doc_emb_size
        
    def forward(self, states):
        return self.head(self.actor(states))


class CriticModel(nn.Module):
    def __init__(self, hidden=64, doc_num=10, doc_emb_size=4):
        super().__init__()
        
        self.critic = nn.Sequential(
            nn.Linear(doc_num + doc_emb_size, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
        )
        
        self.head = nn.Linear(hidden, 1)
        
        self.critic.apply(inner_fn)
        self.head.apply(outer_fn)
        
        self.doc_embs = nn.Sequential(
            nn.Embedding(doc_num, doc_emb_size),
            Normalize()
        )
        
        self.doc_num = doc_num
        self.doc_emb_size = doc_emb_size
        
    def _generate_input(self, states, proto_actions):
        return torch.cat([states, proto_actions], 1)
    
    def forward(self, states, proto_actions):
        inputs = self._generate_input(states, proto_actions)
        return self.head(self.critic(inputs))
    
    def get_topk(self, states, proto_actions, top_k=1):
        # Instead of an approximate kNN search, we compute distances to all document embeddings exactly.
        dist = torch.cdist(proto_actions, self.doc_embs[0].weight)
        indexes = torch.topk(dist, k=top_k, largest=False)[1]
        return torch.cat([self.doc_embs(index).unsqueeze(0) for index in indexes]), indexes
    
    def get_best(self, states, proto_actions, top_k=1):
        doc_embs, indexes = self.get_topk(states, proto_actions, top_k)
        top_k = doc_embs.size(1)
        best_values = torch.empty(states.size(0)).to(states.device)
        best_indexes = torch.empty(states.size(0)).to(states.device)
        for num, (state, actions, idx) in enumerate(zip(states, doc_embs, indexes)):
            new_states = state.repeat(top_k, 1)
            # for each pair of state and action we use critic to calculate values
            values = self(new_states, actions)
            best = values.max(0)[1].item()
            best_values[num] = values[best]
            best_indexes[num] = idx[best]
        return best_indexes, best_values
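
A small smoke test of the selection procedure (hypothetical usage): the state is the flattened user observation, i.e. a one-hot vector over the current document; the actor maps it to a proto-action on the unit sphere, and the critic re-ranks the k nearest document embeddings by their Q-values, in the spirit of the paper referenced above.

# hypothetical smoke test of the actor-critic selection
actor = ActorModel(doc_num=DOC_NUM, doc_emb_size=EMB_SIZE)
critic = CriticModel(doc_num=DOC_NUM, doc_emb_size=EMB_SIZE)

with torch.no_grad():
    states = torch.eye(DOC_NUM)[:3]      # three users as one-hot "current document" vectors
    proto_actions = actor(states)        # (3, EMB_SIZE) proto-actions
    doc_ids, q_values = critic.get_best(states, proto_actions, top_k=5)
print(doc_ids.long().tolist(), q_values.tolist())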

Training

import numpy as np
from collections import deque, namedtuple

Transition = namedtuple(
    'Transition', 
    field_names=[
        'state', 
        'action', 
        'reward',
        'done', 
        'next_state'
    ]
)

class ReplayBuffer:
    def __init__(self, capacity: int):
        self.buffer = deque(maxlen=capacity)
    
    def append(self, transition: Transition):
        self.buffer.append(transition)
    
    def sample(self, batch_size: int):
        indices = np.random.choice(
            len(self.buffer), 
            batch_size, 
            replace=batch_size > len(self.buffer)
        )
        states, actions, rewards, dones, next_states = \
            zip(*[self.buffer[idx] for idx in indices])
        return (
            np.array(states, dtype=np.float32), 
            np.array(actions, dtype=np.int64), 
            np.array(rewards, dtype=np.float32),
            np.array(dones, dtype=bool), 
            np.array(next_states, dtype=np.float32)
        )
    
    def __len__(self):
        return len(self.buffer)
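
A tiny sanity check of the buffer (hypothetical usage, with random DOC_NUM-dimensional states):

buffer = ReplayBuffer(capacity=100)
for _ in range(10):
    s = np.random.rand(DOC_NUM).astype(np.float32)
    buffer.append(Transition(s, [np.random.randint(DOC_NUM)], 1.0, False, s))
states, actions, rewards, dones, next_states = buffer.sample(batch_size=4)
print(states.shape, actions.shape, rewards.shape)
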
from torch.utils.data.dataset import IterableDataset


class ReplayDataset(IterableDataset):

    def __init__(self, buffer: ReplayBuffer, epoch_size: int = int(1e3)):
        self.buffer = buffer
        self.epoch_size = epoch_size

    def __iter__(self):
        states, actions, rewards, dones, next_states = \
            self.buffer.sample(self.epoch_size)
        for i in range(len(dones)):
            yield states[i], actions[i], rewards[i], dones[i], next_states[i]
    
    def __len__(self):
        return self.epoch_size


def extract_state(env, state):
    user_space = env.observation_space.spaces["user"]
    return spaces.flatten(user_space, state["user"])

def get_action(env, actor, critic, state, top_k=10, epsilon=None):
    # DDPG-style agents are usually trained with exploration noise added to the actor's output;
    # here it is simpler to explore by sampling a random action from the environment instead.
    # With probability `epsilon` we follow the actor-critic policy, otherwise we act randomly.
    state = torch.tensor(state, dtype=torch.float32).to(device).unsqueeze(0)
    if epsilon is None or random.random() < epsilon:
        proto_action = actor(state)
        action = critic.get_best(state, proto_action, top_k)[0]
        action = action.detach().cpu().numpy().astype(int)
    else:
        action = env.action_space.sample()
    return action


def generate_session(
    env, 
    actor,
    critic,
    replay_buffer=None,
    epsilon=None,
    top_k=10
):
    total_reward = 0
    s = env.reset()
    s = extract_state(env, s)

    for t in range(1000):
        a = get_action(env, actor, critic, epsilon=epsilon, state=s, top_k=top_k)
        next_s, r, done, _ = env.step(a)
        next_s = extract_state(env, next_s)

        if replay_buffer is not None:
            transition = Transition(s, a, r, done, next_s)
            replay_buffer.append(transition)

        total_reward += r
        s = next_s
        if done:
            break

    return total_reward

def generate_sessions(
    env, 
    actor,
    critic,
    replay_buffer=None,
    num_sessions=100,
    epsilon=None,
    top_k=10
):
    sessions_reward = 0
    for i_episode in range(num_sessions):
        reward = generate_session(
            env=env, 
            actor=actor,
            critic=critic,
            epsilon=epsilon,
            replay_buffer=replay_buffer,
            top_k=top_k
        )
        sessions_reward += reward
    sessions_reward /= num_sessions
    return sessions_reward

def soft_update(target, source, tau):
    """Updates the target data with smoothing by ``tau``"""
    for target_param, param in zip(target.parameters(), source.parameters()):
        target_param.data.copy_(
            target_param.data * (1.0 - tau) + param.data * tau
        )
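
A minimal check of the update rule (hypothetical): with tau=1.0 the target parameters are simply overwritten by the source, which is how the origin and target networks are synced at stage start below, while with a small tau such as 0.001 the target only drifts slowly toward the source.

# hypothetical check: tau=1.0 performs a hard copy of the source parameters
src, tgt = nn.Linear(2, 2), nn.Linear(2, 2)
soft_update(tgt, src, tau=1.0)
assert torch.allclose(tgt.weight, src.weight)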

This is a standard game callback: it fills the replay buffer with sessions at stage start and during training, and logs the validation reward (sessions played without exploration) at the end of each epoch.

class RecSimCallback(dl.Callback):
    def __init__(self, order=0, session_period=1):
        super().__init__(order=0)
        self.session_period = session_period
        
    def on_stage_start(self, runner: dl.IRunner):
        generate_sessions(
            env=runner.env, 
            actor=runner.model["origin_actor"],
            critic=runner.model["origin_critic"],
            replay_buffer=runner.replay_buffer,
            top_k=runner.k,
            epsilon=runner.epsilon,
        )
        
    def on_batch_end(self, runner: dl.IRunner):
        if runner.global_batch_step % self.session_period == 0:
            session_reward = generate_session(
                env=runner.env, 
                actor=runner.model["origin_actor"],
                critic=runner.model["origin_critic"],
                replay_buffer=runner.replay_buffer,
                top_k=runner.k,
                epsilon=runner.epsilon,
            )
            runner.batch_metrics.update({"s_reward": session_reward})
            
    def on_epoch_end(self, runner: dl.IRunner):
        valid_reward = generate_sessions(
            env=runner.env, 
            actor=runner.model["origin_actor"],
            critic=runner.model["origin_critic"],
            top_k=runner.k,
            epsilon=None
        )
        runner.epoch_metrics["_epoch_"]["train_v_reward"] = valid_reward


class CustomRunner(dl.Runner):
    
    def __init__(self, *, env, replay_buffer, gamma, tau, epsilon=0.2, tau_period=1, k=5, **kwargs):
        super().__init__(**kwargs)
        self.env = env
        self.replay_buffer = replay_buffer
        self.gamma = gamma
        self.tau = tau
        self.tau_period = tau_period
        self.epsilon = epsilon
        self.k = k
    
    def on_stage_start(self, runner: dl.IRunner):
        super().on_stage_start(runner)
        soft_update(self.model["origin_actor"], self.model["target_actor"], 1.0)
        soft_update(self.model["origin_critic"], self.model["target_critic"], 1.0)

    def handle_batch(self, batch):
        # model train/valid step
        states, actions, rewards, dones, next_states = batch
        
        # actor loss: maximize Q(s, actor(s)) under the current critic (deterministic policy gradient)
        proto_actions = self.model["origin_actor"](states)
        policy_loss = (-self.model["origin_critic"](states, proto_actions)).mean()
        
        with torch.no_grad():
            target_proto_actions = self.model["target_actor"](next_states)
            target_values = self.model["target_critic"].get_best(next_states, target_proto_actions, self.k)[1].detach()

        dones = dones * 1.0
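        # TD target: reward + gamma * (1 - done) * Q_target(s', a'), where a' is the best of the top-k candidates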
        expected_values = target_values * self.gamma * (1 - dones) + rewards
        actions = self.model["origin_critic"].doc_embs(actions.squeeze())
        values = self.model["origin_critic"](states, actions).squeeze()
        
        value_loss = self.criterion(
            values,
            expected_values
        )
        
        self.batch_metrics.update(
            {
                "critic_loss": value_loss, 
                "actor_loss": policy_loss,
            }
        )

        if self.is_train_loader:
            self.optimizer["actor"].zero_grad()
            policy_loss.backward()
            self.optimizer["actor"].step()
            
            self.optimizer["critic"].zero_grad()
            value_loss.backward()
            self.optimizer["critic"].step()
            
            if self.global_batch_step % self.tau_period == 0:
                soft_update(self.model["target_critic"], self.model["origin_critic"], self.tau)
                soft_update(self.model["target_actor"], self.model["origin_actor"], self.tau)

Let’s train our model and check the results.

utils.set_global_seed(42)

env = make_env()
replay_buffer = ReplayBuffer(int(1e5))
gamma = 0.99
tau = 0.001
tau_period = 1
session_period = 1
epoch_size = int(1e4)


models = {
    "origin_actor": ActorModel(doc_num=DOC_NUM, doc_emb_size=EMB_SIZE),
    "origin_critic": CriticModel(doc_num=DOC_NUM, doc_emb_size=EMB_SIZE),
    "target_actor": ActorModel(doc_num=DOC_NUM, doc_emb_size=EMB_SIZE),
    "target_critic": CriticModel(doc_num=DOC_NUM, doc_emb_size=EMB_SIZE),
}
with torch.no_grad():
    models["origin_critic"].doc_embs[0].weight.copy_(models["target_critic"].doc_embs[0].weight)

utils.set_requires_grad(models["target_actor"], requires_grad=False)
utils.set_requires_grad(models["target_critic"], requires_grad=False)

criterion = torch.nn.MSELoss()
optimizer = {
    "actor": torch.optim.Adam(models["origin_actor"].parameters(), lr=1e-3),
    "critic": torch.optim.Adam(models["origin_critic"].parameters(), lr=1e-3),
}

loaders = {
    "train": DataLoader(
        ReplayDataset(replay_buffer, epoch_size=epoch_size), 
        batch_size=32,
    ),
}


runner = CustomRunner(
    env=env, 
    replay_buffer=replay_buffer, 
    gamma=gamma, 
    tau=tau,
    tau_period=tau_period
)

runner.train(
    model=models,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    logdir="./logs_rl",
    valid_loader="_epoch_",
    valid_metric="train_v_reward",
    minimize_valid_metric=False,
    load_best_on_end=True,
    num_epochs=20,
    verbose=False,
    callbacks=[RecSimCallback(order=0, session_period=session_period)]
)
/usr/local/lib/python3.7/dist-packages/catalyst/core/runner.py:624: UserWarning: No ``ICriterionCallback/CriterionCallback`` were found while runner.criterion is not None.Do you compute the loss during ``runner.handle_batch``?
  "No ``ICriterionCallback/CriterionCallback`` were found "
train (1/20) 
* Epoch (1/20) train_v_reward: 4.45
train (2/20) 
* Epoch (2/20) train_v_reward: 5.48
train (3/20) 
* Epoch (3/20) train_v_reward: 4.35
train (4/20) 
* Epoch (4/20) train_v_reward: 4.73
train (5/20) 
* Epoch (5/20) train_v_reward: 5.52
train (6/20) 
* Epoch (6/20) train_v_reward: 4.36
train (7/20) 
* Epoch (7/20) train_v_reward: 4.88
train (8/20) 
* Epoch (8/20) train_v_reward: 4.33
train (9/20) 
* Epoch (9/20) train_v_reward: 5.01
train (10/20) 
* Epoch (10/20) train_v_reward: 4.78
train (11/20) 
* Epoch (11/20) train_v_reward: 4.9
train (12/20) 
* Epoch (12/20) train_v_reward: 5.41
train (13/20) 
* Epoch (13/20) train_v_reward: 4.63
train (14/20) 
* Epoch (14/20) train_v_reward: 4.86
train (15/20) 
* Epoch (15/20) train_v_reward: 4.38
train (16/20) 
* Epoch (16/20) train_v_reward: 4.15
train (17/20) 
* Epoch (17/20) train_v_reward: 4.45
train (18/20) 
* Epoch (18/20) train_v_reward: 4.27
train (19/20) 
* Epoch (19/20) train_v_reward: 5.27
train (20/20) 
* Epoch (20/20) train_v_reward: 5.14
Top best models:
logs_rl/checkpoints/train.5.pth	5.5200

In our case, we can compare the RL bot’s results with an optimal recommender agent built directly from the relation matrix W: for the user’s current item, we simply choose the index with the maximum value in the corresponding row of W.

from recsim.agent import AbstractEpisodicRecommenderAgent

class OptimalRecommender(AbstractEpisodicRecommenderAgent):

    def __init__(self, environment, W):
        super().__init__(environment.action_space)
        self._observation_space = environment.observation_space
        self._W = W

    def step(self, reward, observation):
        return [self._W[observation["user"], :].argmax()]


def run_agent(
    env, 
    agent, 
    num_steps: int = int(1e4), 
    log_every: int = int(1e3)
):
    reward_history = []
    step, episode = 1, 1

    observation = env.reset()
    while step < num_steps:
        action = agent.begin_episode(observation)
        episode_reward = 0
        while True:
            observation, reward, done, info = env.step(action)
            episode_reward += reward

            if step % log_every == 0:
                print(step, np.mean(reward_history[-50:]))
            step += 1
            if done:
                break
            else:
                action = agent.step(reward, observation)

        agent.end_episode(reward, observation)
        reward_history.append(episode_reward)

    return reward_history


env = make_env()
agent = OptimalRecommender(env, W)

reward_history = run_agent(env, agent)
1000 8.26
2000 7.22
3000 8.18
4000 9.14
5000 8.22
6000 8.22
7000 10.76
8000 9.44
9000 8.2
10000 9.5