Как мои сообщения можно использовать по порядку, используя prefetchCount > 1?C#

Место общения программистов C#
Ответить
Anonymous
 Как мои сообщения можно использовать по порядку, используя prefetchCount > 1?

Сообщение Anonymous »

У меня есть вопросы/проблемы с пониманием/использованием RabbitMQ с MassTransit в приложении C#.
Нам необходимо использовать наши данные по порядку, и мы сталкиваемся с некоторыми проблемами оптимизации.
У нас есть издатель, который получает данные в базу данных (15 000 строк) каждые 1,5 секунды.
Этот издатель отправляет сообщения, используя функцию, которая группирует данные в сообщениях и отправляет их в 4 очереди. Мы группируем данные в сообщениях по правилу: (data.Id % NumberOfQueues) + 1. В настоящее время наше число очередей равно 4.
Наш потребитель в своем файле program.cs объявляет и настраивает 4 очереди на прямой обмен типами. Чтобы оптимизировать текучесть между брокером и нашим потребителем, мы хотим установить prefetchCount > 1. Чтобы брокер доставлял несколько сообщений одновременно, и мы установили concurencyLimit равным 1, чтобы наши четыре очереди потребляли по одному сообщению за раз (нет параллелизм).
Я пытаюсь установить prefetchCount равным 1.000. В интерфейсе RMQ я вижу, что у меня есть 1000 неподтвержденных сообщений:
неподтвержденные сообщения
Я помещаю несколько журналов в своего потребителя, чтобы увидеть, есть ли сообщения потребляются по порядку:
журналы потребителей упорядочены
На экране вы можете видеть, что мы находимся у сообщения с идентификатором 951, и до сих пор все сообщения находятся в правильный порядок.
Но иногда у нас бывает такое:
потребитель: некоторые исключения
Как вы можете видеть на втором экране из журналов моего потребителя, сообщение с идентификатором 1925 было опубликовано в: 40:3:523, и, несмотря на это, оно было использовано после сообщения с идентификатором 1926, которое было опубликовано после него.
Так что в целом оно работает хорошо (95% хорошо заказал) (оценка) и 5% ошибок (= не заказано).
Но почему? Я не хочу, чтобы эти 5% были неправильными. Как я могу с этим справиться?
Я вижу, что общественный транспорт использует планировщик задач, чтобы сообщить потребителю, какое сообщение использовать из пакета сообщений в оперативной памяти. Если я использую natif RMQ вместо общественного транспорта, будет ли эта проблема решена?
Более того, когда я останавливаю свой процесс (микросервис, у которого есть задача потребления), неподтвержденные сообщения, которые были в оперативной памяти моего потребителя, поскольку pretechCount (1.000), похоже, возвращается в очередь, но в случайном порядке. Посмотрите, что я получаю, когда останавливаю/запускаю своего работника:
1/ Сообщения упорядочены с оценкой 5% неупорядоченных. Я приостанавливаю работника с 315 сообщениями в очереди и 1000 неподтвержденными сообщениями:
состояние очереди перед остановкой
2/ Я останавливаю свой процесс, поэтому неподтвержденные сообщения возвращается в очередь:
состояние очереди после остановки
3/ Я запускаю своего издателя/потребителя:
логи потребителей после перезагрузка
У меня теперь 95% не заказано. Что произошло?
При значении prefetchCount, равном 1,000, и значении concurencyLimit, равном 1, я ожидал, что мой брокер доставит 1,000 сразу в оперативную память моего потребителя, а затем я ожидаю, что мой потребитель потребляйте сообщения одно за другим из-за concurencyLimit в правильном порядке.
Как я могу этого добиться?
Если я создам пакет 1.000 (эквивалент prefetchCount), будут ли мои сообщения использоваться в правильном порядке?

Подробнее здесь: https://stackoverflow.com/questions/792 ... tchcount-1
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»