PPO clip AI issues


Post by Anonymous »

Some time ago I tried to build a PPO clip agent using the OpenAI pseudocode, but it doesn't work very well. I am only trying a simple environment (CartPole), and the problem seems to be that after training for a while the policy develops a bias toward its initial movement, meaning it moves in one direction more than the other. I have tried changing the hyperparameters in various ways, but the result did not change much, so I believe there is either a fundamental flaw in the code causing the problem, or I have missed something with the parameters.
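To make the bias concrete, here is a small probe one can run against the policy during or after training (a minimal sketch; probe_policy is my own helper and not part of the files below):

import torch

def probe_policy(policy_net, obs):
    # Softmax over the logits gives the policy's action probabilities.
    with torch.no_grad():
        logits = policy_net(torch.tensor(obs, dtype=torch.float32).unsqueeze(0))
    return torch.softmax(logits, dim=-1).squeeze(0).tolist()

# e.g. probe_policy(agent.policy_net, env.reset(seed=49)[0])
# A healthy early policy stays near [0.5, 0.5]; mine drifts toward one action.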
My code consists of two files, main.py and PPO_discrete.py:
main.py
import gymnasium as gym
from PPO_discrete import PPOAgent

def main():
    env = gym.make('CartPole-v1', render_mode="rgb_array")  # Create CartPole environment
    state_dim = 4   # Dimension of the state space
    action_dim = 2  # Number of discrete actions in the action space

    # Initialize PPO agent
    agent = PPOAgent(state_dim, action_dim, env)

    agent.learn(20500, True)  # Train the agent
    print("Done training")

    env.close()  # Close the environment after training

    # Reopen the environment for testing
    env = gym.make('CartPole-v1', render_mode="human")

    for _ in range(1000):  # Test for 1000 episodes
        obs, _ = env.reset(seed=49)
        done = False

        while not done:
            action, _ = agent.get_action(obs)  # Extract only the action
            next_obs, _, terminated, truncated, _ = env.step(action)  # Pass only the action to step()
            obs = next_obs
            done = terminated or truncated

    env.close()  # Properly close the environment after testing

if __name__ == "__main__":
    main()
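The test loop above samples actions from the policy; for a deterministic check, a greedy variant could be swapped in (a sketch; greedy_action is a hypothetical helper, not part of my files):

import torch

def greedy_action(policy_net, obs):
    # Deterministic evaluation: take the argmax action instead of sampling.
    with torch.no_grad():
        logits = policy_net(torch.tensor(obs, dtype=torch.float32).unsqueeze(0))
    return int(torch.argmax(logits, dim=-1).item())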

PPO_discrete.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

# Policy Network for Discrete Environments
class DiscretePolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DiscretePolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.logits = nn.Linear(64, action_dim)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        logits = self.logits(x)  # Output logits for each action
        return logits

class ValueNetwork(nn.Module):
    def __init__(self, state_dim):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        value = self.fc3(x)
        return value

# PPOAgent with implemented optimizer parameter values
class PPOAgent:
    def __init__(self, state_dim, action_dim, env):
        # Initialize policy and value networks
        self.policy_net = DiscretePolicyNetwork(state_dim, action_dim)
        self.value_net = ValueNetwork(state_dim)

        # Initialize optimizers with the specified parameter values
        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=0.0003, eps=1e-5)
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=0.0003, eps=1e-5)

        # Manually set the optimizer state values
        for optimizer in [self.policy_optimizer, self.value_optimizer]:
            for state in optimizer.state.values():
                state['step'] = torch.tensor(1600.)
                state['exp_avg'] = torch.tensor([-0.0153])
                state['exp_avg_sq'] = torch.tensor([0.0024])

        # Environment and other hyperparameters
        self.env = env
        self.mean = np.zeros(env.observation_space.shape)
        self.std = np.ones(env.observation_space.shape)
        self.batch_size = 64
        self.epoch_count = 4
        self.gamma = 0.99    # Discount factor
        self.lambda_ = 0.95  # GAE parameter
        self.entropy_bonus = True
        self.epsilon = 0.2   # Clipping range
        self.beta = 0.01     # Entropy coefficient
        self.n_steps = 1024  # Number of steps per rollout

    def learn(self, k, frames):
        timesteps_perm = 0
        timesteps = 0
        states, actions, rewards, log_probs = [], [], [], []
        playing = True
        episode_rewards = []  # Track rewards per episode
        episode_reward = 0    # Track reward for the current episode

        while playing:
            obs, _ = self.env.reset(seed=49)
            done = False

            while not done:
                timesteps_perm += 1
                timesteps += 1
                action, log_prob = self.get_action(obs)

                next_obs, reward, terminated, truncated, _ = self.env.step(action)

                # Collect data
                states.append(obs)
                actions.append(action)
                rewards.append(reward)
                log_probs.append(log_prob)

                episode_reward += reward
                obs = next_obs
                done = terminated or truncated

                if done:
                    episode_rewards.append(episode_reward)
                    episode_reward = 0

                # Process batch if enough timesteps collected
                if timesteps >= self.n_steps:
                    timesteps = 0
                    self._process_batch(states, actions, rewards, log_probs)
                    avg_action = np.mean(actions)
                    states, actions, rewards, log_probs = [], [], [], []  # Reset batch

                    # Print stats after each batch
                    avg_reward = np.mean(episode_rewards[-10:]) if episode_rewards else 0
                    print(f"Batch Processed: Avg Reward (last 10 eps): {avg_reward:.2f}, Timesteps: {self.n_steps}, Total: {timesteps_perm}, Average action: {avg_action}")
                    episode_rewards = []

            if timesteps_perm >= k:  # Stop training after collecting enough states
                playing = False

    def get_action(self, obs):
        state_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
        logits = self.policy_net(state_tensor)
        distribution = torch.distributions.Categorical(logits=logits)
        action = distribution.sample()  # Get action as a tensor
        log_prob = distribution.log_prob(action)

        return action.item(), log_prob.detach()

    def _process_batch(self, states, actions, rewards, log_probs):
        rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-8)
        states_tensor = torch.tensor(states, dtype=torch.float32)

        # Compute rewards-to-go (RTGs)
        rtgs = self._compute_rtgs(rewards)

        # Compute values from the value network
        values = self.value_net(states_tensor).squeeze(-1).detach()

        # Compute advantage estimates using GAE
        advantages = self._compute_advantage_estimates(rtgs, rewards, values)

        # Normalize advantages for stability
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        self._update_policy(advantages, actions, log_probs, states)

        # Fit the value function to RTGs
        self._fit_value_function(rtgs, states_tensor)

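    # Rewards-to-go, for context: the discounted return from each step onward,
    # accumulated backwards:  RTG_t = r_t + gamma * RTG_{t+1}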
    def _compute_rtgs(self, rewards):
        rtgs = []
        running_rtg = 0

        for r in reversed(rewards):
            running_rtg = r + self.gamma * running_rtg
            rtgs.insert(0, running_rtg)

        return torch.tensor(rtgs, dtype=torch.float32)

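    # GAE, for context (generalized advantage estimation, as in the PPO paper):
    #   delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
    #   A_t     = delta_t + gamma * lambda * A_{t+1}
    # i.e. an exponentially weighted sum of TD errors, accumulated backwards.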
    def _compute_advantage_estimates(self, rtgs, rewards, values):
        advantages = torch.zeros_like(torch.tensor(rewards, dtype=torch.float32))

        gae = 0
        for t in reversed(range(len(rewards))):
            next_value = 0 if t == len(rewards) - 1 else values[t + 1]
            delta_t = rewards[t] + self.gamma * next_value - values[t]
            gae = delta_t + self.gamma * self.lambda_ * gae
            advantages[t] = gae

        return advantages

    def _update_policy(self, advantages, actions, log_probs, states):
        states_tensor = torch.tensor(states, dtype=torch.float32)
        actions_tensor = torch.tensor(actions, dtype=torch.int64)  # Actions are now integers
        old_log_probs_tensor = torch.tensor(log_probs, dtype=torch.float32)

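        # Clipped surrogate from the PPO paper, maximized over the batch:
        #   L_CLIP = E[ min(r_t * A_t, clip(r_t, 1 - eps, 1 + eps) * A_t) ]
        # with the probability ratio r_t = pi_new(a_t|s_t) / pi_old(a_t|s_t);
        # the loss below is its negative, minus the entropy bonus.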
        for _ in range(self.epoch_count):  # Use multiple epochs for stability
            logits = self.policy_net(states_tensor)
            distribution = torch.distributions.Categorical(logits=logits)
            new_log_probs = distribution.log_prob(actions_tensor)

            # Compute the probability ratio
            ratios = torch.exp(new_log_probs - old_log_probs_tensor)

            # Compute the surrogate objective
            surrogate_1 = ratios * advantages
            surrogate_2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * advantages
            clip_loss = torch.min(surrogate_1, surrogate_2).mean()

            # Add entropy bonus
            entropy_bonus = distribution.entropy().mean() if self.entropy_bonus else 0
            loss = -clip_loss - self.beta * entropy_bonus

            self.policy_optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), max_norm=0.5)
            torch.nn.utils.clip_grad_norm_(self.value_net.parameters(), max_norm=0.5)
            self.policy_optimizer.step()

    def _fit_value_function(self, rtgs, states_tensor, reps=2):
        rtgs = rtgs.detach()  # Detach the targets, they don't require gradients
        for _ in range(reps):
            pred_vals = self.value_net(states_tensor).squeeze(-1)
            mse_loss = torch.mean((rtgs - pred_vals) ** 2)

            self.value_optimizer.zero_grad()
            mse_loss.backward()
            self.value_optimizer.step()


More details here: https://stackoverflow.com/questions/792 ... -ai-issues