import gym
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, InputLayer
import matplotlib.pylab as plt
env = gym.make('NChain-v0')
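# Note (not in the original script): this code targets the older gym API, where
# env.reset() returns just the state and env.step() returns four values.
# NChain-v0 appears to have been removed from recent gym/gymnasium releases, so
# an older gym version is likely required to run it as-is.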
def naive_sum_reward_agent(env, num_episodes=500):
    # this is the table that will hold our summed rewards for
    # each action in each state
    r_table = np.zeros((5, 2))
    for g in range(num_episodes):
        s = env.reset()
        done = False
        while not done:
            if np.sum(r_table[s, :]) == 0:
                # make a random selection of actions
                a = np.random.randint(0, 2)
            else:
                # select the action with the highest cumulative reward
                a = np.argmax(r_table[s, :])
            new_s, r, done, _ = env.step(a)
            r_table[s, a] += r
            s = new_s
    return r_table

def q_learning_with_table(env, num_episodes=500):
    q_table = np.zeros((5, 2))
    y = 0.95   # discount factor
    lr = 0.8   # learning rate
    for i in range(num_episodes):
        s = env.reset()
        done = False
        while not done:
            if np.sum(q_table[s, :]) == 0:
                # make a random selection of actions
                a = np.random.randint(0, 2)
            else:
                # select the action with the largest q value in state s
                a = np.argmax(q_table[s, :])
            new_s, r, done, _ = env.step(a)
            # move the q value for (s, a) towards the bootstrapped target
            q_table[s, a] += r + lr * (y * np.max(q_table[new_s, :]) - q_table[s, a])
            s = new_s
    return q_table

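def print_greedy_policy(q_table):
    # A small helper (a sketch, not part of the original script): the greedy
    # policy implied by a learned table is just the argmax over actions in
    # each state, which is a quick way to inspect what an agent has learned.
    print("greedy policy (action per state):", np.argmax(q_table, axis=1))
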
def eps_greedy_q_learning_with_table(env, num_episodes=500):
    q_table = np.zeros((5, 2))
    y = 0.95
    eps = 0.5            # initial exploration probability
    lr = 0.8
    decay_factor = 0.999
    for i in range(num_episodes):
        s = env.reset()
        eps *= decay_factor  # decay exploration over episodes
        done = False
        while not done:
            if np.random.random() < eps or np.sum(q_table[s, :]) == 0:
                a = np.random.randint(0, 2)
            else:
                a = np.argmax(q_table[s, :])
            new_s, r, done, _ = env.step(a)
            q_table[s, a] += r + lr * (y * np.max(q_table[new_s, :]) - q_table[s, a])
            s = new_s
    return q_table

def test_methods(env, num_iterations=100):
    winner = np.zeros((3,))
    for g in range(num_iterations):
        m0_table = naive_sum_reward_agent(env, 500)
        m1_table = q_learning_with_table(env, 500)
        m2_table = eps_greedy_q_learning_with_table(env, 500)
        m0 = run_game(m0_table, env)
        m1 = run_game(m1_table, env)
        m2 = run_game(m2_table, env)
        w = np.argmax(np.array([m0, m1, m2]))
        winner[w] += 1
        print("Game {} of {}".format(g + 1, num_iterations))
    return winner

def run_game(table, env):
    s = env.reset()
    tot_reward = 0
    done = False
    while not done:
        a = np.argmax(table[s, :])
        s, r, done, _ = env.step(a)
        tot_reward += r
    return tot_reward

def q_learning_keras(env, num_episodes=1000):
    # create the keras model
    model = Sequential()
    model.add(InputLayer(batch_input_shape=(1, 5)))
    model.add(Dense(10, activation='sigmoid'))
    model.add(Dense(2, activation='linear'))
    model.compile(loss='mse', optimizer='adam', metrics=['mae'])
    # now execute the q learning
    y = 0.95
    eps = 0.5
    decay_factor = 0.999
    r_avg_list = []
    for i in range(num_episodes):
        s = env.reset()
        eps *= decay_factor
        if i % 100 == 0:
            print("Episode {} of {}".format(i + 1, num_episodes))
        done = False
        r_sum = 0
        while not done:
            if np.random.random() < eps:
                a = np.random.randint(0, 2)
            else:
                # states are fed to the network as one-hot vectors of length 5
                a = np.argmax(model.predict(np.identity(5)[s:s + 1]))
            new_s, r, done, _ = env.step(a)
            target = r + y * np.max(model.predict(np.identity(5)[new_s:new_s + 1]))
            # only the q value of the action actually taken is moved towards the target
            target_vec = model.predict(np.identity(5)[s:s + 1])[0]
            target_vec[a] = target
            model.fit(np.identity(5)[s:s + 1], target_vec.reshape(-1, 2), epochs=1, verbose=0)
            s = new_s
            r_sum += r
        r_avg_list.append(r_sum / 1000)
    plt.plot(r_avg_list)
    plt.ylabel('Average reward per game')
    plt.xlabel('Number of games')
    plt.show()
    for i in range(5):
        print("State {} - action {}".format(i, model.predict(np.identity(5)[i:i + 1])))

if __name__ == "__main__":
    q_learning_keras(env)
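    # Optional (a sketch, commented out because it is slow: each iteration
    # retrains three 500-episode tabular agents): compare the tabular methods.
    # print(test_methods(env, num_iterations=10))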