Оператор Rx, ожидающий завершения подпискиC#

Место общения программистов C#
Ответить
Anonymous
 Оператор Rx, ожидающий завершения подписки

Сообщение Anonymous »

Я хочу объединить два наблюдаемых объекта, скажем, SendMoves и Results, чтобы получить SendMovesAndGetResults.
Подписка на SendMoves в конечном итоге приводит к смещению значений результатов вниз. Поэтому подписку на результаты необходимо завершить до любой подписки на SendMoves, чтобы избежать состояния гонки.
В качестве демонстрации кода с состоянием гонки рассмотрим:

Код: Выделить всё

// See https://aka.ms/new-console-template for more information
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

Test m = new Test();
m.RaceConditionTest();
Console.WriteLine("Hello, World!");

internal class Test
{
public void RaceConditionTest()
{
// by subscribing to the results before the moves are sent you
// might think you are guaranteed to get all of the results
// but subscribe is asynchronous with no obvious way of knowing when it has completed.
WhenResults([1, 2, 3, 4, 5, 6]).SubscribeOn(Scheduler.NewThread).Subscribe(r => System.Diagnostics.Debug.WriteLine($"Result {r}"));
SendMoves([1, 2, 3, 4, 5, 6]).SubscribeOn(Scheduler.NewThread).Subscribe();
}

private readonly Subject mMoveResults = new Subject();
private IObservable WhenResults(IEnumerable moveTokens) =>
moveTokens
.ToObservable()
.SelectMany(i => mMoveResults.Where(r => r == i).FirstAsync().Trace($"mMoveResult {i}"));

private IObservable SendMoves(List moves) =>
Observable.Create(o =>
{
foreach (var move in moves)
{
System.Diagnostics.Debug.WriteLine($"Sending move {move}");
mMoveResults.OnNext(move);
}

o.OnCompleted();
return Disposable.Empty;
});
}

public static class ObservableExtensions
{
/// 
/// Allows us to trace the observable pipeline.
/// Specifically, we can trace the subscribe, onNext, onError and onCompleted events for all observers.
/// 
/// 
/// 

/// 
/// 
/// 
public static IObservable Trace(this IObservable source, string name)
{
var id = 0;

return Observable.Create(observer =>
{
var id1 = ++id;
Action trace = (m, v) => Debug.WriteLine($"{name}-{id1}: {m}({v})");
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v.ToString());
observer.OnNext(v);
},
e =>
{
trace("OnError", e.Message);
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});

return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
}
Вывод:

Код: Выделить всё

Sending move 1
'ConsoleApp1.exe' (CoreCLR: clrhost): Loaded 'C:\Program Files\dotnet\shared\Microsoft.NETCore.App\8.0.8\System.Text.Encoding.Extensions.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Sending move 2
Sending move 3
Sending move 4
Sending move 5
Sending move 6
The thread '[Thread Destroyed]' (16108) has exited with code 0 (0x0).
mMoveResult 1-1: Subscribe()
mMoveResult 2-1: Subscribe()
mMoveResult 3-1: Subscribe()
mMoveResult 4-1: Subscribe()
mMoveResult 5-1: Subscribe()
mMoveResult 6-1: Subscribe()
Который показывает ходы, отправленные до того, как мы подписались на результаты. Интересно, что тот же результат наблюдается в приложении WPF без необходимости SubscribeOn. Есть ли стандартный способ справиться с этим? Как узнать, что подписка полностью настроена, прежде чем вызывать код, который начнет передавать значения.

Подробнее здесь: https://stackoverflow.com/questions/790 ... -to-finish
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»