
Reinforcement Learning (Part 4)

In the previous part, our agent reached its target. Now we want to measure how well it performs over time, which will help us pick the best Q-table produced by training under the given settings; at the end of this post there is a short sketch of one way to compare saved Q-tables.

In this program, we record the important metrics, the episode number together with the trailing average, minimum, and maximum episode reward, and finally plot them with matplotlib.

    import gym
    import numpy as np
    import matplotlib.pyplot as plt

    env = gym.make("MountainCar-v0")

    LEARNING_RATE = 0.1
    DISCOUNT = 0.95
    EPISODES = 2000

    SHOW_EVERY = 500

    DISCRETE_OS_SIZE = [20] * len(env.observation_space.high)
    discrete_os_win_size = (env.observation_space.high-env.observation_space.low) / DISCRETE_OS_SIZE

    epsilon = 0.5
    START_EPSILON_DECAYING = 1
    END_EPSILON_DECAYING = EPISODES // 2

    epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)

    q_table = np.random.uniform(low = -2, high = 0, size = (DISCRETE_OS_SIZE + [env.action_space.n]))

💢 Metrics

Now we want to track these metrics over time. How can we do that? It is a simple bookkeeping task: we create a list for per-episode rewards and a dictionary for the aggregated statistics, and we are good to go.

We track the episode number and, for every window of SHOW_EVERY episodes, the trailing average reward along with the worst and best episode rewards.
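
To make the trailing statistics concrete, here is a tiny toy illustration (the reward values and the window size are made up and are not from an actual run): we take the last few rewards and compute their average, minimum, and maximum, which is exactly what the loop below does with the last SHOW_EVERY episode rewards.

    # Toy illustration only: made-up rewards and a small window standing in for SHOW_EVERY.
    rewards = [-200, -200, -180, -150, -200, -120]   # hypothetical episode rewards
    WINDOW = 3

    window = rewards[-WINDOW:]                        # the most recent WINDOW episodes
    average_reward = sum(window) / len(window)        # trailing average
    print(average_reward, min(window), max(window))   # about -156.67, -200, -120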

    ep_rewards = []
    aggr_ep_rewards = {'ep' : [], 'avg' : [], 'min' : [], 'max' : []}

    def get_discrete_state(state):
        discrete_state = (state - env.observation_space.low) / discrete_os_win_size

        return tuple(discrete_state.astype(int))  # np.int was removed in newer NumPy; use the built-in int

    for episode in range(EPISODES):
        episode_reward = 0
        if episode % SHOW_EVERY == 0:
            print(episode)
            render = True
        else:
            render = False

        discrete_state = get_discrete_state(env.reset())

        done = False

        while not done:
            if np.random.random() > epsilon:
                action = np.argmax(q_table[discrete_state])
            else:
                action = np.random.randint(0, env.action_space.n)
            new_state, reward, done, _ = env.step(action)
            episode_reward += reward
            new_discrete_state = get_discrete_state(new_state)
            if render:
                env.render()

            if not done:
                max_future_q = np.max(q_table[new_discrete_state])

                current_q = q_table[discrete_state + (action, )]

                new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

                q_table[discrete_state + (action, )] = new_q

            elif new_state[0] >= env.goal_position:
                print(f"We made it on episode  {episode}")
                q_table[discrete_state + (action, )] = 0

            discrete_state = new_discrete_state

        if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
            epsilon -= epsilon_decay_value

        ep_rewards.append(episode_reward)

To save the Q-tables, create a qtables directory and uncomment the np.save call in the snippet below. It writes the Q-table to disk every SHOW_EVERY episodes (the same episodes we render); a sketch after the plotting code shows how to load one back.

        if not episode % SHOW_EVERY:
            # np.save(f"qtables/{episode}-qtable.npy", q_table)

            average_reward = sum(ep_rewards[-SHOW_EVERY:]) / len(ep_rewards[-SHOW_EVERY:])
            aggr_ep_rewards['ep'].append(episode)
            aggr_ep_rewards['avg'].append(average_reward)
            aggr_ep_rewards['min'].append(min(ep_rewards[-SHOW_EVERY:]))
            aggr_ep_rewards['max'].append(max(ep_rewards[-SHOW_EVERY:]))

            print(f"Episode : {episode} Average : {average_reward} Min : {min(ep_rewards[-SHOW_EVERY:])} Max : {max(ep_rewards[-SHOW_EVERY:])}")

    env.close()

    plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label='avg')
    plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label='min')
    plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label='max')
    plt.legend(loc=4)
    plt.show()
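
Once a few Q-tables have been saved, you can load one back and watch the greedy policy it encodes. The snippet below is a minimal sketch rather than part of the original program: it assumes the np.save line above was uncommented and that a file such as qtables/1500-qtable.npy exists (the episode number is just an example), and it reuses get_discrete_state from earlier.

    # Minimal sketch: load a saved Q-table and play one episode greedily (no exploration).
    # Assumes the np.save call above was uncommented; the file name is only an example.
    saved_q_table = np.load("qtables/1500-qtable.npy")

    env = gym.make("MountainCar-v0")                  # fresh environment for the playback
    state = get_discrete_state(env.reset())
    done = False
    playback_reward = 0

    while not done:
        action = np.argmax(saved_q_table[state])      # always exploit the loaded table
        new_state, reward, done, _ = env.step(action)
        playback_reward += reward
        state = get_discrete_state(new_state)
        env.render()

    print(f"Greedy playback reward: {playback_reward}")
    env.close()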

💢 Entire code

import gym
import numpy as np
import matplotlib.pyplot as plt

env = gym.make("MountainCar-v0")

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 2000

SHOW_EVERY = 500

DISCRETE_OS_SIZE = [20] * len(env.observation_space.high)
discrete_os_win_size = (env.observation_space.high-env.observation_space.low) / DISCRETE_OS_SIZE

epsilon = 0.5
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES // 2

epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)

q_table = np.random.uniform(low = -2, high = 0, size = (DISCRETE_OS_SIZE + [env.action_space.n]))

ep_rewards = []
aggr_ep_rewards = {'ep' : [], 'avg' : [], 'min' : [], 'max' : []}       # episode number plus the average, worst, and best
                                                                        # episode reward for every 'SHOW_EVERY' window

def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low) / discrete_os_win_size

    return tuple(discrete_state.astype(int))  # np.int was removed in newer NumPy; use the built-in int

for episode in range(EPISODES):
    episode_reward = 0
    if episode % SHOW_EVERY == 0:
        print(episode)
        render = True
    else:
        render = False

    discrete_state = get_discrete_state(env.reset())

    done = False

    while not done:
        if np.random.random() > epsilon:
            action = np.argmax(q_table[discrete_state])
        else:
            action = np.random.randint(0, env.action_space.n)
        new_state, reward, done, _ = env.step(action)
        episode_reward += reward
        new_discrete_state = get_discrete_state(new_state)
        if render:
            env.render()

        if not done:
            max_future_q = np.max(q_table[new_discrete_state])

            current_q = q_table[discrete_state + (action, )]

            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            q_table[discrete_state + (action, )] = new_q

        elif new_state[0] >= env.goal_position:
            print(f"We made it on episode  {episode}")
            q_table[discrete_state + (action, )] = 0

        discrete_state = new_discrete_state

    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value

    ep_rewards.append(episode_reward)

    if not episode % SHOW_EVERY:
        # np.save(f"qtables/{episode}-qtable.npy", q_table)
        average_reward = sum(ep_rewards[-SHOW_EVERY:]) / len(ep_rewards[-SHOW_EVERY:])
        aggr_ep_rewards['ep'].append(episode)
        aggr_ep_rewards['avg'].append(average_reward)
        aggr_ep_rewards['min'].append(min(ep_rewards[-SHOW_EVERY:]))
        aggr_ep_rewards['max'].append(max(ep_rewards[-SHOW_EVERY:]))

        print(f"Episode : {episode} Average : {average_reward} Min : {min(ep_rewards[-SHOW_EVERY:])} Max : {max(ep_rewards[-SHOW_EVERY:])}")

env.close()

plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label='avg')
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label='min')
plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label='max')
plt.legend(loc=4)
plt.show()
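
The beginning of the post mentioned picking the best Q-table from a run. One simple way to do that, sketched below and not part of the original code, is to load every table saved in the qtables directory and score each one by the average reward of its greedy policy over a handful of episodes; the file-name pattern and the number of evaluation episodes are assumptions.

    import glob
    import os

    import gym
    import numpy as np

    # Sketch: rank saved Q-tables by the average reward of their greedy policies.
    env = gym.make("MountainCar-v0")
    DISCRETE_OS_SIZE = [20] * len(env.observation_space.high)
    discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE

    def get_discrete_state(state):
        discrete_state = (state - env.observation_space.low) / discrete_os_win_size
        return tuple(discrete_state.astype(int))

    def evaluate(q_table, episodes=5):
        # Average reward of the greedy policy over a few episodes.
        total = 0.0
        for _ in range(episodes):
            state = get_discrete_state(env.reset())
            done = False
            while not done:
                action = np.argmax(q_table[state])
                new_state, reward, done, _ = env.step(action)
                total += reward
                state = get_discrete_state(new_state)
        return total / episodes

    scores = {}
    for path in glob.glob("qtables/*-qtable.npy"):    # files written by the uncommented np.save
        scores[os.path.basename(path)] = evaluate(np.load(path))

    if scores:
        best = max(scores, key=scores.get)
        print(f"Best Q-table: {best} (average greedy reward {scores[best]:.1f})")
    env.close()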