-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathQ_learning.py
More file actions
145 lines (106 loc) · 4.1 KB
/
Q_learning.py
File metadata and controls
145 lines (106 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Written and commented by: R.Ma.
Ver 0.1 20/02/2023
Ver 0.2 22/02/2023
Ver 0.9 24/02/2023
"""
import numpy as np
from Environment import StochasticWindyGridworld
from Helper import softmax, argmax
from Helper import linear_anneal
class QLearningAgent:
    """Tabular Q-learning agent over a discrete state/action space.

    Maintains a Q-table ``Q_sa`` of shape (n_states, n_actions) and updates
    it with the standard Q-learning (max-bootstrap) TD target.
    """

    def __init__(self, n_states, n_actions, learning_rate, gamma):
        # n_states / n_actions: sizes of the tabular state and action spaces.
        # learning_rate: TD step size (alpha).
        # gamma: discount factor for future rewards.
        self.n_states = n_states
        self.n_actions = n_actions
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.Q_sa = np.zeros((n_states, n_actions))

    def select_action(self, s, policy='egreedy', epsilon=None, temp=None):
        """Select an action in state ``s`` using the given exploration policy.

        Raises:
            KeyError: if the parameter required by ``policy`` is missing
                (KeyError kept for backward compatibility with existing callers).
            ValueError: if ``policy`` is neither 'egreedy' nor 'softmax'.
        """
        if policy == 'egreedy':
            if epsilon is None:
                raise KeyError("Provide an epsilon")
            # With probability epsilon explore uniformly, otherwise exploit.
            if np.random.rand() < epsilon:
                return np.random.randint(0, self.n_actions)
            return argmax(self.Q_sa[s, :])
        elif policy == 'softmax':
            if temp is None:
                raise KeyError("Provide a temperature")
            # Sample from the Boltzmann distribution over the state's Q-values.
            return np.random.choice(self.n_actions, p=softmax(self.Q_sa[s, :], temp))
        # BUG FIX: the original fell through for unknown policies and crashed
        # with UnboundLocalError at `return a`; fail fast with a clear error.
        raise ValueError("Unknown policy '{}' (use 'egreedy' or 'softmax')".format(policy))

    def update(self, s, a, r, s_next, done):
        """Apply one Q-learning TD update for the transition (s, a, r, s_next).

        BUG FIX: the original always bootstrapped from ``s_next`` even when
        ``done`` was True; the target for a terminal transition is just ``r``.
        (Also removed a stray trailing ``pass``.)
        """
        target = r if done else r + self.gamma * np.max(self.Q_sa[s_next, :])
        self.Q_sa[s, a] += self.learning_rate * (target - self.Q_sa[s, a])
def q_learning(n_timesteps, learning_rate, gamma, policy='egreedy', epsilon=None, temp=None, plot=True):
    """Run a single repetition of tabular Q-learning on the windy gridworld.

    Args:
        n_timesteps: total number of environment steps to take.
        learning_rate: TD step size (alpha) for the agent's updates.
        gamma: discount factor.
        policy: exploration strategy, 'egreedy' or 'softmax'.
        epsilon: exploration rate (required for 'egreedy').
        temp: softmax temperature (required for 'softmax').
        plot: if True, render the learned greedy policy after training.

    Returns:
        rewards: list with the observed reward at each timestep.
    """
    env = StochasticWindyGridworld(initialize_model=False)
    agent = QLearningAgent(env.n_states, env.n_actions, learning_rate, gamma)
    # NOTE: the original re-zeroed agent.Q_sa here; __init__ already does that,
    # so the redundant reset was dropped. Also removed a dead `done = False`
    # that was overwritten by env.step on every iteration.
    rewards = []
    s = env.reset()
    for _ in range(n_timesteps):
        a = agent.select_action(s, policy, epsilon, temp)
        s_next, r, done = env.step(a)
        agent.update(s, a, r, s_next, done)
        # Restart the episode from the start state whenever the goal is reached.
        s = env.reset() if done else s_next
        rewards.append(r)
    if plot:
        env.render(Q_sa=agent.Q_sa, plot_optimal_policy=True, step_pause=3)
    return rewards
# The following function is used to test the effect of parameter annealing. To run this, follow the instructions in the experiment.py
def q_learning_with_annealing(n_timesteps, learning_rate, gamma, policy='egreedy', epsilon=None, temp=None, plot=True, percent=0.5):
    """Run a single repetition of Q-learning with linearly annealed exploration.

    The ``epsilon`` and ``temp`` arguments are accepted only for signature
    compatibility with ``q_learning``: both are overwritten every step by a
    linear annealing schedule (epsilon: 1.0 -> 0.02, temp: 1.0 -> 0.01) over
    the first ``percent`` fraction of ``n_timesteps``.

    Args:
        n_timesteps: total number of environment steps to take.
        learning_rate: TD step size (alpha) for the agent's updates.
        gamma: discount factor.
        policy: exploration strategy, 'egreedy' or 'softmax'.
        plot: if True, render the learned greedy policy after training.
        percent: fraction of the run over which the annealing takes place.

    Returns:
        rewards: list with the observed reward at each timestep.
    """
    env = StochasticWindyGridworld(initialize_model=False)
    agent = QLearningAgent(env.n_states, env.n_actions, learning_rate, gamma)
    # NOTE: dropped the redundant Q_sa reset (done in __init__) and the dead
    # `done = False` assignment at the top of each iteration.
    rewards = []
    s = env.reset()
    for t in range(n_timesteps):
        # Anneal both exploration parameters; only the one matching `policy`
        # is actually consumed by select_action.
        epsilon = linear_anneal(t, n_timesteps, 1.0, 0.02, percent)
        temp = linear_anneal(t, n_timesteps, 1.0, 0.01, percent)
        a = agent.select_action(s, policy, epsilon, temp)
        s_next, r, done = env.step(a)
        agent.update(s, a, r, s_next, done)
        # Restart the episode from the start state whenever the goal is reached.
        s = env.reset() if done else s_next
        rewards.append(r)
    if plot:
        env.render(Q_sa=agent.Q_sa, plot_optimal_policy=True, step_pause=3)
    return rewards
def test():
    """Train a Q-learning agent on the windy gridworld and report the mean reward."""
    # Experiment configuration.
    n_timesteps = 50000
    gamma = 1.0
    learning_rate = 0.01  # original value was 0.1
    # Exploration settings.
    policy = 'egreedy'  # choose 'egreedy' or 'softmax'
    epsilon = 0.1
    temp = 1.0
    # Render the learned policy when training finishes.
    plot = True
    rewards = q_learning(n_timesteps, learning_rate, gamma,
                         policy=policy, epsilon=epsilon, temp=temp, plot=plot)
    print("Average reward: {}".format(np.mean(rewards)))
    print(len(rewards))
# Run the demo experiment only when executed as a script, not on import.
if __name__ == '__main__':
    test()