Почему RingBuffer блокируется после добавления условия для уведомления о поступлении новых данныхPython

Программы на Python
Ответить
Anonymous
 Почему RingBuffer блокируется после добавления условия для уведомления о поступлении новых данных

Сообщение Anonymous »

Мне нужен кольцевой буфер, который содержит массивы numpy с отметкой времени, к нему можно добавить, и его можно попросить получить все массивы, которые были добавлены через определенное время.
Это работало правильно, пока я не добавил multiprocess.Condition, чтобы функция получения не возвращала пустой список, если после последнего вызова ничего не было добавлено (просто блокируйте, пока не будет записана хотя бы одна новая запись).
Как только я добавил свое событие, оно перестало работать, поскольку для чтения условия события требуется блокировка, но для добавления также требуется та же блокировка.
Вот мой код:
from typing import NamedTuple, List
import multiprocessing
import ctypes

import numpy as np

class TimedData(NamedTuple):
t: float
val: np.ndarray

class RingBuffer:
def __init__(self, size: int, dim: int):
self._dim = dim + 1
self._size = size
self.buffer = multiprocessing.Array(ctypes.c_float, size * self._dim)
self.np_buffer = np.frombuffer(self.buffer.get_obj(), dtype=np.float32)
self._head = multiprocessing.Value('i', 0)
self._count = multiprocessing.Value('i', 0)
self.lock = multiprocessing.Lock()
self._data_added = multiprocessing.Condition(self.lock)

def append(self, data: TimedData):
assert data.val.size == self._dim - 1
with self._data_added:
head = self._head.value
self.buffer[head] = data.t
self.np_buffer[head + 1:head + self._dim] = data.val[:]
self._head.value = (head + self._dim) % (self._size * self._dim)
if self._count.value < self._size:
self._count.value += 1
self._data_added.notify_all()

def get_measurements_after_time(self, t: float) -> List[TimedData]:
def has_new_data():
latest_idx = (self._head.value - self._dim) % (self._size * self._dim)
return self._count.value > 0 and self.buffer[latest_idx] > t

with self._data_added:
self._data_added.wait_for(has_new_data)

head = self._head.value
count = self._count.value

num_items = count
left, right = 0, num_items
offset = 0 if num_items < self._size else head

while left < right:
mid = (left + right) // 2
idx = (offset + mid * self._dim) % (self._size * self._dim)
if self.buffer[idx] > t:
right = mid
else:
left = mid + 1

result = []
for i in range(left, num_items):
idx = (offset + i * self._dim) % (self._size * self._dim)
result.append(
TimedData(
t=self.buffer[idx],
val=np.array(self.np_buffer[idx + 1:idx + self._dim], copy=True)
))
return result


Вот пример использования:
# Track time for ensuring the test runs for 2 minutes
TEST_DURATION = 12 # seconds
APPEND_INTERVAL = 0.0001 # 0.1 ms
PROCESS_INTERVAL = 0.0001 # 0.1 ms

def append_measurements(buffer: RingBuffer,
stop_event: multiprocessing.Event,
timestamps: multiprocessing.Manager().list):
"""Process that appends measurements every 0.001 seconds."""
start_time = time.time()

while not stop_event.is_set():
# Incremental time to simulate real-time measurement (here we just use time.time)
t = time.time() - start_time
accel = np.array([random(), random(), random()])
gyro = np.array([random(), random(), random()])
magneto = np.array([random(), random(), random()])
measurement = Measurement(t, accel, gyro, magneto)

buffer.append(measurement.to_timed_data())
timestamps.append(t) # Track the timestamp of each measurement appended
time.sleep(APPEND_INTERVAL)

# Stop after the test duration
if time.time() - start_time >= TEST_DURATION:
stop_event.set()

def process_measurements(buffer: RingBuffer,
stop_event: multiprocessing.Event,
timestamps: multiprocessing.Manager().list):
"""Process that retrieves and processes measurements every 100ms, verifying order and continuity."""
last_time = 0
retrieved_times = []

while not stop_event.is_set():
measurements = buffer.get_measurements_after_time(last_time)
if measurements:
# Check continuity and update `last_time`
for measurement in measurements:
retrieved_times.append(measurement.t)
last_time = measurements[-1].t

# Sleep to simulate processing
time.sleep(PROCESS_INTERVAL)

# Assertions after process completes
assert len(retrieved_times) = len(timestamps) - PROCESS_INTERVAL / APPEND_INTERVAL
for i, (rt, tt) in enumerate(zip(retrieved_times[:-2], timestamps[:-2])):
if abs(rt - tt) > 1e-7 * rt:
print(f"{i}: {rt} != {tt} - {retrieved_times[i-2:i+2], timestamps[i-2:i+2]}")
assert np.allclose(retrieved_times, timestamps[:len(retrieved_times)]), "Mismatch between retrieved and written timestamps"
dt = np.diff(retrieved_times)
print(f"Avg dt: {np.mean(dt)}, Std: {np.std(dt)}")
assert np.all(np.diff(dt) < 2 * APPEND_INTERVAL), "Written time stamps are not at the correct timing"
assert abs(len(retrieved_times) - TEST_DURATION / APPEND_INTERVAL) < 0.01 / APPEND_INTERVAL, f"Written only {len(retrieved_times)}, expected {TEST_DURATION / APPEND_INTERVAL}"
print(
f"All {len(retrieved_times)} timestamps are consistent between written and retrieved measurements.")

def run_double_ringbuffer_test():
size = 100000 # Ring buffer size
buffer = RingBuffer(size, Measurement.dim())

# Shared event and manager list for tracking data
stop_event = multiprocessing.Event()
manager = multiprocessing.Manager()
timestamps = manager.list() # Track appended timestamps

# Start append and processing processes
append_proc = multiprocessing.Process(target=append_measurements,
args=(buffer, stop_event, timestamps))
process_proc = multiprocessing.Process(target=process_measurements, args=(
buffer, stop_event, timestamps))

append_proc.start()
process_proc.start()

try:
# Wait for both processes to finish after the specified test duration
append_proc.join()
process_proc.join()
except KeyboardInterrupt:
stop_event.set()
append_proc.terminate()
process_proc.terminate()

print("Test completed successfully.")



Подробнее здесь: https://stackoverflow.com/questions/791 ... w-data-arr
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»