Это основная функция:
Код: Выделить всё
def agent_worker(agent_id, agent_states, shared_stats, shared_env, stop_signal):
"""
Worker function for each agent.
"""
from agent_navigation import agent_run_simulation
agent_run_simulation(agent_id, agent_states, shared_stats, shared_env, stop_signal)
def main(routes, up_inflated_matrix, original_scaled_matrix, original_black_ranges, time_limit_minutes, num_agents):
"""
Main function to manage agents and coordinate the simulation.
"""
# Convert time limit to seconds
time_limit = time_limit_minutes * 60
manager = multiprocessing.Manager()
stop_signal = multiprocessing.Event() # Signal to stop agents
# Shared statistics
shared_stats = manager.dict({
'COUNT_ALL_COLL': 0,
'COUNT_ALL_FAILED': 0,
'COUNT_ALL_ROUTES': 0,
'TOTAL_TASKS_COMPLETED': 0,
})
# Shared environment
shared_env = manager.dict({
'original_scaled_matrix': original_scaled_matrix.tolist() if isinstance(original_scaled_matrix, np.ndarray) else original_scaled_matrix,
'up_inflated_matrix': up_inflated_matrix.tolist() if isinstance(up_inflated_matrix, np.ndarray) else up_inflated_matrix,
'original_black_ranges': original_black_ranges,
'routes': routes,
})
# Shared agent states
agent_states = manager.dict({
i: {
"current_position": None,
"tasks": [],
"completed_tasks": 0,
"collisions": 0,
"failed_tasks": 0,
}
for i in range(num_agents)
})
# Generate start and end points for initial tasks
start_points, end_points = generate_start_and_end(up_inflated_matrix, original_black_ranges, num_agents=num_agents, min_distance=4)
for agent_id in range(num_agents):
# Fetch the current agent state
agent_state = agent_states[agent_id] # Retrieve the dictionary for this agent
# Update the agent's state
agent_state["current_position"] = start_points[agent_id]
agent_state["tasks"] = [{"start_point": start_points[agent_id], "end_point": end_points[agent_id]}]
# Reassign the modified state back to agent_states
agent_states[agent_id] = agent_state
# Debug print
print(f"**** Agent {agent_id} initial position: {agent_states[agent_id]['current_position']}")
# Create and start processes for agents
processes = []
for agent_id in range(num_agents):
process = multiprocessing.Process(
target=agent_worker,
args=(agent_id, agent_states, shared_stats, shared_env, stop_signal)
)
processes.append(process)
process.start()
# Monitor the simulation time
start_time = time.time()
while time.time() - start_time < time_limit:
time.sleep(0.1) # Allow agents to work
# Stop all agents
stop_signal.set()
# Wait for all processes to complete
for process in processes:
process.join()
# Print global statistics
keys_to_print = ['COUNT_ALL_COLL', 'COUNT_ALL_FAILED', 'COUNT_ALL_ROUTES', 'TOTAL_TASKS_COMPLETED']
filtered_stats = {key: shared_stats[key] for key in keys_to_print}
print(f"Global Statistics: {filtered_stats}")
7 агентов:
%%% Агент 3: миссия :960 , провалено:0, всего: миссия: 1324 провалена: 8
%%% Агент 1: миссия:1 , провалена:1, всего: миссия: 1325 провалена: 9
%%% Агент 0: миссия:5, провалена:0, всего: миссия: 1325 провалена: 9
%%% Агент 5: миссия:18, провалена:0, всего: миссия: 1325 провалена: 9
%%% Агент 6: миссия:20 , провалена:1, всего: миссия: 1326 провалена: 10
%%% Агент 2: миссия:108 , провалено:1, всего: миссия: 1327 провалено: 11
%%% Агент 4: миссия:217, провалено:0, всего: миссия: 1327 провалено: 11
Глобальная статистика: {'COUNT_ALL_COLL ': 144, 'COUNT_ALL_FAILED': 11, 'COUNT_ALL_ROUTES': 1359, 'TOTAL_TASKS_COMPLETED': 1327
как я уже упоминал выше, определенный агент выполняет больше задач (960 миссий) по сравнению с остальными, и относительно равномерного разделения задач не существует.
Буду признателен за помощь в решении этой проблемы, так как это неправильные результаты, ведь если один агент выполняет много задач, а остальные мало, это означает, что параллелизма в программе и агенте действительно нет. берет на себя казнь, а остальные стоят в стороне и не двигаются.
Большое спасибо всем, я ценю вашу помощь.
Подробнее здесь: https://stackoverflow.com/questions/792 ... -algorithm
Мобильная версия