У меня есть многоагентная среда для сбора боидов с использованием Gymnasium. Я использовал PPO с политикой Mlp и 8-уровневой сетью по 512 нейронов на слой.
Хотя я добился группирования, это была централизованная настройка.
Результаты >:
Флокирование (централизованный PPO)
Сеть:
policy_kwargs = dict(
activation_fn=th.nn.Tanh, # Using ReLU activation function
net_arch=[dict(pi=[512, 512, 512, 512, 512, 512, 512, 512],
vf=[512, 512, 512, 512, 512, 512, 512, 512])] # Separate networks for policy (pi) and value function (vf)
)
Код обучения:
model = PPO("MlpPolicy", env, kwargs=policy_kwargs tensorboard_log="./ppo_Agents_tensorboard/", verbose=1, device=device)
model.set_random_seed(SimulationVariables["ModelSeed"])
Однако моя цель — создать его децентрализованную версию По сути, это MAPPO (PPO с децентрализованными участниками, но общим критиком, который, по утверждениям, вместо этого хорош для группового поведения). IPPO – полностью децентрализовано). Вот почему я построил сеть настраиваемых политик с помощью SB3 следующим образом:
class CustomMultiAgentPolicy(ActorCriticPolicy): # Make sure to inherit from the correct parent class
def __init__(self, observation_space, action_space, lr_schedule, **kwargs):
super(CustomMultiAgentPolicy, self).__init__(observation_space, action_space, lr_schedule, **kwargs)
self.obs_size = observation_space.shape[0] #just this according to your observation structure
self.hidden_size = 128 # You can change this based on your needs
self.action_space = action_space
self.actor = CustomActor(observation_space, action_space)
self.critic = SharedCritic(observation_space)
def forward(self, obs, **kwargs):
action_mean = self.actor(obs) # This is the mean of the normal distribution
# Create a normal distribution with mean and log_std (converted to std)
# action_std = th.exp(self.log_std) # Convert log_std to std
action_std = th.clamp(th.exp(self.log_std), min=1e-3, max=1.0) # Stabilize std
action_distribution = th.distributions.Normal(action_mean, action_std)
# Sample actions and get log probabilities
actions = action_distribution.sample()
log_probs = action_distribution.log_prob(actions).sum(dim=-1) # Sum over action dimensions
values = self.critic(obs)
# Ensure actions match the expected shape for the environment (in your case, [1, 6])
return actions, values, log_probs
Независимый актер и общий критик:
class CustomActor(th.nn.Module):
def __init__(self, observation_space, action_space):
super(CustomActor, self).__init__()
self.device = th.device("cuda" if th.cuda.is_available() else "cpu")
# Create 8 layers with 512 neurons each
self.layers = th.nn.ModuleList()
input_size = observation_space.shape[0]
for _ in range(8):
layer = th.nn.Linear(input_size, 512).to(self.device) # Move layer to the device
self.layers.append(layer)
input_size = 512 # Update input size for the next layer
# Update action head based on action space type
if isinstance(action_space, spaces.Box): # Continuous action space
self.action_head = th.nn.Linear(512, action_space.shape[0]).to(self.device)
elif isinstance(action_space, spaces.Discrete): # Discrete action space
self.action_head = th.nn.Linear(512, action_space.n).to(self.device)
else:
raise NotImplementedError("Action space type not supported")
def forward(self, x):
# Convert input to torch tensor if it's a numpy array and move it to the correct device
if isinstance(x, np.ndarray):
x = th.tensor(x, dtype=th.float32).to(self.device) # Move to device here
# Pass the input through the network layers
for layer in self.layers:
# print(f"layer weight device: {layer.weight.device}") # Print inside the loop
x = F.relu(layer(x)) # All layers should already be on self.device
# Get action logits from the action head
action_logits = self.action_head(x) # action_head should also be on the same device
return action_logits
class SharedCritic(th.nn.Module):
def __init__(self, observation_space):
super(SharedCritic, self).__init__()
# Create 8 layers with 512 neurons each
self.layers = th.nn.ModuleList()
input_size = observation_space.shape[0]
for _ in range(8):
self.layers.append(th.nn.Linear(input_size, 512)) # Move to device in forward method if needed
input_size = 512 # Update input size for the next layer
self.value_head = th.nn.Linear(512, 1)
def forward(self, x):
# Convert input to torch tensor if it's a numpy array
if isinstance(x, np.ndarray):
x = th.tensor(x, dtype=th.float32)
for layer in self.layers:
x = F.relu(layer(x))
value = self.value_head(x)
return value
Мои результаты, несмотря на то, что абсолютно ничего не изменилось, кроме сети, составляют 180 от того, что должно быть (пробовал с таким же количеством временных шагов, но и с меньшим).
Результаты (MAPPO) Они движутся в совершенно разных направлениях, несмотря на то, что моя функция вознаграждения не изменилась.
Функция вознаграждения:
CohesionReward = 0
AlignmentReward = 0
total_reward = 0
outofflock = False
midpoint = (SimulationVariables["SafetyRadius"] + SimulationVariables["NeighborhoodRadius"]) / 2
if len(neighbor_positions) > 0:
for neighbor_position in neighbor_positions:
distance = np.linalg.norm(agent.position - neighbor_position)
if distance
Подробнее здесь: https://stackoverflow.com/questions/791 ... with-mappo
Невозможно добиться такой же хорошей производительности PPO с помощью MAPPO. ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как отображать/блаженное текст в Pygame для хорошей производительности
Anonymous » » в форуме Python - 0 Ответы
- 4 Просмотры
-
Последнее сообщение Anonymous
-