Reinforcement Learning (Part 6)

Introduction to deep Q-learning.

Deep Q-learning combines deep learning with reinforcement learning: instead of a Q-table, we use a neural network, like the ones in deep learning, to estimate Q-values. This program only builds the model; we will train it in the next program.
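To recap in code form what the network will be learning: during training (next part) its targets come from the standard Q-learning update. A minimal sketch, where DISCOUNT, model, reward, new_state and done are placeholder names chosen here for illustration:

    # Sketch only: how a training target for one transition is formed (covered in the next part).
    DISCOUNT = 0.99   # placeholder discount factor

    if not done:
        max_future_q = np.max(model.predict(new_state)[0])   # best predicted Q-value in the next state
        new_q = reward + DISCOUNT * max_future_q              # Q-learning target for the taken action
    else:
        new_q = reward                                        # terminal state: no future reward

With that in mind, the actual program starts with the imports: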

    from keras.models import Sequential
    from keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
    from keras.callbacks import TensorBoard
    from keras.optimizers import Adam
    import tensorflow as tf   # needed below for tf.summary.FileWriter (TF 1.x API)
    from collections import deque
    import time
    import numpy as np

💢 Batch

self.model would otherwise get a .fit() every single step, and on just a single sample at that, while neural networks train far better on batches.

So we keep a replay memory of the last 50,000 steps, whose size we call REPLAY_MEMORY_SIZE, and later sample training batches from it.

    REPLAY_MEMORY_SIZE = 50_000
    MODEL_NAME = "256x2"
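The replay memory is filled with transitions as the agent plays; during training (next part) a random minibatch is drawn from it instead of fitting on single steps. A minimal sketch, where MINIBATCH_SIZE is a placeholder value not set in this program:

    import random

    MINIBATCH_SIZE = 64   # placeholder; the actual value is chosen in the next part

    # replay_memory holds the transition tuples stored by update_replay_memory
    minibatch = random.sample(replay_memory, MINIBATCH_SIZE)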

💢 Log file

By default, every time we call .fit(), Keras generates a new TensorBoard log file. We perform that operation 200 times in one episode, and there are thousands of episodes. But we just want one log file that keeps getting updated, hence the class below.

Our own TensorBoard class:

    class ModifiedTensorBoard(TensorBoard):

Overriding __init__ to set the initial step and writer (we want one log file for all .fit() calls):

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.step = 1
            self.writer = tf.summary.FileWriter(self.log_dir)

Overriding this method to stop Keras from creating a default log writer:

        def set_model(self, model):
            pass

Overridden to save logs with our own step number; otherwise every .fit() would start writing from step 0:

        def on_epoch_end(self, epoch, logs=None):
            self.update_stats(**logs)

Overridden; since we train for one batch only, there is no need to save anything at batch end:

        def on_batch_end(self, batch, logs=None):
            pass

Overridden so that the writer is not closed when training ends:

        def on_train_end(self, _):
            pass

Custom method for saving our own metrics; writes the given stats at the current step:

        def update_stats(self, **stats):
            self._write_logs(stats, self.step)
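Putting the class to use is simple: the training loop (next part) bumps self.step once per episode and calls update_stats with whatever metrics it wants logged. A rough sketch, where agent, episode and the metric names are placeholders chosen for illustration:

    # Hypothetical usage from the training loop in the next part:
    agent.tensorboard.step = episode                 # one step per episode, shared by all .fit() calls
    agent.tensorboard.update_stats(reward_avg=average_reward,
                                   reward_min=min_reward,
                                   reward_max=max_reward,
                                   epsilon=epsilon)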

💢 Target model

The model is expected to fluctuate wildly at the beginning of the learning process. Hence, we create two models so that things stay stable and easier to reason about: self.model, which is trained every step and therefore changes drastically, and a target model, which we use for predictions and only update every so often, so it changes far less.
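To make that concrete, the training loop in the next part will copy the main model's weights into the target model only once every several episodes, using the counter set up in __init__ below. A rough sketch, where UPDATE_TARGET_EVERY is a placeholder constant and the code lives inside the agent's training method:

    # Rough sketch of the periodic sync performed in the next part.
    UPDATE_TARGET_EVERY = 5   # placeholder: sync every 5 episodes, for example

    if self.target_update_counter > UPDATE_TARGET_EVERY:
        self.target_model.set_weights(self.model.get_weights())   # copy over the learned weights
        self.target_update_counter = 0                             # and restart the count

Back to the actual program, the agent class: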

    class DQNAgent:
        def __init__(self):

Main model, gets trained every step:

        self.model = self.create_model()

Target model, this is what we .predict against every step:

        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen = REPLAY_MEMORY_SIZE)

        self.tensorboard = ModifiedTensorBoard(log_dir = f"logs/{MODEL_NAME}-{int(time.time())}")

For internally tracking when to update the target model:

        self.target_update_counter = 0

        def create_model(self):
            model = Sequential()
            model.add(Conv2D(256, (3, 3), input_shape = env.OBSERVATION_SPACE_VALUES))

Rectified linear activation:

            model.add(Activation("relu"))
            model.add(MaxPooling2D(2, 2))
            model.add(Dropout(0.2))

            model.add(Conv2D(256, (3, 3)))
            model.add(Activation("relu"))
            model.add(MaxPooling2D(2, 2))
            model.add(Dropout(0.2))

            model.add(Flatten())
            model.add(Dense(64))

            model.add(Dense(env.ACTION_SPACE_SIZE, activation = "linear"))
            model.compile(loss="mse", optimizer = Adam(lr=0.001), metrics=['accuracy'])

            return model

        def update_replay_memory(self, transition):
            self.replay_memory.append(transition)
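Note that create_model refers to an env object, which must already exist and expose OBSERVATION_SPACE_VALUES and ACTION_SPACE_SIZE (presumably the environment from the earlier parts of this series). If you want to sanity-check the model in isolation, a minimal stand-in with made-up values could look like this:

    # Minimal stand-in environment with made-up values, just to build and inspect the model.
    class FakeEnv:
        OBSERVATION_SPACE_VALUES = (10, 10, 3)   # purely illustrative image shape
        ACTION_SPACE_SIZE = 9                    # purely illustrative number of actions

    env = FakeEnv()
    agent = DQNAgent()
    agent.model.summary()                        # prints the layer-by-layer architecture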

To normalize the RGB image data that we pass in, we divide by 255:

        def get_qs(self, state, step):
            return self.model.predict(np.array(state).reshape(-1, *state.shape) / 255)[0]
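With get_qs in place, choosing an action during training (next part) typically reduces to an argmax over the predicted Q-values, mixed with some random exploration. A rough sketch, where epsilon, current_state and step are placeholders:

    # Rough sketch of epsilon-greedy action selection using get_qs (details come in the next part).
    epsilon = 0.1                                               # placeholder exploration rate
    if np.random.random() > epsilon:
        action = np.argmax(agent.get_qs(current_state, step))   # exploit: best predicted action
    else:
        action = np.random.randint(0, env.ACTION_SPACE_SIZE)    # explore: random action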

In the next and final part of this series, we will train our deep Q-learning model.

💢 Entire code

from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
from keras.callbacks import TensorBoard
from keras.optimizers import Adam
import tensorflow as tf   # needed below for tf.summary.FileWriter (TF 1.x API)
from collections import deque
import time
import numpy as np

REPLAY_MEMORY_SIZE = 50_000
MODEL_NAME = "256x2"

# Own Tensorboard class
class ModifiedTensorBoard(TensorBoard):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.FileWriter(self.log_dir)

    def set_model(self, model):
        pass

    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    def on_batch_end(self, batch, logs=None):
        pass

    def on_train_end(self, _):
        pass

    def update_stats(self, **stats):
        self._write_logs(stats, self.step)

class DQNAgent:
    def __init__(self):
        self.model = self.create_model()

        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen = REPLAY_MEMORY_SIZE)

        self.tensorboard = ModifiedTensorBoard(log_dir = f"logs/{MODEL_NAME}-{int(time.time())}")
        self.target_update_counter = 0

    def create_model(self):
        model = Sequential()
        model.add(Conv2D(256, (3, 3), input_shape = env.OBSERVATION_SPACE_VALUES))
        model.add(Activation("relu"))   # Rectified linear activation
        model.add(MaxPooling2D(2, 2))
        model.add(Dropout(0.2))

        model.add(Conv2D(256, (3, 3)))
        model.add(Activation("relu"))
        model.add(MaxPooling2D(2, 2))
        model.add(Dropout(0.2))

        model.add(Flatten())
        model.add(Dense(64))

        model.add(Dense(env.ACTION_SPACE_SIZE, activation = "linear"))
        model.compile(loss="mse", optimizer = Adam(lr=0.001), metrics=['accuracy'])

        return model

    def update_replay_memory(self, transition):
        self.replay_memory.append(transition)

    def get_qs(self, state, step):
        return self.model.predict(np.array(state).reshape(-1, *state.shape) / 255)[0]