Проблема, из-за которой реализованный алгоритм PPO не может быть обучен.Python

Программы на Python
Ответить
Anonymous
 Проблема, из-за которой реализованный алгоритм PPO не может быть обучен.

Сообщение Anonymous »

Я написал код обучения с подкреплением на основе PPO для среды Gymnasium CarRacing-v3.
(Код был сгенерирован с помощью Gemini)
Однако даже после 200 000 кадров обучение, похоже, не улучшается.

Кадр: 40000 | Средняя награда: 0,07 | Потери: 3,1248
Кадр: 80000 | Средняя награда: 0,04 | Потеря: 7.0629
Кадр: 120000 | Средняя награда: 0,06 | Потеря: 3,9565
Кадр: 160000 | Средняя награда: 0,07 | Потеря: 5.4525
Кадр: 200000 | Средняя награда: 0,03 | Потеря: 3,8550
Хотелось бы знать, есть ли какие-либо проблемы или ошибки в приведенном ниже коде.
import gymnasium as gym
from collections import deque
from gymnasium.spaces import Box
from gymnasium.wrappers import GrayscaleObservation, ResizeObservation
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np

max_frames = 200000
action_repeat = 4

class EarlyStopWrapper(gym.Wrapper):
def __init__(self, env, patience=40):
super().__init__(env)
self.patience = patience
self.neg_counter = 0
self.step_count = 0

def reset(self, **kwargs):
self.neg_counter = 0
self.step_count = 0
return self.env.reset(**kwargs)

def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
self.step_count += 1

if self.step_count > 50:
if reward < 0:
self.neg_counter += 1
else:
self.neg_counter = 0

if self.neg_counter >= self.patience:
truncated = True
reward -= 5.0

return obs, reward, terminated, truncated, info

class ActionRepeat(gym.Wrapper):
def __init__(self, env, repeat):
super().__init__(env)
self.repeat = repeat

def step(self, action):
total_reward = 0.0
terminated = False
truncated = False

for _ in range(self.repeat):
obs, reward, term, trunc, info = self.env.step(action)
total_reward += reward
terminated = term or terminated
truncated = trunc or truncated
if terminated or truncated:
break

return obs, total_reward, terminated, truncated, info

class CustomRewardWrapper(gym.Wrapper):
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)

if reward > 0:
reward *= 1.2

return obs, reward, terminated, truncated, info

class FrameStack(gym.Wrapper):
def __init__(self, env, num_stack):
super().__init__(env)
self.num_stack = num_stack
self.frames = deque(maxlen=num_stack)
low = np.repeat(env.observation_space.low[np.newaxis, ...], num_stack, axis=0)
high = np.repeat(env.observation_space.high[np.newaxis, ...], num_stack, axis=0)
self.observation_space = Box(low=low, high=high, dtype=env.observation_space.dtype)

def reset(self, **kwargs):
obs, info = self.env.reset(**kwargs)
for _ in range(self.num_stack):
self.frames.append(obs)
return self._get_obs(), info

def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
self.frames.append(obs)
return self._get_obs(), reward, terminated, truncated, info

def _get_obs(self):
return np.array(self.frames)

def preprocess(obs):
obs = torch.from_numpy(obs).float() / 255.0
return obs

def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
torch.nn.init.orthogonal_(layer.weight, std)
torch.nn.init.constant_(layer.bias, bias_const)
return layer

class CarPolicy(nn.Module):
def __init__(self):
super().__init__()
self.conv = nn.Sequential(
layer_init(nn.Conv2d(4, 32, kernel_size=3, stride=2)), nn.ReLU(),
layer_init(nn.Conv2d(32, 64, kernel_size=3, stride=2)), nn.ReLU(),
layer_init(nn.Conv2d(64, 128, kernel_size=3, stride=2)), nn.ReLU(),
nn.Flatten()
)
self.fc = nn.Sequential(layer_init(nn.Linear(15488, 256)), nn.ReLU())

self.fc_mu = layer_init(nn.Linear(256, 3), std=0.01)
self.fc_std = layer_init(nn.Linear(256, 3), std=0.01)
self.fc_value = layer_init(nn.Linear(256, 1), std=1)

def forward(self, x):
x = self.fc(self.conv(x))

raw_mu = self.fc_mu(x)
mu_steer = torch.tanh(raw_mu[:, 0:1])
mu_gas = torch.sigmoid(raw_mu[:, 1:2])
mu_brake = torch.sigmoid(raw_mu[:, 2:3])
mu = torch.cat([mu_steer, mu_gas, mu_brake], dim=1)

std = F.softplus(self.fc_std(x)) + 0.001

std = torch.clamp(std, 0.001, 1.0)

dist = Normal(mu, std)
action = dist.sample()
log_prob = dist.log_prob(action).sum(dim=-1)
value = self.fc_value(x)
return action, log_prob, value, dist

def compute_gae(next_value, rewards, masks, values, gamma=0.99, tau=0.95):
values = values + [next_value]
gae = 0
returns = []
advantages = []

for step in reversed(range(len(rewards))):
delta = rewards[step] + gamma * values[step + 1] * masks[step] - values[step]
gae = delta + gamma * tau * masks[step] * gae
advantages.append(gae)
returns.append(gae + values[step])

returns.reverse()
advantages.reverse()

returns = torch.tensor(returns, dtype=torch.float32).to(device)
advantages = torch.tensor(advantages, dtype=torch.float32).to(device)
return returns, advantages

def main():
env = gym.make("CarRacing-v3", render_mode=None)
env = GrayscaleObservation(env, keep_dim=False)

env = EarlyStopWrapper(env, patience=20)
env = CustomRewardWrapper(env)
env = ActionRepeat(env, repeat=action_repeat)
env = FrameStack(env, num_stack=4)

model = CarPolicy().to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-4)
batch_size = 2000
obs, _ = env.reset()
total_frame = 0

while total_frame < max_frames:
states, actions, log_probs, rewards, masks, values = [], [], [], [], [], []

for _ in range(batch_size):
input_tensor = preprocess(obs).unsqueeze(0).to(device)
with torch.no_grad():
action, log_prob, value, _ = model(input_tensor)

real_action = action[0].cpu().numpy()

next_obs, reward, done, truncated, _ = env.step(real_action)

states.append(input_tensor)
actions.append(action)
log_probs.append(log_prob)
rewards.append(reward)
masks.append(1 - (done or truncated))
values.append(value.item())

obs = next_obs
if done or truncated:
obs, _ = env.reset()

next_input = preprocess(obs).unsqueeze(0).to(device)
with torch.no_grad():
_, _, next_value, _ = model(next_input)

returns, advantages = compute_gae(next_value.item(), rewards, masks, values)
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

states_batch = torch.cat(states)
actions_batch = torch.cat(actions)
old_log_probs_batch = torch.cat(log_probs)

for _ in range(10):
_, new_log_probs, new_values, dist = model(states_batch)
ratio = torch.exp(new_log_probs - old_log_probs_batch)
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages

actor_loss = -torch.min(surr1, surr2).mean()
critic_loss = F.mse_loss(new_values.squeeze(), returns)
entropy_loss = dist.entropy().mean()

loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy_loss

optimizer.zero_grad()
loss.backward()
optimizer.step()

total_frame += batch_size * action_repeat

env.close()

if __name__ == "__main__":
main()


Подробнее здесь: https://stackoverflow.com/questions/798 ... s-to-train
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»