Правильное использование атомики для асинхронного итератора с несколькими производителями и потребителями в C ++ 20C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Правильное использование атомики для асинхронного итератора с несколькими производителями и потребителями в C ++ 20

Сообщение Anonymous »

Я пытаюсь создать асинхронный итератор над вектором Std :: с фиксированным размером распределения (известный перед любым поток, который начинает выполнять). Цель состоит в том, чтобы несколько производителей увеличивали доступные данные внутри вектора, в то время как несколько потребителей собирают полученные данные. В частности, я обеспокоен тем, правильно ли я использую механизмы заказа памяти, чтобы обеспечить правильную синхронизацию между доступом к атомным и неатомным данным. Любое руководство или улучшения будут очень оценены. Кроме того, я добавил объяснения для ясности. Код состоит из следующих компонентов: < /p>

[*] async_iterator :
  • Управляет атомным итератором (_index) над предварительно выделенным контейнером (например, непустым вектором или массивом).
  • Отслеживает размер данных, заполненных производителями (_filled_size).
  • предоставляет функцию next () для увеличения _index , ожидая при необходимости на основе _filled_size .
[*] async_vector (получено из async_iterator ):

warps a std :: vector с помощью Async_iterator .
[*] Предоставляет два метода для добавления элементов:


.emplace_back_by_unique_thread(): функция, безопасная для потока, предназначенную для того, чтобы называть одним потоком продюсера (я почти уверен, что эта функция безопасна. Ну, я надеюсь, что: d) .emplace_back(): предназначен для нескольких производителей, , но я не уверен, является ли моя реализация-защитника. />
реализует настройку продюсера-потребителя, где каждый продюсер вызывает EMPLACE_BACK_BY_UNIQUE_THREAD () в своем выделенном Async_Vector .
ancync_vector экземпляры сопоставит число производителей. /> Пример 2: example_multi_camera_producer_one_shared_buffer


реализует настройку производителя-consumer, где несколько производителей делятся emync_vector . общий буфер.main()

запускает оба примера. emplace_back () .
Любое понимание или рекомендации будут высоко оценены.#include
#include
#include
#include
#include
#include
#include
#include
#include

template
struct async_iterator
{
async_iterator() = default;
async_iterator(T capacity) : _capacity(capacity) {}
async_iterator(const async_iterator&) = delete;
async_iterator& operator=(const async_iterator&) = delete;
async_iterator(async_iterator&&) = delete;
async_iterator& operator=(async_iterator&&) = delete;

using value_type = T;
using atomic_value_type = std::atomic;
using optional_value_type = std::optional;

static constexpr value_type ABORT_FLAG = std::numeric_limits::max();

value_type _capacity{0}; // Maximum container size
atomic_value_type _filled_size{0}; // Current container size (dynamic in function of the producer)
atomic_value_type _index{0}; // Current iterator index (used by the consumer)

optional_value_type next()
{
auto _next_index = _index.fetch_add(1, std::memory_order_relaxed);
if (_next_index >= _capacity)
return std::nullopt; // End of iteration

while (true) {
auto _current_size = _filled_size.load(std::memory_order_acquire);

if (_current_size == ABORT_FLAG) return std::nullopt; // Abort check
if (_next_index < _current_size) return _next_index; // The element is available

// Wait for the producer to fill the container
_filled_size.wait(_current_size, std::memory_order_acquire);
}
}

void notify_all() { _filled_size.notify_all(); }

void abort()
{
_filled_size.store(ABORT_FLAG, std::memory_order_release);
_index.store(_capacity, std::memory_order_release);
_filled_size.notify_all();
}

// Check if the iterator is still valid
operator bool() const { return _index.load(std::memory_order_relaxed) < _capacity; }

auto unprocessed_latency() const { return _filled_size.load(std::memory_order_relaxed) - _index.load(std::memory_order_relaxed); }
auto filled_size() const { return _filled_size.load(std::memory_order_relaxed); }
auto remaining_capacity() const { return _capacity - _filled_size.load(std::memory_order_relaxed); }
auto is_aborted() const { return _filled_size.load(std::memory_order_relaxed) == ABORT_FLAG; }
auto is_finished() const { return !(_index.load(std::memory_order_relaxed) < _capacity); }
};

template
struct async_vector : public async_iterator
{
std::vector _data;

async_vector() = default;
async_vector(auto && ... args) : _data(std::forward(args)...) {
_capacity = _data.size();
_filled_size.store(_capacity, std::memory_order_release);
}

async_vector(size_t fixed_size) : _data(fixed_size) {
_capacity = fixed_size;
}

void resize(size_t size)
{
assert((_filled_size == ABORT_FLAG) || (_filled_size >= _capacity && _index >= _capacity) && "Threads are still running on the old vector");
_filled_size.store(0, std::memory_order_release);
_index.store(0, std::memory_order_release);
_data.resize(size);
_data.shrink_to_fit();
_capacity = size;
}

auto capacity() const { assert(_data.size() == _capacity); return _data.size(); }

#ifdef _DEBUG
std::atomic _thread_id;

bool is_same_thread() {
if (_thread_id == std::thread::id{}) {
_thread_id = std::this_thread::get_id();
return true;
}
return (_thread_id == std::this_thread::get_id());
}
#endif

// This function is only safe to call from the same thread to fill the vector
bool emplace_back_by_unique_thread(auto&& elem)
{
#ifdef _DEBUG
assert(is_same_thread() && "emplace_back_by_unique_thread must be called from the same thread");
#endif

auto next_index = _filled_size.load(std::memory_order_relaxed);
if (next_index >= _data.size()) return false; // On abort or complete filled vector

_data[next_index] = std::forward(elem);
_filled_size.fetch_add(1, std::memory_order_release);
_filled_size.notify_all();
return true;
}

// Can be called from multiple threads
bool pop_front(T& elem)
{
if (auto next_index_opt = next())
{
elem = _data[*next_index_opt];
return true;
}
return false; // Abort or empty queue
}

// Well, i am not sure if this is thread safe.
// It is safe to call this function from multiple threads ?
bool emplace_back(auto&& elem)
{
auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);

// If the vector is full or aborted, revert the increment and return false
if (next_index >= _data.size()) {
_filled_size.fetch_sub(1, std::memory_order_release);
return false;
}

_data[next_index] = std::forward(elem);
std::atomic_thread_fence(std::memory_order_release); // Ensure the data is written before the size is incremented
_filled_size.notify_all(); // Notify all consumers
return true;
}
};

// Variables used inside examples
//--------------------------------

static constexpr int camera_count = 4; // Number of cameras to simulate as producers
static constexpr int frame_count = 25; // Number of frames to store per camera
static constexpr int camera_ti = 50; // camera interval in ms, i.e. camera integration time

static constexpr int consumer_by_camera = 3; // Number of consumers per camera (the consumer processing time is 3 times the camera integration time)
static constexpr int consumer_count = camera_count * consumer_by_camera; // Number of consumers to simulate as executors
static constexpr int consumer_processing_time = camera_ti * consumer_by_camera; // Consumer processing time in ms (3 times the camera integration time)

using mat = std::string; // For example to avoid OpenCV dependency
using async_buffer = async_vector;

// Example 1
//--------------------------------
// The number of producers (cameras) is the same as the number of buffers (async_vector).

void example_one_camera_producer_per_buffer()
{
std::vector vFrameBuffer(camera_count); // Async vector of buffers, one per camera
for(auto & buffer : vFrameBuffer) buffer.resize(frame_count); // Resize each buffer with the frame count

// Producer function to simulate camera frame acquisition and store them in the async vector buffer
auto producer = [&](int buffer_id, int camera_id)
{
for(int i = 0; i < frame_count; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti));
auto frame = std::format("Hello from camera {} - Frame {}", camera_id, i);
if (!vFrameBuffer[buffer_id].emplace_back_by_unique_thread(std::move(frame))) // Always the same thread for the same camera
break; // Queue is full, or abort the producer
}
};

std::atomic_int processing_count = 0; // To count the number of frames processed by the consumers

// Consumer function to simulate frame processing
auto consumer = [&](int consumer_id, int buffer_id)
{
auto & async_buffer = vFrameBuffer[buffer_id];
mat frame;
while(async_buffer.pop_front(frame)) {
++processing_count;
std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time));
std::osyncstream(std::cout)

Подробнее здесь: https://stackoverflow.com/questions/794 ... cers-and-c
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»