Кодирование и понимание алгоритма TD(0) с использованием Python
Это продолжение моей предыдущей статьи:
В этой статье я хочу познакомить читателя с логикой алгоритма на основе примеров в Reinforcement Learning (RL). Для этого мы создадим сетчатый мир с отверстиями (как на миниатюре) и позволим нашему агенту свободно перемещаться по нашему созданному миру.
Будем надеяться, что к концу путешествия агент узнает, где в мире лучше всего находиться, а каких мест следует избегать. Чтобы помочь нашему агенту в процессе обучения, мы будем использовать знаменитый алгоритм TD(0).
Прежде чем углубляться в алгоритмы, давайте определим цель, которую мы хотим решить.
В этой статье мы создадим мир сетки с 5 строками и 7 столбцами, то есть наш агент сможет находиться в одном из 35 состояний. Правила движения таковы:
- Агент не может выйти за границы мира сетки.
- Агент на каждом временном шаге может двигаться только вверх, вниз, влево или вправо.
- Агент начинается в верхнем левом углу нашего мира сетки.
- Если агент достигает своей цели или падает в яму, игра заканчивается, и он возвращается в исходное состояние.
- Каждое движение приносит награду -1.
- Падение в яму приносит награду -10.
- Достижение цели приносит награду 10.
Конечная цель нашего агента — как можно лучше оценить каждое состояние, в котором он может находиться. Другими словами, наш агент хочет оценить значение каждого состояния при определенной политике перемещения.
Вырезанный ниже код инициализирует среду, указанную в предыдущем разделе:
import numpy as np def init_policy(S: np.array, weight_dict: dict = {'right': 1}) -> dict: # Saving all the unique states to a vector states = np.unique(S) # Getting the number of rows and columns of the S matrix n_row = S.shape[0] n_col = S.shape[1] # Dictionary to hold each action for a given state P = {} for s in states: s_dict = {} # Checking which index is the current state in the S matrix s_index = np.where(S == s) # If the state is in the top left corner, we can only move right and down if s_index == (0, 0): s_dict['right'] = 0.5 * weight_dict['right'] s_dict['down'] = 1 - s_dict['right'] # If the state is in the top right corner, we can only move left and down elif s_index == (0, n_col - 1): s_dict['left'] = 0.5 s_dict['down'] = 0.5 # If the state is in the bottom left corner, we can only move right and up elif s_index == (n_row - 1, 0): s_dict['right'] = 0.5 * weight_dict['right'] s_dict['up'] = 1 - s_dict['right'] # If the state is in the bottom right corner, we can only move left and up elif s_index == (n_row - 1, n_col - 1): s_dict['left'] = 0.5 s_dict['up'] = 0.5 # If the state is in the first row, we can only move left, right, and down elif s_index[0] == 0: s_dict['right'] = 0.333 * weight_dict['right'] s_dict['left'] = (1 - s_dict['right']) / 2 s_dict['down'] = (1 - s_dict['right']) / 2 # If the state is in the last row, we can only move left, right, and up elif s_index[0] == n_row - 1: s_dict['right'] = 0.333 * weight_dict['right'] s_dict['left'] = (1 - s_dict['right']) / 2 s_dict['up'] = (1 - s_dict['right']) / 2 # If the state is in the first column, we can only move up, down, and right elif s_index[1] == 0: s_dict['right'] = 0.333 * weight_dict['right'] s_dict['up'] = (1 - s_dict['right']) / 2 s_dict['down'] = (1 - s_dict['right']) / 2 # If the state is in the last column, we can only move up, down, and left elif s_index[1] == n_col - 1: s_dict['up'] = 0.333 s_dict['down'] = 0.333 s_dict['left'] = 1 - s_dict['up'] - s_dict['down'] # If the state is in the middle, we can move in all directions else: s_dict['right'] = 0.25 * weight_dict['right'] s_dict['up'] = (1 - s_dict['right']) / 3 s_dict['down'] = (1 - s_dict['right']) / 3 s_dict['left'] = (1 - s_dict['right']) / 3 # Saving the current states trasition probabilities P[s] = s_dict return P def generate_holes(nrow: int, ncol: int, start_coords: list, hole_coords: list, nholes: int = 1) -> list: """ Function that generates nholes in a gridworld The holes cannot be: - in the start state - in the goal state """ # Generating the hole coordinates # The hole cannot be in the start or goal state hole_coords = [] for _ in range(nholes): hole_row = np.random.randint(0, nrow - 1) hole_col = np.random.randint(0, ncol - 1) while (hole_row, hole_col) in start_coords or (hole_row, hole_col) in hole_coords: hole_row = np.random.randint(0, nrow - 1) hole_col = np.random.randint(0, ncol - 1) # Appending to the hole coordinates list hole_coords.append((hole_row, hole_col)) return hole_coords def init_env( n_rows: int, n_cols: int, step_reward: float = -1, goal_reward: float = 10, hole_reward: float = -10, n_holes: int = 1, random_seed: int = 42, policy_weights: dict = {'right': 1} ) -> np.array: """ Functionat that returns the initial environment: S - the state matrix indexed by [row, col] V - the initial value matrix indexed by [row, col] R - the reward matrix indexed by [row, col] A - the action matrix indexed by [row, col] P - the probability dictionary where for each state, the keys are the actions and the values are the probabilities of the next state """ # Setting the random seed np.random.seed(random_seed) # Initiating the S matrix S = np.arange(0, n_rows * n_cols).reshape(n_rows, n_cols) # Creating the initial V matrix V = np.zeros((n_rows, n_cols)) # The start state will be always the top left corner # The goal state will be always the bottom right corner # We will generate a random holes that our agent can fall in # Any other state that is not the hole or the goal state will receive a step reward goal_coord = (n_rows - 1, n_cols - 1) R = np.zeros((n_rows, n_cols)) R.fill(step_reward) R[0, 0] = step_reward R[goal_coord] = goal_reward # Generating the hole coordinates # The hole cannot be in the start or goal state hole_coords = generate_holes(n_rows, n_cols, [(0, 0)], [goal_coord], n_holes) # Setting the hole reward for hole_coord in hole_coords: R[hole_coord] = hole_reward # Initiating the policy P = init_policy(S, weight_dict=policy_weights) return S, V, R, P, hole_coords, [goal_coord]
Объекты, которые нам нужны для начала обучения:
- Матрица состояния S
- Матрица значений V
- Матрица вознаграждения R
- Словарь политик P
По умолчанию приведенный выше фрагмент кода инициирует мир со случайной политикой.
Случайная политика означает, что наш агент решает перейти из одного состояния в другое с помощью равномерного распределения вероятностей.
Давайте создадим наш мир и более подробно изучим матрицы:
S, V, R, P, hole_coords, goal_coard = init_env(5, 7, n_holes=4, random_seed=3)
Фрагмент кода ниже используется для построения матриц:
def array_index_to_matplot_coords(i: int, j: int, n_cols: int) -> Tuple[int, int]: """ Converts an array index to a matplot coordinate """ x = j y = n_cols - i - 1 return x, y def plot_matrix( M: np.array, goal_coords: list = [], hole_coords: list = [], img_width: int = 5, img_height: int = 5, title: str = None, ) -> None: """ Plots a matrix as an image. """ height, width = M.shape fig = plt.figure(figsize=(img_width, img_width)) ax = fig.add_subplot(111, aspect='equal') for x in range(height): for y in range(width): # By default, the (0, 0) coordinate in matplotlib is the bottom left corner, # so we need to invert the y coordinate to plot the matrix correctly matplot_x, matplot_y = array_index_to_matplot_coords(x, y, height) # If there is a tuple of (x, y) in the goal_coords list, we color the cell gray if (x, y) in goal_coords: ax.add_patch(matplotlib.patches.Rectangle((matplot_x - 0.5, matplot_y - 0.5), 1, 1, facecolor='gray')) # If there is a tuple of (x, y) in the hole_coords list, we color the cell salmon elif (x, y) in hole_coords: ax.add_patch(matplotlib.patches.Rectangle((matplot_x - 0.5, matplot_y - 0.5), 1, 1, facecolor='salmon')) ax.annotate(str(M[x][y]), xy=(matplot_x, matplot_y), ha='center', va='center') offset = .5 ax.set_xlim(-offset, width - offset) ax.set_ylim(-offset, height - offset) ax.hlines(y=np.arange(height+1)- offset, xmin=-offset, xmax=width-offset) ax.vlines(x=np.arange(width+1) - offset, ymin=-offset, ymax=height-offset) plt.title(title) plt.show() def plot_policy_matrix(P: dict, S:np.array, terminal_coords: list = [], img_width: int = 5, img_height: int = 5, title: str = None) -> None: """ Plots the policy matrix out of the dictionary provided; The dictionary values are used to draw the arrows """ height, width = S.shape fig = plt.figure(figsize=(img_width, img_width)) ax = fig.add_subplot(111, aspect='equal') for x in range(height): for y in range(width): matplot_x, matplot_y = array_index_to_matplot_coords(x, y, height) # If there is a tuple of (x, y) in the goal_coords list, we color the cell gray if (x, y) in terminal_coords: ax.add_patch(matplotlib.patches.Rectangle((matplot_x - 0.5, matplot_y - 0.5), 1, 1, facecolor='gray')) else: try: # Adding the arrows to the plot if 'up' in P[S[x, y]]: plt.arrow(matplot_x, matplot_y, 0, 0.3, head_width = 0.05, head_length = 0.05) if 'down' in P[S[x, y]]: plt.arrow(matplot_x, matplot_y, 0, -0.3, head_width = 0.05, head_length = 0.05) if 'left' in P[S[x, y]]: plt.arrow(matplot_x, matplot_y, -0.3, 0, head_width = 0.05, head_length = 0.05) if 'right' in P[S[x, y]]: plt.arrow(matplot_x, matplot_y, 0.3, 0, head_width = 0.05, head_length = 0.05) except Exception as e: print(f"Error: {e}") print(f"Current x and y: {x}, {y}") offset = .5 ax.set_xlim(-offset, width - offset) ax.set_ylim(-offset, height - offset) ax.hlines(y=np.arange(height+1)- offset, xmin=-offset, xmax=width-offset) ax.vlines(x=np.arange(width+1) - offset, ymin=-offset, ymax=height-offset) plt.title(title)
Давайте сначала визуализируем матрицу состояния:
plot_matrix(S, goal_coords=goal_coard, hole_coords=hole_coords, title='State Matrix')
Красные состояния обозначают координаты дыр — это состояния, которых хочет избежать наш агент.
Серое состояние указывает на цель — это то место, где хочет быть наш агент.
Наш агент всегда будет начинать свое путешествие из состояния 0.
Матрица вознаграждения выглядит следующим образом:
plot_matrix(R, goal_coords=goal_coard, hole_coords=hole_coords, title='Reward Matrix')
Матрица вознаграждения за переход в определенное состояние визуализирована выше.
Например:
- Переход из состояния 1 в состояние 8 даст награду -1.
- Переход из состояния 9 в состояние 10 даст награду -10.
- Переход из состояния 33 в состояние 34 даст награду в размере 10.
И так далее.
Политика, которой будет следовать наш агент, является случайной политикой — переход в каждое из состояний равномерен:
plot_policy_matrix(P, S, terminal_coords=hole_coords + goal_coard, title='Policy Matrix')
Серые состояния в матрице политики представляют терминальные состояния: если агент решит перейти в это состояние, эпизод завершится, и агент будет сброшен в состояние 0.
Цель алгоритма TD(0) — оценить значение каждого состояния для заданной политики.
Другими словами, мы хотим заполнить значения матрицы значений:
plot_matrix(V, goal_coords=goal_coard, hole_coords=hole_coords, title='Value Matrix')
Алгоритм TD(0) является сокращением от алгоритм одношаговых временных различий. Для того, чтобы начать строить интуицию и в более широком смысле, в этом алгоритме наш агент делает один шаг в соответствии с заданной политикой, наблюдает за вознаграждением и обновляет оценки значения состояния после такого шага.
Математически говоря, шаг обновления выглядит следующим образом:
Здесь:
- s prime — состояние, в которое переходит наш агент из текущего состояния s.
- Вознаграждение r равно вознаграждению за переход на s Prime.
- Гамма — это ставка дисконтирования (больше 0, меньше или равна 1).
- Альфа — это размер (больше 0, меньше или равно 1).
Полный алгоритм¹ следующий:
Алгоритм TD(0) — это алгоритм прогнозирования. В RL алгоритм прогнозирования относится к алгоритму, который пытается оценить значения состояния, не изменяя заданную политику (вероятности перехода).
Это также алгоритм начальной загрузки, поскольку мы используем текущую оценку функции значения для оценки функции значения для следующего состояния.
Таким образом, нас интересуют только значения состояния — общее ожидаемое накопленное вознаграждение, если агент перейдет из текущего состояния:
Теперь приступим к реализации алгоритма.
Первое, что нужно нашему агенту, это сделать ход на основе созданной нами политики:
def select_move(s, S, P) -> int: """ Given the current state, returns the coordinates of the next state based on the current policy """ # Getting the current state index s_index = np.where(S == s) # Getting the current state policy s_policy = P[s] # Selecting the next action based on the current policy next_action = np.random.choice(list(s_policy.keys()), p=list(s_policy.values())) # Getting the next state coordinates based on the next action try: if next_action == 'up': next_state = S[s_index[0] - 1, s_index[1]][0] elif next_action == 'down': next_state = S[s_index[0] + 1, s_index[1]][0] elif next_action == 'left': next_state = S[s_index[0], s_index[1] - 1][0] elif next_action == 'right': next_state = S[s_index[0], s_index[1] + 1][0] except Exception as e: print(f"Current state: {s}") print(f'Next action: {next_action}') print(f'Error: {e}') return next_state
Когда агент находится в состоянии s, он может переходить только в состояния, которые присутствуют в словаре матрицы политик. Например, все действия в состоянии 1:
Сумма всех вероятностей равна 1, и наш агент случайным образом выбирает действия вправо, влево или вниз (обратитесь к графику матрицы состояний, чтобы увидеть, где находится состояние).
Приведенное выше действие — это все, что нужно для начала обновления функции значения. Когда наш агент делает ход, он переходит в другое состояние и получает награду этого состояния. Затем применяем уравнение:
def get_state_coords(s, S) -> tuple: """ Returns the state coordinates given the state index """ s_index = np.where(S == s) return s_index[0][0], s_index[1][0] def update_value(s, s_prime, S, P, V, R, alpha: float = 0.1, gamma: float = 0.9) -> float: """ Updates the current value function based on the current policy """ # Getting the CURRENT state's nrow and ncol index s_index_now = get_state_coords(s, S) # Getting the SELECTED state's nrow and ncol index s_index_prime = get_state_coords(s_prime, S) # Getting the reward by moving to the selected state move_reward = R[s_index_prime[0], s_index_prime[1]] # Getting the current estimated value of the selected state current_value = V[s_index_now[0], s_index_now[1]] # The next value prime_value = V[s_index_prime[0], s_index_prime[1]] # Returning the TD(0) current state value return current_value + alpha * (move_reward + gamma * prime_value - current_value)
Последнее, что нужно сделать, это заключить все в цикл while и прекратить исследование только тогда, когда наш агент перейдет в терминальное состояние:
def episode_exploration(S, P, V, R, terminal_state_coords: list, alpha: float = 0.1, gamma: float = 0.9) -> None: """ Agent exploration and value updating using TD(0) equation until a terminal state is reached """ # The starting state is 0 s = 0 # Keeping track of the number of moves n_moves = 0 # Getting the coordinates of the s s_coords = get_state_coords(s, S) while s_coords not in terminal_state_coords: # Selecting the next state based on the current policy s_prime = select_move(s, S, P) # Updating the current state value V[s_coords] = update_value(s, s_prime, S, P, V, R, alpha, gamma) # Updating the current state s = s_prime # Incrementing the number of moves n_moves += 1 # Getting teh new s coords s_coords = get_state_coords(s, S) return n_moves
Теперь у нас есть все необходимое для реализации полного алгоритма TD(0).
Давайте определим 10000 эпизодов и пусть наш агент учится!
# Defining the number of episodes to explore n_episodes = 10000 # We will plot the V matrix after each episode filling the same device plot to make an animation number_of_walks = [] for _ in tqdm(range(n_episodes)): n = episode_exploration(S, P, V, R, terminal_state_coords=hole_coords + goal_coard, alpha=0.1, gamma=0.9) number_of_walks.append(n)
Количество действий, предпринятых нашим агентом до расторжения:
# Ploting the distribution of the number of moves plt.figure(figsize=(10, 5)) sns.kdeplot(number_of_walks, fill=True) plt.title(f'Number of moves distribution | Mean: {np.mean(number_of_walks):.2f} | Std: {np.std(number_of_walks):.2f}') plt.xlabel('Number of moves') plt.ylabel('Frequency') plt.show()
В среднем наш агент делал 10 ходов, прежде чем наткнулся на терминальное состояние.
Окончательная матрица оцененных значений состояния:
Как мы видим, следуя данной политике, очень плохо находиться там, где наш агент начинает свое путешествие. В среднем, начиная с этого состояния, агент получает только вознаграждение -9,96. Однако по мере приближения к целевому состоянию ценность возрастает.
Обратите внимание, что целевое состояние и состояния дыры имеют значение 0, потому что из этих состояний нет исследования — каждый раз, когда агент переходит туда, игра заканчивается.
Что произойдет, если мы выберем другую политику? Например, чаще выбирали направление «направо»:
# Assiging a different policy S, V, R, P, hole_coords, goal_coard = init_env(5, 7, n_holes=4, random_seed=3, policy_weights={'right': 1.5}) # Defining the number of episodes to explore n_episodes = 10000 # We will plot the V matrix after each episode filling the same device plot to make an animation number_of_walks = [] for _ in tqdm(range(n_episodes)): n = episode_exploration(S, P, V, R, terminal_state_coords=hole_coords + goal_coard, alpha=0.1, gamma=0.9) number_of_walks.append(n)
Сумма матрицы значений состояний случайных политик равна -249,29, а сумма политик с большей вероятностью перехода вправо равна -213,51.
В этом смысле мы можем сказать, что чаще двигаться вправо — лучшая политика!
В этой статье я представил первый алгоритм на основе выборки в RL — одношаговый алгоритм временной разности или TD(0).
Это алгоритм прогнозирования, то есть он используется только для оценки состояний для данной политики. Изменение политики приведет к другим результатам по значению состояния.
Всем удачного обучения и удачного кодирования!
[1]
- Автор: Ричард С. Саттон, Эндрю Г. Барто
- Год: 2018
- Страница: 120
- Название: Обучение с подкреплением: введение
- URL-адрес: https://archive.ics.uci.edu/ml