I finally got around to trying out a reinforcement learning exercise this weekend in an attempt to learn about the technique. One of the most interesting blog posts I read is Andrej Karpathy’s post on using reinforcement learning to play Pong on the Atari 2600. In it, Andrej uses the Gym package in Python to play the game.

This won’t be a post diving into the details of how reinforcement learning works; Andrej does that far better than I possibly could, so read the post. Instead, the purpose of this post is to provide a minor update to Andrej’s code to switch it from Python 2 to Python 3. In doing this, I went with the most convenient answer over a potentially better solution (e.g., switching `xrange()` to `range()` rather than re-working the code), but it does work. I also bumped up the learning rate a little to pick up the pace.

The code is available as a GitHub Gist, which I’ve reproduced below.

import pickle

import gym
import numpy as np

# hyperparameters
H = 200  # number of hidden layer neurons
batch_size = 10  # after how many episodes do we do a parameter update?
learning_rate = 3e-4  # step size applied in the RMSProp parameter update
gamma = 0.99  # discount factor for reward
decay_rate = 0.99  # decay factor for RMSProp leaky sum of grad^2
resume = False  # resume from prior checkpoint?
render = False  # when True, the training loop calls env.render() every step

# model initialization
D = 80 * 80  # input dimensionality: 80x80 grid
if resume:
    # NOTE: unpickling can execute arbitrary code; only resume from a
    # 'save.p' checkpoint that this script wrote itself.
    with open('save.p', 'rb') as checkpoint_file:
        model = pickle.load(checkpoint_file)
else:
    model = {}
    model['W1'] = np.random.randn(H, D) / np.sqrt(D)  # "Xavier" initialization
    model['W2'] = np.random.randn(H) / np.sqrt(H)

# update buffers that add up gradients over a batch
grad_buffer = {k: np.zeros_like(v) for k, v in model.items()}
# rmsprop memory
rmsprop_cache = {k: np.zeros_like(v) for k, v in model.items()}

def sigmoid(x):
    """Sigmoid "squashing" function to interval [0, 1].

    Computes 1 / (1 + exp(-x)) with NumPy, so ``x`` may be a scalar or an
    array; for an array the result is elementwise.
    """
    return 1.0 / (1.0 + np.exp(-x))

def prepro(I):
    """Prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector.

    The frame is cropped to rows 35..194, every second row and column of
    channel 0 is kept, the two background values are zeroed and every other
    pixel is set to 1. The caller's frame is not modified.

    Returns:
        A flattened float64 array of zeros and ones.
    """
    I = I[35:195]  # crop
    # Downsample by a factor of 2. The slice alone is a view into the caller's
    # frame; copy it so the in-place masking below cannot write back into it.
    I = I[::2, ::2, 0].copy()
    I[I == 144] = 0  # erase background (background type 1)
    I[I == 109] = 0  # erase background (background type 2)
    I[I != 0] = 1  # everything else (paddles, ball) just set to 1
    return I.astype(np.float64).ravel()

def discount_rewards(r, discount=None):
    """Take array of rewards and compute discounted reward.

    Args:
        r: NumPy array of per-step rewards in time order, indexed by step
            along its first axis (1D, or the N x 1 column the training loop
            builds with ``np.vstack``).
        discount: Per-step discount factor. When None, the module-level
            ``gamma`` hyperparameter is used.

    Returns:
        An array shaped like ``r`` holding the discounted return at each
        step. Floating-point inputs keep their dtype; any other dtype is
        promoted to float64 so the fractional returns are not truncated.
    """
    factor = gamma if discount is None else discount
    out_dtype = r.dtype if np.issubdtype(r.dtype, np.floating) else np.float64
    discounted_r = np.zeros_like(r, dtype=out_dtype)
    running_add = 0
    for t in reversed(range(r.size)):
        if r[t] != 0:
            # reset the sum, since this was a game boundary (specific to Pong!)
            running_add = 0
        running_add = running_add * factor + r[t]
        discounted_r[t] = running_add
    return discounted_r

def policy_forward(x):
    """Forward pass of the two-layer policy network.

    Reads the weights 'W1' and 'W2' from the module-level ``model`` dict.

    Args:
        x: Input vector fed to the first layer (the training loop passes the
            difference of two preprocessed frames).

    Returns:
        A tuple ``(p, h)``: the probability of taking action 2, and the
        hidden state after the ReLU.
    """
    h = np.dot(model['W1'], x)
    h[h < 0] = 0  # ReLU nonlinearity
    logp = np.dot(model['W2'], h)
    p = sigmoid(logp)
    return p, h  # return probability of taking action 2, as well as hidden state

def policy_backward(eph, epdlogp):
    """Backward pass. (eph is an array of intermediate hidden states.)

    NOTE: besides its arguments this reads two module-level names: ``model``
    (for 'W2') and ``epx``, the stacked episode inputs that the training loop
    assigns just before calling this function.

    Args:
        eph: Stacked hidden states, one row per step of the episode.
        epdlogp: Stacked per-step gradient signal, one row per step.

    Returns:
        A dict with the gradients for 'W1' and 'W2'.
    """
    dW2 = np.dot(eph.T, epdlogp).ravel()
    dh = np.outer(epdlogp, model['W2'])
    dh[eph <= 0] = 0  # backprop through the ReLU
    dW1 = np.dot(dh.T, epx)
    return {'W1': dW1, 'W2': dW2}

# Create the Pong environment and the bookkeeping state used by the training loop.
env = gym.make("Pong-v0")
observation = env.reset()
prev_x = None  # used in computing the difference frame
# per-episode inputs, hidden states, action gradients and rewards
xs, hs, dlogps, drs = [], [], [], []
running_reward = None  # exponential moving average of episode reward; None until the first episode ends
reward_sum = 0  # total reward collected in the current episode
episode_number = 0

# Main training loop: play Pong forever, accumulating policy gradients per
# episode and applying an RMSProp update every batch_size episodes.
# NOTE(review): assumes the Gym API where env.step() returns a 4-tuple and
# env.reset() returns only the observation - confirm against the installed gym.
while True:
    if render:
        env.render()

    # preprocess the observation, set input to network to be difference image
    cur_x = prepro(observation)
    x = cur_x - prev_x if prev_x is not None else np.zeros(D)
    prev_x = cur_x

    # forward the policy network and sample an action from the returned probability
    aprob, h = policy_forward(x)
    action = 2 if np.random.uniform() < aprob else 3  # roll the dice!

    # record various intermediaries (needed later for backprop)
    xs.append(x)  # observation
    hs.append(h)  # hidden state
    y = 1 if action == 2 else 0  # a "fake label"
    dlogps.append(y - aprob)  # grad that encourages the action that was taken to be taken

    # step the environment and get new measurements
    observation, reward, done, info = env.step(action)
    reward_sum += reward
    # record reward (has to be done after we call step() to get the reward for the previous action)
    drs.append(reward)

    if done:  # an episode finished
        episode_number += 1

        # stack together all inputs, hidden states, action gradients, and rewards for this episode
        epx = np.vstack(xs)  # also read as a global by policy_backward()
        eph = np.vstack(hs)
        epdlogp = np.vstack(dlogps)
        epr = np.vstack(drs)
        xs, hs, dlogps, drs = [], [], [], []  # reset array memory

        # compute the discounted reward backwards through time
        discounted_epr = discount_rewards(epr)

        # standardize the rewards to be unit normal (helps control the gradient estimator variance)
        discounted_epr -= np.mean(discounted_epr)
        discounted_epr /= np.std(discounted_epr)

        epdlogp *= discounted_epr  # modulate the gradient with advantage (PG magic happens right here.)
        grad = policy_backward(eph, epdlogp)
        for k in model:
            grad_buffer[k] += grad[k]  # accumulate grad over batch

        # perform rmsprop parameter update every batch_size episodes
        if episode_number % batch_size == 0:
            for k, v in model.items():
                g = grad_buffer[k]  # gradient
                rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2
                model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
                grad_buffer[k] = np.zeros_like(v)  # reset batch gradient buffer

        # book-keeping work
        running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
        print('resetting env. episode reward total was %f. running mean: %f' % (reward_sum, running_reward))
        if episode_number % 100 == 0:
            # checkpoint the weights; the context manager closes the file promptly
            with open('save.p', 'wb') as checkpoint_file:
                pickle.dump(model, checkpoint_file)
        reward_sum = 0
        observation = env.reset()  # reset environment
        prev_x = None

    if reward != 0:  # Pong has either +1 or -1 reward exactly when the game ends.
        print('ep %d: game finished, reward: %f' % (episode_number, reward) + ('' if reward == -1 else ' !!!!!!'))

After running the code for a solid weekend, I was able to build an agent which can hold its own against the CPU, though it won’t dominate the game. Still, it’s nice to see an example of training a computer to perform a reasonably complex task (deflecting a ball into the opponent’s goal while preventing the same) when all you provide is a set of possible instructions on how to act (move the paddle up or down) and an indication of how you did in the prior round.