Идея похожа на отправку сообщений из -за пределов хаба с SignalR, объясняется здесь. Цель состоит в том, чтобы несколько клиентов подписывались на определенное уведомление с идентификатором (GUID). Затем я хочу отправить сообщение из другого метода клиентам, которые слушают конкретный идентификатор. Для этого я использую потоковую передачу сервера. Я сделал класс уведомления с методом асинхронно ждать новых обновлений и еще один метод, чтобы продвигать изменения. У меня есть Liveservice.cs, который ждет новых предметов и отправляет их, как только он что -то получит. Похоже: < /p>
public class LiveService(ILiveNotifier liveNotifier) : LiveServiceBase
{
private readonly ILiveNotifier _liveNotifier = liveNotifier;
public override async Task StreamLiveUpdates(
StreamLiveUpdatesRequest request,
IServerStreamWriter responseStream,
ServerCallContext context
)
{
Guid id = GuidMapper.ToGuid(request.Id);
var cancellationToken = context.CancellationToken;
while (!cancellationToken.IsCancellationRequested)
{
var item = await _liveNotifier.WaitForNewItem(id, cancellationToken);
await responseStream.WriteAsync(item);
}
}
}
< /code>
Другой метод может «уведомить» Livenotifier с новым элементом. < /p>
_liveNotifier.NotifyNewItemAvailable(someId, "message to send")
< /code>
Livenotifier выглядит так: < /p>
public class LiveNotifier : ILiveNotifier
{
public ConcurrentDictionary Waiters { get; set; } = [];
public void NotifyNewItemAvailable(Guid statusId, string value)
{
ConcurrentDictionary old = [];
foreach (var waiter in Waiters)
if (waiter.Key.Item2 == statusId)
waiter.Value.TrySetResult(value);
else
old.TryAdd(waiter.Key, waiter.Value);
Waiters = old;
}
public void EndLive(Guid statusId)
{
ConcurrentDictionary old = [];
foreach (var waiter in Waiters)
if (waiter.Key.Item2 == statusId) {
waiter.Value.TrySetCanceled();
}
else
old.TryAdd(waiter.Key, waiter.Value);
Waiters = old;
}
public Task WaitForNewItem(Guid statusId, CancellationToken token)
{
TaskCompletionSource newTask = new(TaskCreationOptions.RunContinuationsAsynchronously);
Waiters.AddOrUpdate((token, statusId), newTask, (_, _) => newTask);
token.Register(() =>
{
newTask.TrySetCanceled(token);
Waiters.TryRemove((token, statusId), out newTask!);
});
return newTask.Task;
}
}
< /code>
Это работает нормально при уведомлении новых элементов. Завершение потока, однако, не работает, когда есть более 1 клиента, в этом случае ни один клиент не получает уведомление. Когда есть только 1 клиент, этот клиент закрывает соединение. Есть ли лучшее решение или какие -либо отзывы об этой реализации? Итак, несколько клиентов получают сообщение.public class LiveNotifier : ILiveNotifier
{
private readonly ConcurrentDictionary _subscriptions = new();
public Task Subscribe(Guid liveStreamId, IServerStreamWriter responseStream, CancellationToken token)
{
TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
var newVal = (responseStream, tcs);
_subscriptions.AddOrUpdate(liveStreamId, [newVal], (key, oldValue) => oldValue.Add(newVal));
token.Register(
() =>
{
tcs.TrySetCanceled();
_subscriptions.AddOrUpdate(liveStreamId, [], (id, oldValues) => oldValues.Remove(newVal));
},
false
);
return tcs.Task;
}
public void Broadcast(Guid liveStatusId, string message)
{
_subscriptions.TryGetValue(
liveStatusId,
out ImmutableList? values
);
if (values != null)
{
foreach (var req in values)
req.Item1.WriteAsync(message);
}
}
public void EndLive(Guid liveStatusId)
{
_subscriptions.TryGetValue(
liveStatusId,
out ImmutableList? values
);
if (values != null)
{
foreach (var req in values)
req.Item2.TrySetResult();
_subscriptions.TryRemove(liveStatusId, out _);
}
}
}
< /code>
и служба: < /p>
Guid id = GuidMapper.ToGuid(request.Id);
var cancellationToken = context.CancellationToken;
await _liveNotifier.Subscribe(id, responseStream, cancellationToken);
Подробнее здесь: https://stackoverflow.com/questions/795 ... using-grpc
Как отправить уведомления из другого метода на несколько клиентов с использованием потоковой передачи GRPC в ASP.NET? ⇐ C#
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение