Подписка на SendMoves в конечном итоге приводит к смещению значений результатов вниз. Поэтому подписку на результаты необходимо завершить до любой подписки на SendMoves, чтобы избежать состояния гонки.
В качестве демонстрации кода с состоянием гонки рассмотрим:
Код: Выделить всё
// See https://aka.ms/new-console-template for more information
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
Test m = new Test();
m.RaceConditionTest();
Console.WriteLine("Hello, World!");
internal class Test
{
public void RaceConditionTest()
{
// by subscribing to the results before the moves are sent you
// might think you are guaranteed to get all of the results
// but subscribe is asynchronous with no obvious way of knowing when it has completed.
WhenResults([1, 2, 3, 4, 5, 6]).SubscribeOn(Scheduler.NewThread).Subscribe(r => System.Diagnostics.Debug.WriteLine($"Result {r}"));
SendMoves([1, 2, 3, 4, 5, 6]).SubscribeOn(Scheduler.NewThread).Subscribe();
}
private readonly Subject mMoveResults = new Subject();
private IObservable WhenResults(IEnumerable moveTokens) =>
moveTokens
.ToObservable()
.SelectMany(i => mMoveResults.Where(r => r == i).FirstAsync().Trace($"mMoveResult {i}"));
private IObservable SendMoves(List moves) =>
Observable.Create(o =>
{
foreach (var move in moves)
{
System.Diagnostics.Debug.WriteLine($"Sending move {move}");
mMoveResults.OnNext(move);
}
o.OnCompleted();
return Disposable.Empty;
});
}
public static class ObservableExtensions
{
///
/// Allows us to trace the observable pipeline.
/// Specifically, we can trace the subscribe, onNext, onError and onCompleted events for all observers.
///
///
///
///
///
///
public static IObservable Trace(this IObservable source, string name)
{
var id = 0;
return Observable.Create(observer =>
{
var id1 = ++id;
Action trace = (m, v) => Debug.WriteLine($"{name}-{id1}: {m}({v})");
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v.ToString());
observer.OnNext(v);
},
e =>
{
trace("OnError", e.Message);
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
}
Код: Выделить всё
Sending move 1
'ConsoleApp1.exe' (CoreCLR: clrhost): Loaded 'C:\Program Files\dotnet\shared\Microsoft.NETCore.App\8.0.8\System.Text.Encoding.Extensions.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Sending move 2
Sending move 3
Sending move 4
Sending move 5
Sending move 6
The thread '[Thread Destroyed]' (16108) has exited with code 0 (0x0).
mMoveResult 1-1: Subscribe()
mMoveResult 2-1: Subscribe()
mMoveResult 3-1: Subscribe()
mMoveResult 4-1: Subscribe()
mMoveResult 5-1: Subscribe()
mMoveResult 6-1: Subscribe()
Подробнее здесь: https://stackoverflow.com/questions/790 ... -to-finish
Мобильная версия