Код: Выделить всё
#include
#include
#include // for std::move, std::exchange
#include // for std::min
#include
#include
#include
#include
// ==========================================
// 1. Constants & Configuration
// ==========================================
const intptr_t DISCONNECTED = std::numeric_limits::min();
const intptr_t FUDGE = 1024;
const intptr_t MAX_STEALS = 1 head.exchange(node, std::memory_order_acq_rel);
// Link previous node to new node
pre->next.store(node, std::memory_order_release);
}
// Consumer side
PopResult pop() {
auto t = this->tail;
auto next = t->next.load(std::memory_order_acquire);
// 1. Check if there is a next node (Data available)
if (next) {
this->tail = next;
// Retrieve data from the *next* node (the new stub)
assert(next->value.has_value());
auto ret = std::move(*next->value);
// Delete the old stub
delete t;
return {Status::Data, std::move(ret)};
}
// 2. Check consistency
if (this->head.load(std::memory_order_acquire) == t) {
return {Status::Empty, std::nullopt};
} else {
// Head has moved, but prev->next linkage isn't visible yet.
// This is a temporary inconsistent state.
return {Status::Inconsistent, std::nullopt};
}
}
// Destructor to clean up leaking nodes would be needed in production
};
// ==========================================
// 4. Packet (Shared Channel Logic)
// ==========================================
template
struct Packet {
Queue queue;
std::atomic cnt; // Total items count
std::atomic to_wake; // Token for waking up sleeper
long steals; // Local steal count (consumer only)
Packet() : cnt(0), to_wake(0), steals(0) {}
void send(T t) {
// Prevent sending if disconnected
if (this->cnt.load(std::memory_order_seq_cst) < DISCONNECTED + FUDGE) {
throw "Err: Disconnected";
}
this->queue.push(std::move(t));
// Atomic increment. If previous value was -1, it means receiver is sleeping.
if (this->cnt.fetch_add(1, std::memory_order_seq_cst) == -1) {
this->take_to_wake().signal();
}
}
std::optional recv() {
// Optimistic check
auto t = this->try_recv();
if (t.has_value()) return t;
auto [wait_token, signal_token] = blocking::tokens();
// Attempt to sleep
if (this->decrement(signal_token) == StartResult::Installed) {
wait_token.wait();
}
// Woke up (or aborted sleep), try again
t = this->try_recv();
// If we got data after waking up, decrement steals
// (logic: we paid the price of blocking, so reset the "debt")
if (t.has_value()) {
this->steals--;
}
return t;
}
// Logic to prepare for sleeping
StartResult decrement(SignalToken token) {
assert(this->to_wake.load(std::memory_order_seq_cst) == 0);
auto ptr = token.cast_to_usize();
this->to_wake.store(ptr, std::memory_order_seq_cst);
auto current_steals = std::exchange(this->steals, 0);
// Subtract 1 (for current wait) + any accumulated steals
auto prev = this->cnt.fetch_sub(1 + current_steals, std::memory_order_seq_cst);
if (prev == DISCONNECTED) {
this->cnt.store(DISCONNECTED, std::memory_order_seq_cst);
} else {
// If (prev - steals) queue.pop();
if (res.status == Queue::Data) {
// Maintenance: synchronize steals with global cnt occasionally
if (this->steals > MAX_STEALS) {
auto n = this->cnt.swap(0, std::memory_order_seq_cst);
if (n == DISCONNECTED) {
this->cnt.store(DISCONNECTED, std::memory_order_seq_cst);
} else {
auto m = std::min(n, this->steals);
this->steals -= m;
this->bump(n - m);
}
}
this->steals++;
return std::move(res.data);
}
if (res.status == Queue::Empty) {
return std::nullopt;
}
if (res.status == Queue::Inconsistent) {
std::this_thread::yield();
// Loop continues...
}
}
}
void bump(intptr_t amt) {
if (this->cnt.fetch_add(amt, std::memory_order_seq_cst) == DISCONNECTED) {
this->cnt.store(DISCONNECTED, std::memory_order_seq_cst);
}
}
SignalToken take_to_wake() {
auto ptr = this->to_wake.load(std::memory_order_seq_cst);
this->to_wake.store(0, std::memory_order_seq_cst);
return *SignalToken::cast_from_usize(ptr);
}
};
Предполагая, что данные отправляются тремя потоками. Согласно [intro.races] p14
Если побочный эффект X на атомарный объект M происходит до вычисления значения B для M, то оценка B берет свое значение из X или из побочного эффекта Y, который следует за X в порядке модификации M.
Если нет других дополнительных синхронизаций, поток pop не гарантирует, что увидит последующие узлы, установленные в нажимать потоки, потому что принцип «прежде чем ничего» не обеспечивает видимость; то есть увидеть эти изменения после первоначального побочного эффекта в порядке модификаций. Однако заголовок начального значения и его поле, следующее за ним со значением null, гарантированно будут видны.
Согласно [atomic.order] p10
Атомарные операции чтения-изменения-записи всегда должны считывать последнее значение (в порядке модификации), записанное перед записью, связанной с операцией чтения-изменения-записи.
Если одна из них RMW считывает начальное значение 0, другие операции RMW должны считывать более позднюю модификацию в порядке модификации, и никакие две операции RMW не могут считывать одну и ту же модификацию. Все операции над cnt являются операциями RMW; это гарантирует, что загрузочная часть операции RMW всегда может двигаться вперед и не может наступать на другую (т. е. все операции сериализуются и не могут перекрываться). При синхронизации, установленной сериализованными операциями RMW (т. е. cnt.fetch_xxx), при загрузке в next и head гарантированно будут отображаться более поздние значения в их порядках модификации, поскольку между побочными эффектами next или head и загрузками существуют отношения «до того, как произошло».
Каждый обмен в push может установить синхронизацию с другим. Однако, поскольку fetch_add(1,...) является упорядоченным после вызова push, не происходит событие-before, установленное синхронизацией в push для любых двух fetch_add, следовательно, нет никаких ограничений на эти fetch_add(1,...) для их порядка в порядке модификации; может быть случай, когда fetch_add, соответствующий первому узлу, отличному от head, может быть заказан позже в порядке его модификации. В частности, я попытался проиллюстрировать этот случай на следующем изображении:

Первое всплывающее окно начинается с этой головы и head->next имеет значение null, cnt.fetch_sub(1) синхронизируется с M1, это может гарантировать только то, что node2->next = node3 произойдет до try_recv(), который располагается после cnt.fetch_sub(1). Если я не ошибся в своем изображении, head->next = node1 по-прежнему не упорядочен по head->next.load(...) в try_recv (т. е. в очереди.pop()), однако self.head.load гарантированно увидит node3 из-за синхронизации, установленной операциями RMW на cnt. Таким образом, try_recv должен перейти в ветку Inconsistent, чтобы выполнить цикл и дождаться обновления, даже если это все еще не может гарантировать появление нового значения, как указано в [atomics].order] p12.
Реализация должна сделать атомарные хранилища видимыми для атомарных загрузок, а атомарные загрузки должны наблюдать за атомарными хранилищами в течение разумного периода времени.
То есть цикл технически может быть бесконечным (хотя в реальной реализации это невозможно).
Не только в этом случае, если мы поменяем местами порядок M1 и cnt.fetch_sub(1) на картинке, в этом случае мы все равно можем перейти в противоречивое состояние? Верны ли мои рассуждения?
Подробнее здесь: https://stackoverflow.com/questions/798 ... for-this-m
Мобильная версия