The issue where the implemented PPO algorithm fails to train

I wrote PPO-based reinforcement learning code for the Gymnasium CarRacing-v3 environment.
(The code was generated with Gemini.)
However, even after 200,000 frames, training does not seem to improve:

Frame: 40000 | Mean reward: 0.07 | Loss: 3.1248
Frame: 80000 | Mean reward: 0.04 | Loss: 7.0629
Frame: 120000 | Mean reward: 0.06 | Loss: 3.9565
Frame: 160000 | Mean reward: 0.07 | Loss: 5.4525
Frame: 200000 | Mean reward: 0.03 | Loss: 3.8550

I would like to know whether there are any problems or bugs in the code below.
import gymnasium as gym
from collections import deque
from gymnasium.spaces import Box
from gymnasium.wrappers import GrayscaleObservation, ResizeObservation
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np

# "device" is referenced throughout the code below
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

max_frames = 200000
action_repeat = 4

# Truncates the episode (with a -5 penalty) after `patience` consecutive
# negative-reward steps; the first 50 steps of an episode are ignored.
class EarlyStopWrapper(gym.Wrapper):
    def __init__(self, env, patience=40):
        super().__init__(env)
        self.patience = patience
        self.neg_counter = 0
        self.step_count = 0

    def reset(self, **kwargs):
        self.neg_counter = 0
        self.step_count = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.step_count += 1
        if self.step_count > 50:
            if reward < 0:
                self.neg_counter += 1
            else:
                self.neg_counter = 0
            if self.neg_counter >= self.patience:
                truncated = True
                reward -= 5.0
        return obs, reward, terminated, truncated, info

class ActionRepeat(gym.Wrapper):
    def __init__(self, env, repeat):
        super().__init__(env)
        self.repeat = repeat

    def step(self, action):
        total_reward = 0.0
        terminated = False
        truncated = False
        for _ in range(self.repeat):
            obs, reward, term, trunc, info = self.env.step(action)
            total_reward += reward
            terminated = term or terminated
            truncated = trunc or truncated
            if terminated or truncated:
                break
        return obs, total_reward, terminated, truncated, info

class CustomRewardWrapper(gym.Wrapper):
    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        if reward > 0:
            reward *= 1.2
        return obs, reward, terminated, truncated, info

class FrameStack(gym.Wrapper):
    def __init__(self, env, num_stack):
        super().__init__(env)
        self.num_stack = num_stack
        self.frames = deque(maxlen=num_stack)
        low = np.repeat(env.observation_space.low[np.newaxis, ...], num_stack, axis=0)
        high = np.repeat(env.observation_space.high[np.newaxis, ...], num_stack, axis=0)
        self.observation_space = Box(low=low, high=high, dtype=env.observation_space.dtype)

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        for _ in range(self.num_stack):
            self.frames.append(obs)
        return self._get_obs(), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(obs)
        return self._get_obs(), reward, terminated, truncated, info

    def _get_obs(self):
        return np.array(self.frames)

def preprocess(obs):
    # scale uint8 pixels to [0, 1]
    obs = torch.from_numpy(obs).float() / 255.0
    return obs

def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    torch.nn.init.orthogonal_(layer.weight, std)
    torch.nn.init.constant_(layer.bias, bias_const)
    return layer

class CarPolicy(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            layer_init(nn.Conv2d(4, 32, kernel_size=3, stride=2)), nn.ReLU(),
            layer_init(nn.Conv2d(32, 64, kernel_size=3, stride=2)), nn.ReLU(),
            layer_init(nn.Conv2d(64, 128, kernel_size=3, stride=2)), nn.ReLU(),
            nn.Flatten()
        )
        self.fc = nn.Sequential(layer_init(nn.Linear(15488, 256)), nn.ReLU())
        self.fc_mu = layer_init(nn.Linear(256, 3), std=0.01)
        self.fc_std = layer_init(nn.Linear(256, 3), std=0.01)
        self.fc_value = layer_init(nn.Linear(256, 1), std=1)

    def forward(self, x):
        x = self.fc(self.conv(x))
        raw_mu = self.fc_mu(x)
        # squash the mean into each action's valid range:
        # steering in [-1, 1], gas and brake in [0, 1]
        mu_steer = torch.tanh(raw_mu[:, 0:1])
        mu_gas = torch.sigmoid(raw_mu[:, 1:2])
        mu_brake = torch.sigmoid(raw_mu[:, 2:3])
        mu = torch.cat([mu_steer, mu_gas, mu_brake], dim=1)
        std = F.softplus(self.fc_std(x)) + 0.001
        std = torch.clamp(std, 0.001, 1.0)
        dist = Normal(mu, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)
        value = self.fc_value(x)
        return action, log_prob, value, dist

def compute_gae(next_value, rewards, masks, values, gamma=0.99, tau=0.95):
    values = values + [next_value]
    gae = 0
    returns = []
    advantages = []
    for step in reversed(range(len(rewards))):
        delta = rewards[step] + gamma * values[step + 1] * masks[step] - values[step]
        gae = delta + gamma * tau * masks[step] * gae
        advantages.append(gae)
        returns.append(gae + values[step])
    returns.reverse()
    advantages.reverse()
    returns = torch.tensor(returns, dtype=torch.float32).to(device)
    advantages = torch.tensor(advantages, dtype=torch.float32).to(device)
    return returns, advantages

def main():
    env = gym.make("CarRacing-v3", render_mode=None)
    env = GrayscaleObservation(env, keep_dim=False)
    env = EarlyStopWrapper(env, patience=20)
    env = CustomRewardWrapper(env)
    env = ActionRepeat(env, repeat=action_repeat)
    env = FrameStack(env, num_stack=4)

    model = CarPolicy().to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    batch_size = 2000
    obs, _ = env.reset()
    total_frame = 0
    while total_frame < max_frames:
        # collect one on-policy rollout of batch_size environment steps
        states, actions, log_probs, rewards, masks, values = [], [], [], [], [], []
        for _ in range(batch_size):
            input_tensor = preprocess(obs).unsqueeze(0).to(device)
            with torch.no_grad():
                action, log_prob, value, _ = model(input_tensor)
            real_action = action[0].cpu().numpy()
            next_obs, reward, done, truncated, _ = env.step(real_action)
            states.append(input_tensor)
            actions.append(action)
            log_probs.append(log_prob)
            rewards.append(reward)
            masks.append(1 - (done or truncated))
            values.append(value.item())
            obs = next_obs
            if done or truncated:
                obs, _ = env.reset()

        # bootstrap the value of the state following the last collected step
        next_input = preprocess(obs).unsqueeze(0).to(device)
        with torch.no_grad():
            _, _, next_value, _ = model(next_input)
        returns, advantages = compute_gae(next_value.item(), rewards, masks, values)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        states_batch = torch.cat(states)
        actions_batch = torch.cat(actions)
        old_log_probs_batch = torch.cat(log_probs)

        # PPO update: 10 epochs over the whole batch with a clipped surrogate objective
        for _ in range(10):
            _, new_log_probs, new_values, dist = model(states_batch)
            ratio = torch.exp(new_log_probs - old_log_probs_batch)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = F.mse_loss(new_values.squeeze(), returns)
            entropy_loss = dist.entropy().mean()
            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy_loss
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        total_frame += batch_size * action_repeat
    env.close()

if __name__ == "__main__":
    main()
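
(One note on the output: the print that produced the log lines at the top is not in the listing. It presumably sat at the end of the while loop and looked roughly like this; the exact form is reconstructed from the output format, since total_frame grows by 8000 per iteration and the log interval is 40000:)

if total_frame % 40000 == 0:  # assumed interval, matching the log above
    print(f"Frame: {total_frame} | Mean reward: {np.mean(rewards):.2f} | Loss: {loss.item():.4f}")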
More details here: https://stackoverflow.com/questions/79893806/the-issue-where-the-implemented-ppo-algorithm-fails-to-train