Я провел некоторое время в поисках решения этой простой проблемы, но все еще не мог ее найти. Проблема состоит в том, чтобы имитировать процесс, когда некоторые события могут быть процессами одновременно путем ограниченного количества ресурсов, в то время как некоторые события требуют агрегирования сначала, а затем пакетной обработки. Вот простой пример кодового примера транспортировки парома: < /p>
import simpy
import matplotlib.pyplot as plt
class Person:
def __init__(self, name):
self.name = name
self.timeline = []
def log(self, event, time):
self.timeline.append((event, time))
class BatchResource(simpy.Resource):
def __init__(self, env, capacity, batch_size, process_time):
super().__init__(env, capacity)
self.batch_size = batch_size
self.process_time = process_time
self.batch_queue = []
def request(self):
req = super().request()
self.batch_queue.append(req)
# Start processing only if a full batch is ready
if len(self.batch_queue) >= self.batch_size:
self._env.process(self.process_batch())
return req
def process_batch(self):
batch = self.batch_queue[: self.batch_size] # Take the batch
self.batch_queue = self.batch_queue[self.batch_size :] # Remove from queue
print(f"[{self._env.now}] Ferry is full! Departing with {len(batch)} people.")
yield self._env.timeout(self.process_time) # Simulate crossing time
print(f"[{self._env.now}] Ferry has crossed the river.")
# Release all passengers in the batch
for req in batch:
if not req.triggered: # Ensure it hasn't already been granted
req.succeed()
def person(env, person, ferry, casses_before, casses_after):
# Process before crossing the river
with casses_before.request() as request:
yield request
person.log("start_before", env.now)
yield env.timeout(1)
person.log("end_before", env.now)
# Wait for the ferry (only departs when full)
with ferry.request() as request:
yield request
person.log("start_wait", env.now)
yield env.timeout(1)
person.log("end_wait", env.now)
# Process after crossing the river
with casses_after.request() as request:
yield request
person.log("start_after", env.now)
yield env.timeout(1)
person.log("end_after", env.now)
def setup(env, num_people, capacity, people):
casses_before = simpy.Resource(env, capacity=3)
casses_after = simpy.Resource(env, capacity=3)
ferry = BatchResource(env, capacity=capacity, batch_size=capacity, process_time=5)
for i in range(num_people):
person_instance = Person(f"Person {i}")
people.append(person_instance)
env.process(person(env, person_instance, ferry, casses_before, casses_after))
yield env.timeout(0.5) # New person arrives every 0.5 time units
# Setup and start the simulation
env = simpy.Environment()
people = []
env.process(setup(env, num_people=30, capacity=10, people=people))
env.run(until=50)
# Plot Gantt-like chart
fig, ax = plt.subplots(figsize=(10, 8))
colors = {"before": "tab:blue", "wait": "tab:orange", "after": "tab:green"}
for i, person in enumerate(people):
for event, time in person.timeline:
if "start" in event:
start_time = time
elif "end" in event:
end_time = time
stage = event.split("_")[1]
ax.broken_barh(
[(start_time, end_time - start_time)],
(i * 10, 9),
facecolors=(colors[stage]),
)
ax.set_yticks([i * 10 + 5 for i in range(len(people))])
ax.set_yticklabels([person.name for person in people])
ax.set_xlabel("Time")
ax.set_ylabel("Person")
ax.grid(True)
plt.show()
Это приводит к следующему процессу Gantt:
Но я ищу случай, когда паром (оранжевые блоки) ждали, пока люди полностью его заполнят, а затем выполняют работу.
может кто -нибудь, пожалуйста, предложить способ его реализовать? < /p>
Спасибо! < /p>
Я провел некоторое время в поисках решения этой простой проблемы, но все еще не мог ее найти. Проблема состоит в том, чтобы имитировать процесс, когда некоторые события могут быть процессами одновременно путем ограниченного количества ресурсов, в то время как некоторые события требуют агрегирования сначала, а затем пакетной обработки. Вот простой пример кодового примера транспортировки парома: < /p> [code]import simpy import matplotlib.pyplot as plt
class Person: def __init__(self, name): self.name = name self.timeline = []
# Start processing only if a full batch is ready if len(self.batch_queue) >= self.batch_size: self._env.process(self.process_batch())
return req
def process_batch(self): batch = self.batch_queue[: self.batch_size] # Take the batch self.batch_queue = self.batch_queue[self.batch_size :] # Remove from queue
print(f"[{self._env.now}] Ferry is full! Departing with {len(batch)} people.") yield self._env.timeout(self.process_time) # Simulate crossing time
print(f"[{self._env.now}] Ferry has crossed the river.")
# Release all passengers in the batch for req in batch: if not req.triggered: # Ensure it hasn't already been granted req.succeed()
def person(env, person, ferry, casses_before, casses_after): # Process before crossing the river with casses_before.request() as request: yield request person.log("start_before", env.now) yield env.timeout(1) person.log("end_before", env.now)
# Wait for the ferry (only departs when full) with ferry.request() as request: yield request person.log("start_wait", env.now) yield env.timeout(1) person.log("end_wait", env.now)
# Process after crossing the river with casses_after.request() as request: yield request person.log("start_after", env.now) yield env.timeout(1) person.log("end_after", env.now)
for i in range(num_people): person_instance = Person(f"Person {i}") people.append(person_instance) env.process(person(env, person_instance, ferry, casses_before, casses_after)) yield env.timeout(0.5) # New person arrives every 0.5 time units
# Setup and start the simulation env = simpy.Environment() people = [] env.process(setup(env, num_people=30, capacity=10, people=people)) env.run(until=50)
for i, person in enumerate(people): for event, time in person.timeline: if "start" in event: start_time = time elif "end" in event: end_time = time stage = event.split("_")[1] ax.broken_barh( [(start_time, end_time - start_time)], (i * 10, 9), facecolors=(colors[stage]), )
ax.set_yticks([i * 10 + 5 for i in range(len(people))]) ax.set_yticklabels([person.name for person in people]) ax.set_xlabel("Time") ax.set_ylabel("Person") ax.grid(True)
plt.show() [/code] Это приводит к следующему процессу Gantt:
Но я ищу случай, когда паром (оранжевые блоки) ждали, пока люди полностью его заполнят, а затем выполняют работу. может кто -нибудь, пожалуйста, предложить способ его реализовать? < /p> Спасибо! < /p>
Я провел некоторое время в поисках решения этой простой проблемы, но все еще не мог ее найти. Проблема состоит в том, чтобы имитировать процесс, когда некоторые события могут быть процессами одновременно путем ограниченного количества ресурсов, в то...
Я работаю над симуляцией, работающей с simpy, где у меня есть такой класс:
class memory:
def __init__(self, env: Environment, size: int, memory_name: str):
Instantiation function
Доступны ресурсы между часами открытия (8 утра -8 вечера).
Любые объекты, использующие ресурсы, выпускаются через 1 минуту до 8 вечера, а затем
for -for -recorditions для претензий на все ресурсы с -1
import simpy...
Доступны ресурсы между часами открытия (8 утра -8 вечера).
Любые объекты, использующие ресурсы, выпускаются через 1 минуту до 8 вечера, а затем
for -for -recorditions для претензий на все ресурсы с -1
import simpy...
Доступны ресурсы между часами открытия (8 утра -8 вечера).
Любые объекты, использующие ресурсы, выпускаются через 1 минуту до 8 вечера, а затем
for -for -recorditions для претензий на все ресурсы с -1
import simpy...