У меня есть вопросы/проблемы с пониманием/использованием RabbitMQ с MassTransit в приложении C#.
Нам необходимо использовать наши данные по порядку, и мы сталкиваемся с некоторыми проблемами оптимизации.
У нас есть издатель, который получает данные в базу данных (15 000 строк) каждые 1,5 секунды.
Этот издатель отправляет сообщения, используя функцию, которая группирует данные в сообщениях и отправляет их в 4 очереди. Мы группируем данные в сообщениях по правилу: (data.Id % NumberOfQueues) + 1. В настоящее время наше число очередей равно 4.
Наш потребитель в своем файле program.cs объявляет и настраивает 4 очереди на прямой обмен типами. Чтобы оптимизировать текучесть между брокером и нашим потребителем, мы хотим установить prefetchCount > 1. Чтобы брокер доставлял несколько сообщений одновременно, и мы установили concurencyLimit равным 1, чтобы наши четыре очереди потребляли по одному сообщению за раз (нет параллелизм).
Я пытаюсь установить prefetchCount равным 1.000. В интерфейсе RMQ я вижу, что у меня есть 1000 неподтвержденных сообщений:
неподтвержденные сообщения
Я помещаю несколько журналов в своего потребителя, чтобы увидеть, есть ли сообщения потребляются по порядку:
журналы потребителей упорядочены
На экране вы можете видеть, что мы находимся у сообщения с идентификатором 951, и до сих пор все сообщения находятся в правильный порядок.
Но иногда у нас бывает такое:
потребитель: некоторые исключения
Как вы можете видеть на втором экране из журналов моего потребителя, сообщение с идентификатором 1925 было опубликовано в: 40:3:523, и, несмотря на это, оно было использовано после сообщения с идентификатором 1926, которое было опубликовано после него.
Так что в целом оно работает хорошо (95% хорошо заказал) (оценка) и 5% ошибок (= не заказано).
Но почему? Я не хочу, чтобы эти 5% были неправильными. Как я могу с этим справиться?
Я вижу, что общественный транспорт использует планировщик задач, чтобы сообщить потребителю, какое сообщение использовать из пакета сообщений в оперативной памяти. Если я использую natif RMQ вместо общественного транспорта, будет ли эта проблема решена?
Более того, когда я останавливаю свой процесс (микросервис, у которого есть задача потребления), неподтвержденные сообщения, которые были в оперативной памяти моего потребителя, поскольку pretechCount (1.000), похоже, возвращается в очередь, но в случайном порядке. Посмотрите, что я получаю, когда останавливаю/запускаю своего работника:
1/ Сообщения упорядочены с оценкой 5% неупорядоченных. Я приостанавливаю работника с 315 сообщениями в очереди и 1000 неподтвержденными сообщениями:
состояние очереди перед остановкой
2/ Я останавливаю свой процесс, поэтому неподтвержденные сообщения возвращается в очередь:
состояние очереди после остановки
3/ Я запускаю своего издателя/потребителя:
логи потребителей после перезагрузка
У меня теперь 95% не заказано. Что произошло?
При значении prefetchCount, равном 1,000, и значении concurencyLimit, равном 1, я ожидал, что мой брокер доставит 1,000 сразу в оперативную память моего потребителя, а затем я ожидаю, что мой потребитель потребляйте сообщения одно за другим из-за concurencyLimit в правильном порядке.
Как я могу этого добиться?
Если я создам пакет 1.000 (эквивалент prefetchCount), будут ли мои сообщения использоваться в правильном порядке?
Подробнее здесь: https://stackoverflow.com/questions/792 ... tchcount-1
Как мои сообщения можно использовать по порядку, используя prefetchCount > 1? ⇐ C#
Место общения программистов C#
1732028758
Anonymous
У меня есть вопросы/проблемы с пониманием/использованием RabbitMQ с MassTransit в приложении C#.
Нам необходимо использовать наши данные по порядку, и мы сталкиваемся с некоторыми проблемами оптимизации.
У нас есть издатель, который получает данные в базу данных (15 000 строк) каждые 1,5 секунды.
Этот издатель отправляет сообщения, используя функцию, которая группирует данные в сообщениях и отправляет их в 4 очереди. Мы группируем данные в сообщениях по правилу: (data.Id % NumberOfQueues) + 1. В настоящее время наше число очередей равно 4.
Наш потребитель в своем файле program.cs объявляет и настраивает 4 очереди на прямой обмен типами. Чтобы оптимизировать текучесть между брокером и нашим потребителем, мы хотим установить prefetchCount > 1. Чтобы брокер доставлял несколько сообщений одновременно, и мы установили concurencyLimit равным 1, чтобы наши четыре очереди потребляли по одному сообщению за раз (нет параллелизм).
Я пытаюсь установить prefetchCount равным 1.000. В интерфейсе RMQ я вижу, что у меня есть 1000 неподтвержденных сообщений:
неподтвержденные сообщения
Я помещаю несколько журналов в своего потребителя, чтобы увидеть, есть ли сообщения потребляются по порядку:
журналы потребителей упорядочены
На экране вы можете видеть, что мы находимся у сообщения с идентификатором 951, и до сих пор все сообщения находятся в правильный порядок.
Но иногда у нас бывает такое:
потребитель: некоторые исключения
Как вы можете видеть на втором экране из журналов моего потребителя, сообщение с идентификатором 1925 было опубликовано в: 40:3:523, и, несмотря на это, оно было использовано после сообщения с идентификатором 1926, которое было опубликовано после него.
Так что в целом оно работает хорошо (95% хорошо заказал) (оценка) и 5% ошибок (= не заказано).
Но почему? Я не хочу, чтобы эти 5% были неправильными. Как я могу с этим справиться?
Я вижу, что общественный транспорт использует планировщик задач, чтобы сообщить потребителю, какое сообщение использовать из пакета сообщений в оперативной памяти. Если я использую natif RMQ вместо общественного транспорта, будет ли эта проблема решена?
Более того, когда я останавливаю свой процесс (микросервис, у которого есть задача потребления), неподтвержденные сообщения, которые были в оперативной памяти моего потребителя, поскольку pretechCount (1.000), похоже, возвращается в очередь, но в случайном порядке. Посмотрите, что я получаю, когда останавливаю/запускаю своего работника:
1/ Сообщения упорядочены с оценкой 5% неупорядоченных. Я приостанавливаю работника с 315 сообщениями в очереди и 1000 неподтвержденными сообщениями:
состояние очереди перед остановкой
2/ Я останавливаю свой процесс, поэтому неподтвержденные сообщения возвращается в очередь:
состояние очереди после остановки
3/ Я запускаю своего издателя/потребителя:
логи потребителей после перезагрузка
У меня теперь 95% не заказано. Что произошло?
При значении prefetchCount, равном 1,000, и значении concurencyLimit, равном 1, я ожидал, что мой брокер доставит 1,000 сразу в оперативную память моего потребителя, а затем я ожидаю, что мой потребитель потребляйте сообщения одно за другим из-за concurencyLimit в правильном порядке.
Как я могу этого добиться?
Если я создам пакет 1.000 (эквивалент prefetchCount), будут ли мои сообщения использоваться в правильном порядке?
Подробнее здесь: [url]https://stackoverflow.com/questions/79203917/how-can-my-messages-be-consumed-in-order-using-a-prefetchcount-1[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия