FrozenLake using Cross-Entropy¶
The next environment that we will try to solve using the cross-entropy method is FrozenLake. Its world is from the so-called grid world category, when your agent lives in a grid of size 4×4 and can move in four directions: up, down, left, and right. The agent always starts at a top-left position, and its goal is to reach the bottom-right cell of the grid. There are holes in the fixed cells of the grid and if you get into those holes, the episode ends and your reward is zero. If the agent reaches the destination cell, then it obtains a reward of 1.0 and the episode ends.
To make life more complicated, the world is slippery (it’s a frozen lake after all), so the agent’s actions do not always turn out as expected—there is a 33% chance that it will slip to the right or to the left. If you want the agent to move left, for example, there is a 33% probability that it will, indeed, move left, a 33% chance that it will end up in the cell above, and a 33% chance that it will end up in the cell below. As you will see at the end of the section, this makes progress difficult.
Imports¶
import gym
import gym.spaces
import gym.wrappers
import gym.envs.toy_text.frozen_lake
from collections import namedtuple
import numpy as np
import random
from torch.utils.tensorboard import SummaryWriter
import torch
import torch.nn as nn
import torch.optim as optim
Naive method¶
Params¶
HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70
Model¶
class DiscreteOneHotWrapper(gym.ObservationWrapper):
def __init__(self, env):
super(DiscreteOneHotWrapper, self).__init__(env)
assert isinstance(env.observation_space,
gym.spaces.Discrete)
shape = (env.observation_space.n, )
self.observation_space = gym.spaces.Box(
0.0, 1.0, shape, dtype=np.float32)
def observation(self, observation):
res = np.copy(self.observation_space.low)
res[observation] = 1.0
return res
class Net(nn.Module):
def __init__(self, obs_size, hidden_size, n_actions):
super(Net, self).__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, n_actions)
)
def forward(self, x):
return self.net(x)
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])
def iterate_batches(env, net, batch_size):
batch = []
episode_reward = 0.0
episode_steps = []
obs = env.reset()
sm = nn.Softmax(dim=1)
while True:
obs_v = torch.FloatTensor([obs])
act_probs_v = sm(net(obs_v))
act_probs = act_probs_v.data.numpy()[0]
action = np.random.choice(len(act_probs), p=act_probs)
next_obs, reward, is_done, _ = env.step(action)
episode_reward += reward
episode_steps.append(EpisodeStep(observation=obs, action=action))
if is_done:
batch.append(Episode(reward=episode_reward, steps=episode_steps))
episode_reward = 0.0
episode_steps = []
next_obs = env.reset()
if len(batch) == batch_size:
yield batch
batch = []
obs = next_obs
def filter_batch(batch, percentile):
rewards = list(map(lambda s: s.reward, batch))
reward_bound = np.percentile(rewards, percentile)
reward_mean = float(np.mean(rewards))
train_obs = []
train_act = []
for example in batch:
if example.reward < reward_bound:
continue
train_obs.extend(map(lambda step: step.observation, example.steps))
train_act.extend(map(lambda step: step.action, example.steps))
train_obs_v = torch.FloatTensor(train_obs)
train_act_v = torch.LongTensor(train_act)
return train_obs_v, train_act_v, reward_bound, reward_mean
Run¶
if __name__ == "__main__":
env = DiscreteOneHotWrapper(gym.make("FrozenLake-v0"))
# env = gym.wrappers.Monitor(env, directory="mon", force=True)
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
net = Net(obs_size, HIDDEN_SIZE, n_actions)
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.01)
writer = SummaryWriter(comment="-frozenlake-naive")
for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):
obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)
optimizer.zero_grad()
action_scores_v = net(obs_v)
loss_v = objective(action_scores_v, acts_v)
loss_v.backward()
optimizer.step()
print("%d: loss=%.3f, reward_mean=%.1f, reward_bound=%.1f" % (
iter_no, loss_v.item(), reward_m, reward_b))
writer.add_scalar("loss", loss_v.item(), iter_no)
writer.add_scalar("reward_bound", reward_b, iter_no)
writer.add_scalar("reward_mean", reward_m, iter_no)
if reward_m > 0.8:
print("Solved!")
break
writer.close()
Tweaked method¶
Params¶
HIDDEN_SIZE = 128
BATCH_SIZE = 100
PERCENTILE = 30
GAMMA = 0.9
Model¶
class DiscreteOneHotWrapper(gym.ObservationWrapper):
def __init__(self, env):
super(DiscreteOneHotWrapper, self).__init__(env)
assert isinstance(env.observation_space, gym.spaces.Discrete)
self.observation_space = gym.spaces.Box(0.0, 1.0, (env.observation_space.n, ), dtype=np.float32)
def observation(self, observation):
res = np.copy(self.observation_space.low)
res[observation] = 1.0
return res
class Net(nn.Module):
def __init__(self, obs_size, hidden_size, n_actions):
super(Net, self).__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, n_actions)
)
def forward(self, x):
return self.net(x)
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])
def iterate_batches(env, net, batch_size):
batch = []
episode_reward = 0.0
episode_steps = []
obs = env.reset()
sm = nn.Softmax(dim=1)
while True:
obs_v = torch.FloatTensor([obs])
act_probs_v = sm(net(obs_v))
act_probs = act_probs_v.data.numpy()[0]
action = np.random.choice(len(act_probs), p=act_probs)
next_obs, reward, is_done, _ = env.step(action)
episode_reward += reward
episode_steps.append(EpisodeStep(observation=obs, action=action))
if is_done:
batch.append(Episode(reward=episode_reward, steps=episode_steps))
episode_reward = 0.0
episode_steps = []
next_obs = env.reset()
if len(batch) == batch_size:
yield batch
batch = []
obs = next_obs
def filter_batch(batch, percentile):
filter_fun = lambda s: s.reward * (GAMMA ** len(s.steps))
disc_rewards = list(map(filter_fun, batch))
reward_bound = np.percentile(disc_rewards, percentile)
train_obs = []
train_act = []
elite_batch = []
for example, discounted_reward in zip(batch, disc_rewards):
if discounted_reward > reward_bound:
train_obs.extend(map(lambda step: step.observation,
example.steps))
train_act.extend(map(lambda step: step.action,
example.steps))
elite_batch.append(example)
return elite_batch, train_obs, train_act, reward_bound
Run¶
if __name__ == "__main__":
random.seed(12345)
env = DiscreteOneHotWrapper(gym.make("FrozenLake-v0"))
# env = gym.wrappers.Monitor(env, directory="mon", force=True)
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
net = Net(obs_size, HIDDEN_SIZE, n_actions)
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.001)
writer = SummaryWriter(comment="-frozenlake-tweaked")
full_batch = []
for iter_no, batch in enumerate(iterate_batches(
env, net, BATCH_SIZE)):
reward_mean = float(np.mean(list(map(
lambda s: s.reward, batch))))
full_batch, obs, acts, reward_bound = \
filter_batch(full_batch + batch, PERCENTILE)
if not full_batch:
continue
obs_v = torch.FloatTensor(obs)
acts_v = torch.LongTensor(acts)
full_batch = full_batch[-500:]
optimizer.zero_grad()
action_scores_v = net(obs_v)
loss_v = objective(action_scores_v, acts_v)
loss_v.backward()
optimizer.step()
print("%d: loss=%.3f, rw_mean=%.3f, "
"rw_bound=%.3f, batch=%d" % (
iter_no, loss_v.item(), reward_mean,
reward_bound, len(full_batch)))
writer.add_scalar("loss", loss_v.item(), iter_no)
writer.add_scalar("reward_mean", reward_mean, iter_no)
writer.add_scalar("reward_bound", reward_bound, iter_no)
if reward_mean > 0.8:
print("Solved!")
break
writer.close()
Non-slippery method¶
Params¶
HIDDEN_SIZE = 128
BATCH_SIZE = 100
PERCENTILE = 30
GAMMA = 0.9
Model¶
class DiscreteOneHotWrapper(gym.ObservationWrapper):
def __init__(self, env):
super(DiscreteOneHotWrapper, self).__init__(env)
assert isinstance(env.observation_space, gym.spaces.Discrete)
self.observation_space = gym.spaces.Box(0.0, 1.0, (env.observation_space.n, ), dtype=np.float32)
def observation(self, observation):
res = np.copy(self.observation_space.low)
res[observation] = 1.0
return res
class Net(nn.Module):
def __init__(self, obs_size, hidden_size, n_actions):
super(Net, self).__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, n_actions)
)
def forward(self, x):
return self.net(x)
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])
def iterate_batches(env, net, batch_size):
batch = []
episode_reward = 0.0
episode_steps = []
obs = env.reset()
sm = nn.Softmax(dim=1)
while True:
obs_v = torch.FloatTensor([obs])
act_probs_v = sm(net(obs_v))
act_probs = act_probs_v.data.numpy()[0]
action = np.random.choice(len(act_probs), p=act_probs)
next_obs, reward, is_done, _ = env.step(action)
episode_reward += reward
episode_steps.append(EpisodeStep(observation=obs, action=action))
if is_done:
batch.append(Episode(reward=episode_reward, steps=episode_steps))
episode_reward = 0.0
episode_steps = []
next_obs = env.reset()
if len(batch) == batch_size:
yield batch
batch = []
obs = next_obs
def filter_batch(batch, percentile):
disc_rewards = list(map(lambda s: s.reward * (GAMMA ** len(s.steps)), batch))
reward_bound = np.percentile(disc_rewards, percentile)
train_obs = []
train_act = []
elite_batch = []
for example, discounted_reward in zip(batch, disc_rewards):
if discounted_reward > reward_bound:
train_obs.extend(map(lambda step: step.observation, example.steps))
train_act.extend(map(lambda step: step.action, example.steps))
elite_batch.append(example)
return elite_batch, train_obs, train_act, reward_bound
Run¶
if __name__ == "__main__":
random.seed(12345)
env = gym.envs.toy_text.frozen_lake.FrozenLakeEnv(
is_slippery=False)
env.spec = gym.spec("FrozenLake-v0")
env = gym.wrappers.TimeLimit(env, max_episode_steps=100)
env = DiscreteOneHotWrapper(env)
# env = gym.wrappers.Monitor(env, directory="mon", force=True)
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
net = Net(obs_size, HIDDEN_SIZE, n_actions)
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.001)
writer = SummaryWriter(comment="-frozenlake-nonslippery")
full_batch = []
for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):
reward_mean = float(np.mean(list(map(lambda s: s.reward, batch))))
full_batch, obs, acts, reward_bound = filter_batch(full_batch + batch, PERCENTILE)
if not full_batch:
continue
obs_v = torch.FloatTensor(obs)
acts_v = torch.LongTensor(acts)
full_batch = full_batch[-500:]
optimizer.zero_grad()
action_scores_v = net(obs_v)
loss_v = objective(action_scores_v, acts_v)
loss_v.backward()
optimizer.step()
print("%d: loss=%.3f, reward_mean=%.3f, reward_bound=%.3f, batch=%d" % (
iter_no, loss_v.item(), reward_mean, reward_bound, len(full_batch)))
writer.add_scalar("loss", loss_v.item(), iter_no)
writer.add_scalar("reward_mean", reward_mean, iter_no)
writer.add_scalar("reward_bound", reward_bound, iter_no)
if reward_mean > 0.8:
print("Solved!")
break
writer.close()
Visualization¶
%load_ext tensorboard
%tensorboard --logdir runs