Код: Выделить всё
pub fn send(&self, t: T) -> Result {
// See Port::drop for what's going on
if self.port_dropped.load(Ordering::SeqCst) {
return Err(t);
}
if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
return Err(t);
}
self.queue.push(t);
match self.cnt.fetch_add(1, Ordering::SeqCst) {
-1 => {
self.take_to_wake().signal();
}
// Can't make any assumptions about this case like in the SPSC case.
_ => {}
}
Ok(())
}
pub fn recv(&self, deadline: Option) -> Result {
match self.try_recv() {
Err(Empty) => {}
data => return data,
}
let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token) == Installed {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
if timed_out {
self.abort_selection(false);
}
} else {
wait_token.wait();
}
}
match self.try_recv() {
data @ Ok(..) => unsafe {
*self.steals.get() -= 1;
data
},
data => data,
}
}
fn decrement(&self, token: SignalToken) -> StartResult {
unsafe {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = token.cast_to_usize();
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = ptr::replace(self.steals.get(), 0);
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals Result {
let ret = match self.queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
mpsc::Inconsistent => {
let data;
loop {
thread::yield_now();
match self.queue.pop() {
mpsc::Data(t) => {
data = t;
break;
}
mpsc::Empty => panic!("inconsistent => empty"),
mpsc::Inconsistent => {}
}
}
Some(data)
}
};
match ret {
// See the discussion in the stream implementation for why we
// might decrement steals.
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(*self.steals.get() >= 0);
}
*self.steals.get() += 1;
Ok(data)
},
// See the discussion in the stream implementation for why we try
// again.
None => {
match self.cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
_ => {
match self.queue.pop() {
mpsc::Data(t) => Ok(t),
mpsc::Empty => Err(Disconnected),
// with no senders, an inconsistency is impossible.
mpsc::Inconsistent => unreachable!(),
}
}
}
}
}
}
Код: Выделить всё
pub use self::PopResult::*;
use core::cell::UnsafeCell;
use core::ptr;
use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, Ordering};
/// A result of the `pop` function.
pub enum PopResult {
/// Some data has been popped
Data(T),
/// The queue is empty
Empty,
/// The queue is in an inconsistent state. Popping data should succeed, but
/// some pushers have yet to make enough progress in order allow a pop to
/// succeed. It is recommended that a pop() occur "in the near future" in
/// order to see if the sender has made progress or not
Inconsistent,
}
struct Node {
next: AtomicPtr,
value: Option,
}
/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
pub struct Queue {
head: AtomicPtr,
tail: UnsafeCell,
}
unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}
impl Node {
unsafe fn new(v: Option) -> *mut Node {
Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
}
}
impl Queue {
/// Creates a new queue that is safe to share among multiple producers and
/// one consumer.
pub fn new() -> Queue {
let stub = unsafe { Node::new(None) };
Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
}
/// Pushes a new value onto this queue.
pub fn push(&self, t: T) {
unsafe {
let n = Node::new(Some(t)); // #1
let prev = self.head.swap(n, Ordering::AcqRel); // #2
(*prev).next.store(n, Ordering::Release); // #3
}
}
/// Pops some data from this queue.
///
/// Note that the current implementation means that this function cannot
/// return `Option`. It is possible for this queue to be in an
/// inconsistent state where many pushes have succeeded and completely
/// finished, but pops cannot return `Some(t)`. This inconsistent state
/// happens when a pusher is pre-empted at an inopportune moment.
///
/// This inconsistent state means that this queue does indeed have data, but
/// it does not currently have access to it at this time.
pub fn pop(&self) -> PopResult {
unsafe {
let tail = *self.tail.get();
let next = (*tail).next.load(Ordering::Acquire); // #4
if !next.is_null() { // #5
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
let ret = (*next).value.take().unwrap();
let _: Box = Box::from_raw(tail);
return Data(ret);
}
if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
}
}
}
impl Drop for Queue {
fn drop(&mut self) {
unsafe {
let mut cur = *self.tail.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
let _: Box = Box::from_raw(cur);
cur = next;
}
}
}
}
Предполагая, что данные отправляются тремя потоками. Согласно [intro.races] p14
Если побочный эффект X на атомарный объект M происходит до вычисления значения B для M, то оценка B берет свое значение из X или из побочного эффекта Y, который следует за X в порядке модификации M.
Если нет других дополнительных синхронизаций, поток pop не гарантирует, что увидит последующие узлы, установленные в push потоки; однако заголовок начального значения и его поле, следующее за ним со значением null, гарантированно будут видны.
Согласно [atomic.order] p10
Атомарные операции чтения-изменения-записи всегда должны считывать последнее значение (в порядке модификации), записанное перед записью, связанной с операцией чтения-изменения-записи.
Если одна из них RMW считывает начальное значение 0, другие операции RMW должны считывать более позднюю модификацию в порядке модификации, и никакие две операции RMW не могут считывать одну и ту же модификацию. Все операции над cnt выполняются RMW; это гарантирует, что загрузочная часть операции RMW всегда может двигаться вперед и не может перейти к другой (т. е. все операции сериализуются). При синхронизации, установленной сериализованными операциями RMW (т. е. cnt.fetch_xxx), при загрузке в next и head гарантированно будут видны более поздние значения в их порядках модификации.
Каждый обмен в push может установить синхронизацию с другим. Однако, поскольку fetch_add(1,...) является последовательным после вызова push, нет события "до", установленного синхронизацией в push для каждого fetch_add, следовательно, может быть случай, когда fetch_add, соответствующий первому узлу, отличному от head, может быть упорядочен позже в порядке его модификации. В частности, этот случай показан на следующем изображении:

Первое всплывающее окно начинается с этой головы и head->next имеет значение null, cnt.fetch_sub(1) синхронизируется с M1, это может гарантировать только то, что node2->next = node3 произойдет до try_recv(), который располагается после cnt.fetch_sub(1). Если я не ошибся в своем изображении, head->next = node1 по-прежнему не упорядочен по head->next.load(...) в try_recv (т. е. в очереди.pop()), однако self.head.load гарантированно увидит node3 из-за синхронизации, установленной операциями RMW на cnt. Таким образом, try_recv должен войти в ветку InConsistent, чтобы запустить цикл ожидания обновления, хотя это по-прежнему не может гарантировать появление нового значения, как указано в [atomics.order] p12.
Реализация должна сделать атомарные хранилища видимыми для атомарных загрузок, а атомарные загрузки должны наблюдать за атомарными хранилищами в течение разумного периода времени.
То есть цикл технически может быть бесконечным (хотя в реальной реализации это невозможно).
Правильны ли мои рассуждения?
Подробнее здесь: https://stackoverflow.com/questions/798 ... e-producer
Мобильная версия