Код: Выделить всё
pub fn send(&self, t: T) -> Result {
// See Port::drop for what's going on
if self.port_dropped.load(Ordering::SeqCst) {
return Err(t);
}
if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
return Err(t);
}
self.queue.push(t);
match self.cnt.fetch_add(1, Ordering::SeqCst) {
-1 => {
self.take_to_wake().signal();
}
// Can't make any assumptions about this case like in the SPSC case.
_ => {}
}
Ok(())
}
pub fn recv(&self, deadline: Option) -> Result {
match self.try_recv() {
Err(Empty) => {}
data => return data,
}
let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token) == Installed {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
if timed_out {
self.abort_selection(false);
}
} else {
wait_token.wait();
}
}
match self.try_recv() {
data @ Ok(..) => unsafe {
*self.steals.get() -= 1;
data
},
data => data,
}
}
fn decrement(&self, token: SignalToken) -> StartResult {
unsafe {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
let ptr = token.cast_to_usize();
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = ptr::replace(self.steals.get(), 0);
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals Result {
let ret = match self.queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
mpsc::Inconsistent => {
let data;
loop {
thread::yield_now();
match self.queue.pop() {
mpsc::Data(t) => {
data = t;
break;
}
mpsc::Empty => panic!("inconsistent => empty"),
mpsc::Inconsistent => {}
}
}
Some(data)
}
};
match ret {
// See the discussion in the stream implementation for why we
// might decrement steals.
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(*self.steals.get() >= 0);
}
*self.steals.get() += 1;
Ok(data)
},
// See the discussion in the stream implementation for why we try
// again.
None => {
match self.cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
_ => {
match self.queue.pop() {
mpsc::Data(t) => Ok(t),
mpsc::Empty => Err(Disconnected),
// with no senders, an inconsistency is impossible.
mpsc::Inconsistent => unreachable!(),
}
}
}
}
}
}
Предполагая, что данные отправляются тремя потоками. Согласно [intro.races] p14
Если побочный эффект X на атомарный объект M происходит до вычисления значения B для M, то оценка B берет свое значение из X или из побочного эффекта Y, который следует за X в порядке модификации M.
Если нет других дополнительных синхронизаций, поток pop не гарантирует, что увидит последующие узлы, установленные в push потоки; однако начальное значение head и его поле next должны быть нулевыми гарантированно будут видимыми. При синхронизации, установленной сериализованными операциями RMW (т. е. cnt.fetch_xxx), при загрузке в next и head гарантированно будут видны более поздние значения в их порядках модификации.
Каждый обмен в push может установить синхронизацию с другим. Однако, поскольку fetch_add(1,...) является последовательным после вызова push, нет события "до", установленного синхронизацией в push для каждого fetch_add, следовательно, может быть случай, когда fetch_add, соответствующий первому узлу, отличному от head, может быть упорядочен позже в порядке его модификации. В частности, этот случай показан на следующем изображении:

Первое всплывающее окно начинается с этой головы и head->next имеет значение null, cnt.fetch_sub(1) синхронизируется с M1, это может гарантировать только то, что node2->next = node3 произойдет до try_recv(), который располагается после cnt.fetch_sub(1). Если я не ошибся в своем изображении, head->next = node1 по-прежнему не упорядочен по head->next.load(...) в try_recv (т. е. в очереди.pop()), однако self.head.load гарантированно увидит node3 из-за синхронизации, установленной операциями RMW на cnt. Таким образом, try_recv должен войти в ветку InConsistent, чтобы запустить цикл ожидания обновления, хотя это по-прежнему не может гарантировать появление нового значения, как указано в [atomics.order] p12.
Реализация должна сделать атомарные хранилища видимыми для атомарных загрузок, а атомарные загрузки должны наблюдать за атомарными хранилищами в течение разумного периода времени.
То есть цикл технически может быть бесконечным (хотя в реальной реализации это невозможно).
Правильны ли мои рассуждения?
Подробнее здесь: https://stackoverflow.com/questions/798 ... e-producer
Мобильная версия