Разве `cnt` все еще не избегает противоречивого статуса для этой реализации с несколькими производителями и одним потребC++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Разве `cnt` все еще не избегает противоречивого статуса для этой реализации с несколькими производителями и одним потреб

Сообщение Anonymous »

Глядя на эту реализацию системы с несколькими производителями и одним потребителем, поставляемый код написан на Rust; однако его модель порядка памяти заимствована из C++. Таким образом, формальные рассуждения должны быть основаны на стандарте C++ (их невозможно переписать на C++ для обсуждения). Фрагменты кодов Recv и send:

Код: Выделить всё

pub fn send(&self, t: T) -> Result {
// See Port::drop for what's going on
if self.port_dropped.load(Ordering::SeqCst) {
return Err(t);
}

if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
return Err(t);
}

self.queue.push(t);
match self.cnt.fetch_add(1, Ordering::SeqCst) {
-1 => {
self.take_to_wake().signal();
}

// Can't make any assumptions about this case like in the SPSC case.
_ => {}
}

Ok(())
}

pub fn recv(&self, deadline: Option) -> Result {
match self.try_recv() {
Err(Empty) => {}
data => return data,
}

let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token) == Installed {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
if timed_out {
self.abort_selection(false);
}
} else {
wait_token.wait();
}
}

match self.try_recv() {
data @ Ok(..) => unsafe {
*self.steals.get() -= 1;
data
},
data => data,
}
}
fn decrement(&self, token: SignalToken) -> StartResult {
unsafe {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = token.cast_to_usize();
self.to_wake.store(ptr, Ordering::SeqCst);

let steals = ptr::replace(self.steals.get(), 0);

match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals  Result {
let ret = match self.queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
mpsc::Inconsistent => {
let data;
loop {
thread::yield_now();
match self.queue.pop() {
mpsc::Data(t) => {
data = t;
break;
}
mpsc::Empty => panic!("inconsistent => empty"),
mpsc::Inconsistent => {}
}
}
Some(data)
}
};
match ret {
// See the discussion in the stream implementation for why we
// might decrement steals.
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(*self.steals.get() >= 0);
}
*self.steals.get() += 1;
Ok(data)
},

// See the discussion in the stream implementation for why we try
// again.
None => {
match self.cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
_ =>  {
match self.queue.pop() {
mpsc::Data(t) => Ok(t),
mpsc::Empty => Err(Disconnected),
// with no senders, an inconsistency is impossible.
mpsc::Inconsistent => unreachable!(),
}
}
}
}
}
}

Код: Выделить всё

pub use self::PopResult::*;

use core::cell::UnsafeCell;
use core::ptr;

use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, Ordering};

/// A result of the `pop` function.
pub enum PopResult {
/// Some data has been popped
Data(T),
/// The queue is empty
Empty,
/// The queue is in an inconsistent state. Popping data should succeed, but
/// some pushers have yet to make enough progress in order allow a pop to
/// succeed. It is recommended that a pop() occur "in the near future" in
/// order to see if the sender has made progress or not
Inconsistent,
}

struct Node {
next: AtomicPtr,
value: Option,
}

/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
pub struct Queue {
head: AtomicPtr,
tail: UnsafeCell,
}

unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}

impl Node {
unsafe fn new(v: Option) -> *mut Node {
Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
}
}

impl Queue {
/// Creates a new queue that is safe to share among multiple producers and
/// one consumer.
pub fn new() -> Queue {
let stub = unsafe { Node::new(None) };
Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
}

/// Pushes a new value onto this queue.
pub fn push(&self, t: T) {
unsafe {
let n = Node::new(Some(t));  // #1
let prev = self.head.swap(n, Ordering::AcqRel);  // #2
(*prev).next.store(n, Ordering::Release);  // #3
}
}

/// Pops some data from this queue.
///
/// Note that the current implementation means that this function cannot
/// return `Option`. It is possible for this queue to be in an
/// inconsistent state where many pushes have succeeded and completely
/// finished, but pops cannot return `Some(t)`. This inconsistent state
/// happens when a pusher is pre-empted at an inopportune moment.
///
/// This inconsistent state means that this queue does indeed have data, but
/// it does not currently have access to it at this time.
pub fn pop(&self) -> PopResult {
unsafe {
let tail = *self.tail.get();
let next = (*tail).next.load(Ordering::Acquire);  // #4

if !next.is_null() { // #5
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
let ret = (*next).value.take().unwrap();
let _: Box = Box::from_raw(tail);
return Data(ret);
}

if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
}
}
}

impl Drop for Queue {
fn drop(&mut self) {
unsafe {
let mut cur = *self.tail.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
let _: Box = Box::from_raw(cur);
cur = next;
}
}
}
}

Полный код находится здесь, а реализация очереди — здесь
Предполагая, что данные отправляются тремя потоками. Согласно [intro.races] p14

Если побочный эффект X на атомарный объект M происходит до вычисления значения B для M, то оценка B берет свое значение из X или из побочного эффекта Y, который следует за X в порядке модификации M.

Если нет других дополнительных синхронизаций, поток pop не гарантирует, что увидит последующие узлы, установленные в push потоки; однако заголовок начального значения и его поле, следующее за ним со значением null, гарантированно будут видны.
Согласно [atomic.order] p10

Атомарные операции чтения-изменения-записи всегда должны считывать последнее значение (в порядке модификации), записанное перед записью, связанной с операцией чтения-изменения-записи.

Если одна из них RMW считывает начальное значение 0, другие операции RMW должны считывать более позднюю модификацию в порядке модификации, и никакие две операции RMW не могут считывать одну и ту же модификацию. Все операции над cnt выполняются RMW; это гарантирует, что загрузочная часть операции RMW всегда может двигаться вперед и не может перейти к другой (т. е. все операции сериализуются). При синхронизации, установленной сериализованными операциями RMW (т. е. cnt.fetch_xxx), при загрузке в next и head гарантированно будут видны более поздние значения в их порядках модификации.
Каждый обмен в push может установить синхронизацию с другим. Однако, поскольку fetch_add(1,...) является последовательным после вызова push, нет события "до", установленного синхронизацией в push для каждого fetch_add, следовательно, может быть случай, когда fetch_add, соответствующий первому узлу, отличному от head, может быть упорядочен позже в порядке его модификации. В частности, этот случай показан на следующем изображении:
Изображение

Первое всплывающее окно начинается с этой головы и head->next имеет значение null, cnt.fetch_sub(1) синхронизируется с M1, это может гарантировать только то, что node2->next = node3 произойдет до try_recv(), который располагается после cnt.fetch_sub(1). Если я не ошибся в своем изображении, head->next = node1 по-прежнему не упорядочен по head->next.load(...) в try_recv (т. е. в очереди.pop()), однако self.head.load гарантированно увидит node3 из-за синхронизации, установленной операциями RMW на cnt. Таким образом, try_recv должен войти в ветку InConsistent, чтобы запустить цикл ожидания обновления, хотя это по-прежнему не может гарантировать появление нового значения, как указано в [atomics.order] p12.

Реализация должна сделать атомарные хранилища видимыми для атомарных загрузок, а атомарные загрузки должны наблюдать за атомарными хранилищами в течение разумного периода времени.

То есть цикл технически может быть бесконечным (хотя в реальной реализации это невозможно).
Правильны ли мои рассуждения?

Подробнее здесь: https://stackoverflow.com/questions/798 ... e-producer
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»