Как отправить уведомления из другого метода на несколько клиентов с использованием потоковой передачи GRPC в ASP.NET?C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Как отправить уведомления из другого метода на несколько клиентов с использованием потоковой передачи GRPC в ASP.NET?

Сообщение Anonymous »

Идея похожа на отправку сообщений из -за пределов хаба с SignalR, объясняется здесь. Цель состоит в том, чтобы несколько клиентов подписывались на определенное уведомление с идентификатором (GUID). Затем я хочу отправить сообщение из другого метода клиентам, которые слушают конкретный идентификатор. Для этого я использую потоковую передачу сервера. Я сделал класс уведомления с методом асинхронно ждать новых обновлений и еще один метод, чтобы продвигать изменения. У меня есть Liveservice.cs, который ждет новых предметов и отправляет их, как только он что -то получит. Похоже: < /p>
public class LiveService(ILiveNotifier liveNotifier) : LiveServiceBase
{
private readonly ILiveNotifier _liveNotifier = liveNotifier;

public override async Task StreamLiveUpdates(
StreamLiveUpdatesRequest request,
IServerStreamWriter responseStream,
ServerCallContext context
)
{
Guid id = GuidMapper.ToGuid(request.Id);
var cancellationToken = context.CancellationToken;

while (!cancellationToken.IsCancellationRequested)
{
var item = await _liveNotifier.WaitForNewItem(id, cancellationToken);
await responseStream.WriteAsync(item);
}
}
}
< /code>
Другой метод может «уведомить» Livenotifier с новым элементом. < /p>
_liveNotifier.NotifyNewItemAvailable(someId, "message to send")
< /code>
Livenotifier выглядит так: < /p>
public class LiveNotifier : ILiveNotifier
{
public ConcurrentDictionary Waiters { get; set; } = [];

public void NotifyNewItemAvailable(Guid statusId, string value)
{
ConcurrentDictionary old = [];
foreach (var waiter in Waiters)
if (waiter.Key.Item2 == statusId)
waiter.Value.TrySetResult(value);
else
old.TryAdd(waiter.Key, waiter.Value);

Waiters = old;
}

public void EndLive(Guid statusId)
{
ConcurrentDictionary old = [];
foreach (var waiter in Waiters)
if (waiter.Key.Item2 == statusId) {
waiter.Value.TrySetCanceled();
}
else
old.TryAdd(waiter.Key, waiter.Value);

Waiters = old;
}

public Task WaitForNewItem(Guid statusId, CancellationToken token)
{
TaskCompletionSource newTask = new(TaskCreationOptions.RunContinuationsAsynchronously);
Waiters.AddOrUpdate((token, statusId), newTask, (_, _) => newTask);

token.Register(() =>
{
newTask.TrySetCanceled(token);
Waiters.TryRemove((token, statusId), out newTask!);
});

return newTask.Task;
}
}
< /code>
Это работает нормально при уведомлении новых элементов. Завершение потока, однако, не работает, когда есть более 1 клиента, в этом случае ни один клиент не получает уведомление. Когда есть только 1 клиент, этот клиент закрывает соединение. Есть ли лучшее решение или какие -либо отзывы об этой реализации? Итак, несколько клиентов получают сообщение.public class LiveNotifier : ILiveNotifier
{
private readonly ConcurrentDictionary _subscriptions = new();

public Task Subscribe(Guid liveStreamId, IServerStreamWriter responseStream, CancellationToken token)
{
TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
var newVal = (responseStream, tcs);

_subscriptions.AddOrUpdate(liveStreamId, [newVal], (key, oldValue) => oldValue.Add(newVal));

token.Register(
() =>
{
tcs.TrySetCanceled();
_subscriptions.AddOrUpdate(liveStreamId, [], (id, oldValues) => oldValues.Remove(newVal));
},
false
);
return tcs.Task;
}

public void Broadcast(Guid liveStatusId, string message)
{
_subscriptions.TryGetValue(
liveStatusId,
out ImmutableList? values
);
if (values != null)
{
foreach (var req in values)
req.Item1.WriteAsync(message);
}
}

public void EndLive(Guid liveStatusId)
{
_subscriptions.TryGetValue(
liveStatusId,
out ImmutableList? values
);
if (values != null)
{
foreach (var req in values)
req.Item2.TrySetResult();
_subscriptions.TryRemove(liveStatusId, out _);
}
}
}
< /code>
и служба: < /p>
Guid id = GuidMapper.ToGuid(request.Id);
var cancellationToken = context.CancellationToken;

await _liveNotifier.Subscribe(id, responseStream, cancellationToken);


Подробнее здесь: https://stackoverflow.com/questions/795 ... using-grpc
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»