Выведенный из очереди элемент существует, но std::counting_semaphore::try_acquire() завершается сбоем в очереди MPSC с оC++

Программы на C++. Форум разработчиков
Ответить Пред. темаСлед. тема
Anonymous
 Выведенный из очереди элемент существует, но std::counting_semaphore::try_acquire() завершается сбоем в очереди MPSC с о

Сообщение Anonymous »

У меня есть незаблокированная очередь с одним потребителем и несколькими производителями (MPSCQueue) в сочетании с std::counting_semaphore для уведомления потребителя о появлении новых элементов в очереди. Потребитель использует dequeue() для попытки получить элемент; если это удастся, он вызывает sema.try_acquire(), который, как я ожидаю, будет успешным каждый раз, когда в очереди действительно есть элемент. Однако в очень редких случаях я получаю ошибку утверждения, указывающую, что try_acquire() вернул ложное значение, даже если очередь вернула действительный элемент.
Я могу подтвердить, что действительно существует только один потребительский поток. Нет других мест, вызывающих dequeue(). В коде используется std::memory_order_seq_cst для атомарных операций в очереди и sema.release()/sema.acquire() для семафора.
Почему это может произойти?
Это похоже на проблему с упорядочением памяти или видимостью, когда потребитель видит вновь связанный узел в очереди до того, как он увидит соответствующую операцию Release() на семафоре. Но я думал, что seq_cst плюс освобождение/получение семафора будет достаточно, чтобы гарантировать правильный порядок. Любые идеи или разъяснения о том, как гарантировать, что мы никогда не увидим действительный элемент в очереди, но все равно не сможем выполнить try_acquire()?

Код: Выделить всё

#include 
#include 
#include 
#include 
#include 

template 
class MPSCQueue {
struct Node {
T data;
std::atomic next;

// Default constructor for the dummy node
Node() : next(nullptr) {}
// Constructor that moves the data in
Node(T data_) : data(std::move(data_)), next(nullptr) {}
};

// Atomic head pointer for multiple producers
std::atomic head;
// Tail pointer for the single consumer
Node* tail;

public:
std::atomic_size_t enqueue_count = 0;
size_t dequeue_count = 0;

MPSCQueue() {
Node* dummy = new Node();
head.store(dummy, std::memory_order_seq_cst);
tail = dummy;
}

~MPSCQueue() {
Node* node = tail;
while (node) {
Node* next = node->next.load(std::memory_order_seq_cst);
delete node;
node = next;
}
}

// Called by producers
void enqueue(T data) {
enqueue_count.fetch_add(1);
Node* node = new Node(std::move(data));
// Swap in the new node as the head
Node* prev_head = head.exchange(node, std::memory_order_seq_cst);
// Link the old head to the new node
prev_head->next.store(node, std::memory_order_seq_cst);
}

// Called by the single consumer
std::optional dequeue() {
// Check the next pointer of the tail
Node* next = tail->next.load(std::memory_order_seq_cst);
if (next) {
// Move the data out
T res = std::move(next->data);
delete tail;
tail = next;
dequeue_count += 1;
return res;
}
return std::nullopt;
}

size_t size() { return enqueue_count.load() - dequeue_count; }
};

template 
class MPSCQueueConsumerLock {
MPSCQueue queue;
std::counting_semaphore sema{0};

public:
void enqueue(T data) {
queue.enqueue(std::move(data));
// Release the semaphore to notify the consumer
sema.release();
}

// Single consumer calls this
T dequeue() {
auto re = queue.dequeue();
if (re.has_value()) {
// We have an item, so we expect the semaphore count to be > 0
if (!sema.try_acquire()) {
// Unexpectedly fails in rare cases
std::cerr 

Подробнее здесь: [url]https://stackoverflow.com/questions/79309192/dequeued-item-exists-but-stdcounting-semaphoretry-acquire-fails-in-single[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C++»