[*] async_iterator :
- Управляет атомным итератором (_index) над предварительно выделенным контейнером (например, непустым вектором или массивом).
- Отслеживает размер данных, заполненных производителями (_filled_size).
- предоставляет функцию next () для увеличения _index , ожидая при необходимости на основе _filled_size .
warps a std :: vector с помощью Async_iterator .
[*] Предоставляет два метода для добавления элементов:
.emplace_back_by_unique_thread(): функция, безопасная для потока, предназначенную для того, чтобы называть одним потоком продюсера (я почти уверен, что эта функция безопасна. Ну, я надеюсь, что: d) .emplace_back(): предназначен для нескольких производителей, , но я не уверен, является ли моя реализация-защитника. />
реализует настройку продюсера-потребителя, где каждый продюсер вызывает EMPLACE_BACK_BY_UNIQUE_THREAD () в своем выделенном Async_Vector .
ancync_vector экземпляры сопоставит число производителей. /> Пример 2: example_multi_camera_producer_one_shared_buffer
реализует настройку производителя-consumer, где несколько производителей делятся emync_vector . общий буфер.main()
запускает оба примера. emplace_back () .
Любое понимание или рекомендации будут высоко оценены.#include
#include
#include
#include
#include
#include
#include
#include
#include
template
struct async_iterator
{
async_iterator() = default;
async_iterator(T capacity) : _capacity(capacity) {}
async_iterator(const async_iterator&) = delete;
async_iterator& operator=(const async_iterator&) = delete;
async_iterator(async_iterator&&) = delete;
async_iterator& operator=(async_iterator&&) = delete;
using value_type = T;
using atomic_value_type = std::atomic;
using optional_value_type = std::optional;
static constexpr value_type ABORT_FLAG = std::numeric_limits::max();
value_type _capacity{0}; // Maximum container size
atomic_value_type _filled_size{0}; // Current container size (dynamic in function of the producer)
atomic_value_type _index{0}; // Current iterator index (used by the consumer)
optional_value_type next()
{
auto _next_index = _index.fetch_add(1, std::memory_order_relaxed);
if (_next_index >= _capacity)
return std::nullopt; // End of iteration
while (true) {
auto _current_size = _filled_size.load(std::memory_order_acquire);
if (_current_size == ABORT_FLAG) return std::nullopt; // Abort check
if (_next_index < _current_size) return _next_index; // The element is available
// Wait for the producer to fill the container
_filled_size.wait(_current_size, std::memory_order_acquire);
}
}
void notify_all() { _filled_size.notify_all(); }
void abort()
{
_filled_size.store(ABORT_FLAG, std::memory_order_release);
_index.store(_capacity, std::memory_order_release);
_filled_size.notify_all();
}
// Check if the iterator is still valid
operator bool() const { return _index.load(std::memory_order_relaxed) < _capacity; }
auto unprocessed_latency() const { return _filled_size.load(std::memory_order_relaxed) - _index.load(std::memory_order_relaxed); }
auto filled_size() const { return _filled_size.load(std::memory_order_relaxed); }
auto remaining_capacity() const { return _capacity - _filled_size.load(std::memory_order_relaxed); }
auto is_aborted() const { return _filled_size.load(std::memory_order_relaxed) == ABORT_FLAG; }
auto is_finished() const { return !(_index.load(std::memory_order_relaxed) < _capacity); }
};
template
struct async_vector : public async_iterator
{
std::vector _data;
async_vector() = default;
async_vector(auto && ... args) : _data(std::forward(args)...) {
_capacity = _data.size();
_filled_size.store(_capacity, std::memory_order_release);
}
async_vector(size_t fixed_size) : _data(fixed_size) {
_capacity = fixed_size;
}
void resize(size_t size)
{
assert((_filled_size == ABORT_FLAG) || (_filled_size >= _capacity && _index >= _capacity) && "Threads are still running on the old vector");
_filled_size.store(0, std::memory_order_release);
_index.store(0, std::memory_order_release);
_data.resize(size);
_data.shrink_to_fit();
_capacity = size;
}
auto capacity() const { assert(_data.size() == _capacity); return _data.size(); }
#ifdef _DEBUG
std::atomic _thread_id;
bool is_same_thread() {
if (_thread_id == std::thread::id{}) {
_thread_id = std::this_thread::get_id();
return true;
}
return (_thread_id == std::this_thread::get_id());
}
#endif
// This function is only safe to call from the same thread to fill the vector
bool emplace_back_by_unique_thread(auto&& elem)
{
#ifdef _DEBUG
assert(is_same_thread() && "emplace_back_by_unique_thread must be called from the same thread");
#endif
auto next_index = _filled_size.load(std::memory_order_relaxed);
if (next_index >= _data.size()) return false; // On abort or complete filled vector
_data[next_index] = std::forward(elem);
_filled_size.fetch_add(1, std::memory_order_release);
_filled_size.notify_all();
return true;
}
// Can be called from multiple threads
bool pop_front(T& elem)
{
if (auto next_index_opt = next())
{
elem = _data[*next_index_opt];
return true;
}
return false; // Abort or empty queue
}
// Well, i am not sure if this is thread safe.
// It is safe to call this function from multiple threads ?
bool emplace_back(auto&& elem)
{
auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);
// If the vector is full or aborted, revert the increment and return false
if (next_index >= _data.size()) {
_filled_size.fetch_sub(1, std::memory_order_release);
return false;
}
_data[next_index] = std::forward(elem);
std::atomic_thread_fence(std::memory_order_release); // Ensure the data is written before the size is incremented
_filled_size.notify_all(); // Notify all consumers
return true;
}
};
// Variables used inside examples
//--------------------------------
static constexpr int camera_count = 4; // Number of cameras to simulate as producers
static constexpr int frame_count = 25; // Number of frames to store per camera
static constexpr int camera_ti = 50; // camera interval in ms, i.e. camera integration time
static constexpr int consumer_by_camera = 3; // Number of consumers per camera (the consumer processing time is 3 times the camera integration time)
static constexpr int consumer_count = camera_count * consumer_by_camera; // Number of consumers to simulate as executors
static constexpr int consumer_processing_time = camera_ti * consumer_by_camera; // Consumer processing time in ms (3 times the camera integration time)
using mat = std::string; // For example to avoid OpenCV dependency
using async_buffer = async_vector;
// Example 1
//--------------------------------
// The number of producers (cameras) is the same as the number of buffers (async_vector).
void example_one_camera_producer_per_buffer()
{
std::vector vFrameBuffer(camera_count); // Async vector of buffers, one per camera
for(auto & buffer : vFrameBuffer) buffer.resize(frame_count); // Resize each buffer with the frame count
// Producer function to simulate camera frame acquisition and store them in the async vector buffer
auto producer = [&](int buffer_id, int camera_id)
{
for(int i = 0; i < frame_count; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti));
auto frame = std::format("Hello from camera {} - Frame {}", camera_id, i);
if (!vFrameBuffer[buffer_id].emplace_back_by_unique_thread(std::move(frame))) // Always the same thread for the same camera
break; // Queue is full, or abort the producer
}
};
std::atomic_int processing_count = 0; // To count the number of frames processed by the consumers
// Consumer function to simulate frame processing
auto consumer = [&](int consumer_id, int buffer_id)
{
auto & async_buffer = vFrameBuffer[buffer_id];
mat frame;
while(async_buffer.pop_front(frame)) {
++processing_count;
std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time));
std::osyncstream(std::cout)
Подробнее здесь: https://stackoverflow.com/questions/794 ... cers-and-c
Мобильная версия