Потреблять все сообщения в System.Threading.Channels.Channel ⇐ C#
-
Anonymous
Потреблять все сообщения в System.Threading.Channels.Channel
Предположим, у меня есть много производителей, 1 несвязанный потребительский канал, с потребителем:
await foreach (сообщение var в канале.Reader.ReadAllAsync(cts.Token)) { ждать потребления (сообщение); } Проблема в том, что функция consume осуществляет некоторый доступ к вводу-выводу и, возможно, некоторый доступ к сети, поэтому до того, как будет использовано одно сообщение, может быть создано гораздо больше. Но поскольку к ресурсам ввода-вывода нельзя получить одновременный доступ, у меня не может быть много потребителей, и я не могу добавить функцию consume в задачу и забыть о ней.
Функция consume устроена так, что ее можно легко модифицировать для приема нескольких сообщений и обработки их всех в пакете. Итак, мой вопрос заключается в том, есть ли способ заставить потребителя принимать все сообщения в очереди канала всякий раз, когда он пытается получить к нему доступ, примерно так:
while (true) { Message[] messages = awaitchannel.Reader.TakeAll(); ждать ConsumerAll (сообщения); } Редактировать: 1 вариант, который я могу придумать:
List messages = new(); await foreach (сообщение var в канале.Reader.ReadAllAsync(cts.Token)) { ждать потребления (сообщение); Сообщение сообщения; в то время как (channel.Reader.TryRead(выходное сообщение)) сообщения.Добавить(сообщение); если (messages.Count > 0) { ждать ConsumerAll (сообщения); сообщения.Очистить(); } } Но мне кажется, что это лучший способ сделать это.
Предположим, у меня есть много производителей, 1 несвязанный потребительский канал, с потребителем:
await foreach (сообщение var в канале.Reader.ReadAllAsync(cts.Token)) { ждать потребления (сообщение); } Проблема в том, что функция consume осуществляет некоторый доступ к вводу-выводу и, возможно, некоторый доступ к сети, поэтому до того, как будет использовано одно сообщение, может быть создано гораздо больше. Но поскольку к ресурсам ввода-вывода нельзя получить одновременный доступ, у меня не может быть много потребителей, и я не могу добавить функцию consume в задачу и забыть о ней.
Функция consume устроена так, что ее можно легко модифицировать для приема нескольких сообщений и обработки их всех в пакете. Итак, мой вопрос заключается в том, есть ли способ заставить потребителя принимать все сообщения в очереди канала всякий раз, когда он пытается получить к нему доступ, примерно так:
while (true) { Message[] messages = awaitchannel.Reader.TakeAll(); ждать ConsumerAll (сообщения); } Редактировать: 1 вариант, который я могу придумать:
List messages = new(); await foreach (сообщение var в канале.Reader.ReadAllAsync(cts.Token)) { ждать потребления (сообщение); Сообщение сообщения; в то время как (channel.Reader.TryRead(выходное сообщение)) сообщения.Добавить(сообщение); если (messages.Count > 0) { ждать ConsumerAll (сообщения); сообщения.Очистить(); } } Но мне кажется, что это лучший способ сделать это.
Мобильная версия