Reinforcement Learning (Part 5)
This is a tutorial to create your own Q-learning environment.
We need OpenCV for this, so make sure to pip install it (the PyPI package is opencv-python). We also need the Python Imaging Library fork, Pillow, so install that too before starting.
💢 Intro
This environment is like a snake game with a slight modification. There is a player, represented as a blue blob, an enemy, represented as a red blob, and food, represented as a green blob. When the environment renders, we see all three blobs on a black background. Once training starts, we can read the result of each shown episode from which blobs are left visible (the player is drawn over the food, and the enemy is drawn over the player):
- If the red and blue blobs remain on screen, our agent got to the food.
- If the green and red blobs remain, our agent ran into the enemy.
🎨 Interesting thing to note: we restrict our agent’s movement to diagonals only, so it can’t move straight up, down, left, or right. Even so, it learns on its own to use the boundaries of the grid (the edges of the render window), which it knows nothing about, to correct its direction and reach the food.
import numpy as np
from PIL import Image as Img
import cv2
import matplotlib.pyplot as plt
import pickle
from matplotlib import style
import time
style.use("ggplot")
💢 Constants
SIZE = 10               # make a 10x10 grid
HM_EPISODES = 25000     # number of episodes
MOVE_PENALTY = 1        # penalty for every step taken
ENEMY_PENALTY = 300     # penalty for running into the enemy
FOOD_REWARD = 25        # reward for reaching the food
epsilon = 0.9           # exploration rate, decayed every episode
EPS_DECAY = 0.9998      # multiplied with epsilon after each episode
SHOW_EVERY = 500        # render and report stats every 500 episodes
LEARNING_RATE = 0.1
DISCOUNT = 0.95
PLAYER_N = 1            # player key in the colour dictionary
FOOD_N = 2              # food key
ENEMY_N = 3             # enemy key
💢 Add Q-table
We can load an existing Q-table by writing its full path in place of ‘None’:
start_q_table = None

# Defining colours for our objects. This is in BGR format.
d = {1: (255, 175, 0),   # player (blue)
     2: (0, 255, 0),     # food (green)
     3: (0, 0, 255)}     # enemy (red)
💢 Blob class
Now we need a blob class. All the blobs share a lot of the same attributes, like movement directions and starting locations. We define an observation in our environment as the relative location of the food and the relative location of the enemy, so a single blob class lets us handle all of the repeated things with ease.
A few spawn scenarios we would ideally avoid:
- the player spawning on the food;
- the player spawning on the enemy;
- the enemy spawning on the food.
Since we are making a very simple environment here, we won’t worry about that for now (a sketch of one way to handle it follows the class definition below).
class Blob:
    def __init__(self):
        self.x = np.random.randint(0, SIZE)
        self.y = np.random.randint(0, SIZE)

    def __str__(self):
        return f"{self.x}, {self.y}"

    def __sub__(self, other):
        return (self.x - other.x, self.y - other.y)
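If you do want to rule out overlapping spawns, a minimal sketch (not used in the rest of this tutorial) is to pass the already-placed blobs into the constructor and re-roll until the new blob lands on a free cell. The SafeBlob class and its avoid parameter below are assumptions for illustration, not part of the class we just wrote:

class SafeBlob(Blob):
    def __init__(self, avoid=()):
        # keep re-rolling until we don't sit on any blob in `avoid`
        super().__init__()
        while any(self.x == other.x and self.y == other.y for other in avoid):
            self.x = np.random.randint(0, SIZE)
            self.y = np.random.randint(0, SIZE)

# usage: food avoids the player, enemy avoids both
# player = SafeBlob()
# food = SafeBlob(avoid=(player,))
# enemy = SafeBlob(avoid=(player, food))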
Next, we need an action method so a blob can respond to a chosen action (and so any extra players we add later can reuse it). Note that the moves we add here only allow diagonal movement. To make a blob go up-down or side-to-side as well, add cases that change only x or only y; a sketch of that follows the code below.
    # still inside the Blob class
    def action(self, choice):
        if choice == 0:
            self.move(x = 1, y = 1)
        elif choice == 1:
            self.move(x = -1, y = -1)
        elif choice == 2:
            self.move(x = -1, y = 1)
        elif choice == 3:
            self.move(x = 1, y = -1)
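If you want straight moves as well, one option is to extend the method with four extra choices that change only one axis. The choice numbers 4-7 here are an assumption, and you would also have to widen the action space from 4 to 8 wherever it appears (the range(4) in the Q-table initialisation and np.random.randint(0, 4) during training):

    def action(self, choice):
        # choices 0-3 are the original diagonals
        if choice == 0:
            self.move(x = 1, y = 1)
        elif choice == 1:
            self.move(x = -1, y = -1)
        elif choice == 2:
            self.move(x = -1, y = 1)
        elif choice == 3:
            self.move(x = 1, y = -1)
        # with move() as written, passing 0 would trigger a random step on that
        # axis (0 is falsy), so for straight moves we change the coordinate
        # directly and clamp it to the grid, mirroring what move() does
        elif choice == 4:
            self.x = min(self.x + 1, SIZE - 1)   # right
        elif choice == 5:
            self.x = max(self.x - 1, 0)          # left
        elif choice == 6:
            self.y = min(self.y + 1, SIZE - 1)   # down
        elif choice == 7:
            self.y = max(self.y - 1, 0)          # up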
💢 Random movements
The move method moves the blob randomly along an axis when no value is passed for that axis, and by the exact amount otherwise. Because we check ‘if not x’, passing 0 explicitly also counts as ‘no value’ and triggers a random step. Note, too, that the random step can be 0 on one axis and ±1 on the other, so random movement (unlike the player’s diagonal actions) can also go straight up-down or side-to-side.
    def move(self, x = False, y = False):
        if not x:
            self.x += np.random.randint(-1, 2)
        else:
            self.x += x
        if not y:
            self.y += np.random.randint(-1, 2)
        else:
            self.y += y
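A quick illustration of that behaviour, assuming the class as defined so far:

b = Blob()
b.move(x = 1, y = 1)    # moves exactly one step diagonally
b.move()                # both axes step randomly by -1, 0 or +1
b.move(x = 0, y = 1)    # x = 0 is falsy, so x still steps randomly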
Since we are creating a 10 x 10 grid, we need to create movement boundaries for our agent. These checks go at the end of the move method:
        # still inside move(): clamp the blob to the grid
        if self.x < 0:
            self.x = 0
        elif self.x > SIZE - 1:
            self.x = SIZE - 1
        if self.y < 0:
            self.y = 0
        elif self.y > SIZE - 1:
            self.y = SIZE - 1
💢 Creating a Q-table
We are finished with our blob class. Now we either want to create a new Q-table or load an existing one.
Our observation space looks like:
(x1, y1), (x2, y2)
where (x1, y1) is the player’s position relative to the food and (x2, y2) is its position relative to the enemy. So, to initialise every possible observation, we need 4 nested for loops.
if start_q_table is None:
    q_table = {}
    for x1 in range(-SIZE + 1, SIZE):
        for y1 in range(-SIZE + 1, SIZE):
            for x2 in range(-SIZE + 1, SIZE):
                for y2 in range(-SIZE + 1, SIZE):
                    q_table[((x1, y1), (x2, y2))] = [np.random.uniform(-5, 0) for i in range(4)]
else:
    with open(start_q_table, "rb") as f:
        q_table = pickle.load(f)
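As a quick sanity check: each relative coordinate can take 2 * SIZE - 1 = 19 values, so a fresh table holds 19^4 = 130321 observations, each mapped to 4 Q-values (one per action):

print(len(q_table))                     # 130321 == (2 * SIZE - 1) ** 4
print(len(q_table[((0, 1), (1, 1))]))   # 4 Q-values for a single observation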
💢 Training the model
We shall begin our training now.
episode_rewards = []
for episode in range(HM_EPISODES):
    player = Blob()
    food = Blob()
    enemy = Blob()

    if episode % SHOW_EVERY == 0:
        print(f"on # {episode}, epsilon : {epsilon}")
        print(f"{SHOW_EVERY} ep mean {np.mean(episode_rewards[-SHOW_EVERY:])}")
        show = True
    else:
        show = False

    episode_reward = 0
    for i in range(200):
        obs = (player - food, player - enemy)  # Operator overloading
        if np.random.random() > epsilon:
            action = np.argmax(q_table[obs])   # exploit the best known action
        else:
            action = np.random.randint(0, 4)   # explore with a random action
        player.action(action)
Later, we might want to make the enemy and the food move as well. For that, add the calls below inside the step loop, right after player.action(action):
        enemy.move()
        food.move()
But for training now, it is better to keep them stationary in order to keep things simple.
Next, we assign a reward based on where the move landed the player:
        if player.x == enemy.x and player.y == enemy.y:
            reward = -ENEMY_PENALTY
        elif player.x == food.x and player.y == food.y:
            reward = FOOD_REWARD
        else:
            reward = -MOVE_PENALTY
To perform the Q-update, we take a new observation after the move and look up the maximum future Q-value along with the current Q-value:
        new_obs = (player - food, player - enemy)
        max_future_q = np.max(q_table[new_obs])
        current_q = q_table[obs][action]
Calculating our new Q-value:
        if reward == FOOD_REWARD:
            new_q = FOOD_REWARD
        elif reward == -ENEMY_PENALTY:
            new_q = -ENEMY_PENALTY
        else:
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
        q_table[obs][action] = new_q
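For reference, the else branch is just the standard Q-learning update, with LEARNING_RATE as α and DISCOUNT as γ:

Q_new(s, a) = (1 − α) · Q(s, a) + α · (reward + γ · max Q(s′, a′))

Reaching the food or the enemy ends the episode, so those cases are terminal and we write the reward in directly instead of bootstrapping.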
The Q-learning part is over. We now want to render the environment and track the metrics. The frame starts out black because we initialise the grid with zeros:
        if show:
            env = np.zeros((SIZE, SIZE, 3), dtype = np.uint8)
We then colour the cells using the dictionary d. Looking colours up by key like this also makes it easy to add more objects later, for example multiple foods:
            # note the index order: row (y) first, then column (x)
            env[food.y][food.x] = d[FOOD_N]
            env[player.y][player.x] = d[PLAYER_N]
            env[enemy.y][enemy.x] = d[ENEMY_N]
💢 Converting grid to image
We get a coloured grid, but it is still just a 10 x 10 array. We want to scale it up into a viewable image now.
Even though we pass ‘RGB’ to PIL below, OpenCV interprets and displays the array as ‘BGR’; this is the reason we defined the colours in the ‘BGR’ format in the beginning:
            img = Img.fromarray(env, "RGB")
            img = img.resize((300, 300))
            cv2.imshow("", np.array(img))
            if reward == FOOD_REWARD or reward == -ENEMY_PENALTY:
                # hold the final frame a little longer, and quit on 'q'
                if cv2.waitKey(500) & 0xFF == ord("q"):
                    break
            else:
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        # back at the step-loop level: tally the reward and end the episode
        # when the player reaches the food or the enemy
        episode_reward += reward
        if reward == FOOD_REWARD or reward == -ENEMY_PENALTY:
            break

    # per-episode bookkeeping
    episode_rewards.append(episode_reward)
    epsilon *= EPS_DECAY

# after all episodes: plot the moving average of rewards
moving_avg = np.convolve(episode_rewards, np.ones((SHOW_EVERY, )) / SHOW_EVERY, mode = "valid")
plt.plot([i for i in range(len(moving_avg))], moving_avg)
plt.ylabel(f"Reward {SHOW_EVERY} mov_avg")
plt.xlabel("Episode")
plt.show()
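As an aside, if you ever want to inspect a frame with matplotlib instead of OpenCV, remember that matplotlib expects RGB order, so the BGR colours need converting first; a minimal sketch:

rgb_frame = cv2.cvtColor(np.array(img), cv2.COLOR_BGR2RGB)
plt.imshow(rgb_frame)
plt.show()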
Saving our Q-table :
with open(f"qt-{int(time.time())}.pickle", "wb") as f:
pickle.dump(q_table, f)
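To continue training from a saved table later, point start_q_table at the pickle file and, if you mostly want to watch the learned policy, start with a much smaller epsilon. The file name below is only an example of what the f-string above produces:

start_q_table = "qt-1612345678.pickle"   # example name; use your own file
epsilon = 0.1                            # mostly exploit what was learned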
💢 Entire code
import numpy as np
from PIL import Image as Img
import cv2
import matplotlib.pyplot as plt
import pickle
from matplotlib import style
import time

style.use("ggplot")

SIZE = 10
HM_EPISODES = 25000
MOVE_PENALTY = 1
ENEMY_PENALTY = 300
FOOD_REWARD = 25
epsilon = 0.9
EPS_DECAY = 0.9998
SHOW_EVERY = 500
LEARNING_RATE = 0.1
DISCOUNT = 0.95
PLAYER_N = 1
FOOD_N = 2
ENEMY_N = 3

start_q_table = None  # We can load an existing Q-table by writing the full path here

d = {1: (255, 175, 0),
     2: (0, 255, 0),
     3: (0, 0, 255)}


class Blob:
    def __init__(self):
        self.x = np.random.randint(0, SIZE)
        self.y = np.random.randint(0, SIZE)

    def __str__(self):
        return f"{self.x}, {self.y}"

    def __sub__(self, other):
        return (self.x - other.x, self.y - other.y)

    def action(self, choice):
        if choice == 0:
            self.move(x = 1, y = 1)
        elif choice == 1:
            self.move(x = -1, y = -1)
        elif choice == 2:
            self.move(x = -1, y = 1)
        elif choice == 3:
            self.move(x = 1, y = -1)

    def move(self, x = False, y = False):
        if not x:
            self.x += np.random.randint(-1, 2)
        else:
            self.x += x
        if not y:
            self.y += np.random.randint(-1, 2)
        else:
            self.y += y

        if self.x < 0:
            self.x = 0
        elif self.x > SIZE - 1:
            self.x = SIZE - 1
        if self.y < 0:
            self.y = 0
        elif self.y > SIZE - 1:
            self.y = SIZE - 1


if start_q_table is None:
    q_table = {}
    for x1 in range(-SIZE + 1, SIZE):
        for y1 in range(-SIZE + 1, SIZE):
            for x2 in range(-SIZE + 1, SIZE):
                for y2 in range(-SIZE + 1, SIZE):
                    q_table[((x1, y1), (x2, y2))] = [np.random.uniform(-5, 0) for i in range(4)]
else:
    with open(start_q_table, "rb") as f:
        q_table = pickle.load(f)

episode_rewards = []
for episode in range(HM_EPISODES):
    player = Blob()
    food = Blob()
    enemy = Blob()

    if episode % SHOW_EVERY == 0:
        print(f"on # {episode}, epsilon : {epsilon}")
        print(f"{SHOW_EVERY} ep mean {np.mean(episode_rewards[-SHOW_EVERY:])}")
        show = True
    else:
        show = False

    episode_reward = 0
    for i in range(200):
        obs = (player - food, player - enemy)  # Operator overloading
        if np.random.random() > epsilon:
            action = np.argmax(q_table[obs])
        else:
            action = np.random.randint(0, 4)
        player.action(action)

        if player.x == enemy.x and player.y == enemy.y:
            reward = -ENEMY_PENALTY
        elif player.x == food.x and player.y == food.y:
            reward = FOOD_REWARD
        else:
            reward = -MOVE_PENALTY

        new_obs = (player - food, player - enemy)
        max_future_q = np.max(q_table[new_obs])
        current_q = q_table[obs][action]

        # Now we are ready to calculate our Q-function
        if reward == FOOD_REWARD:
            new_q = FOOD_REWARD
        elif reward == -ENEMY_PENALTY:
            new_q = -ENEMY_PENALTY
        else:
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
        q_table[obs][action] = new_q

        if show:
            env = np.zeros((SIZE, SIZE, 3), dtype = np.uint8)
            env[food.y][food.x] = d[FOOD_N]
            env[player.y][player.x] = d[PLAYER_N]
            env[enemy.y][enemy.x] = d[ENEMY_N]
            img = Img.fromarray(env, "RGB")
            img = img.resize((300, 300))
            cv2.imshow("", np.array(img))
            if reward == FOOD_REWARD or reward == -ENEMY_PENALTY:
                if cv2.waitKey(500) & 0xFF == ord("q"):
                    break
            else:
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        episode_reward += reward
        if reward == FOOD_REWARD or reward == -ENEMY_PENALTY:
            break

    episode_rewards.append(episode_reward)
    epsilon *= EPS_DECAY

moving_avg = np.convolve(episode_rewards, np.ones((SHOW_EVERY, )) / SHOW_EVERY, mode = "valid")
plt.plot([i for i in range(len(moving_avg))], moving_avg)
plt.ylabel(f"Reward {SHOW_EVERY} mov_avg")
plt.xlabel("Episode")
plt.show()

with open(f"qt-{int(time.time())}.pickle", "wb") as f:
    pickle.dump(q_table, f)