Я могу подтвердить, что действительно существует только один потребительский поток. Нет других мест, вызывающих dequeue(). В коде используется std::memory_order_seq_cst для атомарных операций в очереди и sema.release()/sema.acquire() для семафора.
Почему это может произойти?
Это похоже на проблему с упорядочением памяти или видимостью, когда потребитель видит вновь связанный узел в очереди до того, как он увидит соответствующую операцию Release() на семафоре. Но я думал, что seq_cst плюс освобождение/получение семафора будет достаточно, чтобы гарантировать правильный порядок. Любые идеи или разъяснения о том, как гарантировать, что мы никогда не увидим действительный элемент в очереди, но все равно не сможем выполнить try_acquire()?
Код: Выделить всё
#include
#include
#include
#include
#include
template
class MPSCQueue {
struct Node {
T data;
std::atomic next;
// Default constructor for the dummy node
Node() : next(nullptr) {}
// Constructor that moves the data in
Node(T data_) : data(std::move(data_)), next(nullptr) {}
};
// Atomic head pointer for multiple producers
std::atomic head;
// Tail pointer for the single consumer
Node* tail;
public:
std::atomic_size_t enqueue_count = 0;
size_t dequeue_count = 0;
MPSCQueue() {
Node* dummy = new Node();
head.store(dummy, std::memory_order_seq_cst);
tail = dummy;
}
~MPSCQueue() {
Node* node = tail;
while (node) {
Node* next = node->next.load(std::memory_order_seq_cst);
delete node;
node = next;
}
}
// Called by producers
void enqueue(T data) {
enqueue_count.fetch_add(1);
Node* node = new Node(std::move(data));
// Swap in the new node as the head
Node* prev_head = head.exchange(node, std::memory_order_seq_cst);
// Link the old head to the new node
prev_head->next.store(node, std::memory_order_seq_cst);
}
// Called by the single consumer
std::optional dequeue() {
// Check the next pointer of the tail
Node* next = tail->next.load(std::memory_order_seq_cst);
if (next) {
// Move the data out
T res = std::move(next->data);
delete tail;
tail = next;
dequeue_count += 1;
return res;
}
return std::nullopt;
}
size_t size() { return enqueue_count.load() - dequeue_count; }
};
template
class MPSCQueueConsumerLock {
MPSCQueue queue;
std::counting_semaphore sema{0};
public:
void enqueue(T data) {
queue.enqueue(std::move(data));
// Release the semaphore to notify the consumer
sema.release();
}
// Single consumer calls this
T dequeue() {
auto re = queue.dequeue();
if (re.has_value()) {
// We have an item, so we expect the semaphore count to be > 0
if (!sema.try_acquire()) {
// Unexpectedly fails in rare cases
std::cerr
Подробнее здесь: [url]https://stackoverflow.com/questions/79309192/dequeued-item-exists-but-stdcounting-semaphoretry-acquire-fails-in-single[/url]