У меня проблема: когда я сразу же отправляю новое сообщение своему потребителю в Kotlin после обработки последнего сообщения, он не получит и не обработает второе сообщение, но если я подожду некоторое время (например, несколько минут, не так ли? не утруждаюсь узнать точное время) и снова отправляю новое сообщение, мой потребитель получает его и обрабатывает. Я на 100% уверен, что новое сообщение будет отправлено, потому что я вижу, что график сообщений на панели управления Rabbit MQ поднимается вверх. Я хочу, чтобы мой потребитель RabbitMQ был достаточно быстрым, чтобы обрабатывать несколько сообщений, но не одновременно, одно за другим.
Вот мой RabbitMQConfig
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class RabbitMQConfig {
@Value("\${spring.rabbitmq.host}")
lateinit var rabbitmqHost: String
@Value("\${rabbitmq.queue}")
lateinit var queueName: String
@Bean
fun queue(): Queue {
return Queue(queueName)
}
@Bean
fun jsonMessageConverter(): MessageConverter {
return Jackson2JsonMessageConverter()
}
@Bean
fun listenerAdapter(receiver: RabbitMQReceiver, converter: MessageConverter): MessageListenerAdapter {
val adapter = MessageListenerAdapter(receiver, "receiveMessage")
adapter.setMessageConverter(converter)
return adapter
}
@Bean
fun container(
connectionFactory: ConnectionFactory,
listenerAdapter: MessageListenerAdapter
): SimpleMessageListenerContainer {
return SimpleMessageListenerContainer().apply {
this.connectionFactory = connectionFactory
this.setQueueNames(queueName)
this.setMessageListener(listenerAdapter)
this.setDefaultRequeueRejected(false)
}
}
}
А вот мой класс RabbitMQReceiver (мой потребитель)
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch
object MessageConstant {
const val DATA_MESSAGE_KEY = "data"
const val TOPIC_MESSAGE_KEY = "topic"
const val TYPE_MESSAGE_KEY = "type"
}
@Component
class RabbitMQReceiver(
private val productStrategy: ProductStrategy,
private val supplierStrategy: SupplierStrategy,
private val supplierStockScheduleStrategy: SupplierStockScheduleStrategy,
private val productCategoryStrategy: ProductCategoryStrategy,
private val deliveryDriverStrategy: DeliveryDriverStrategy,
private val vehicleStrategy: VehicleStrategy,
) {
private val latch = CountDownLatch(1)
fun receiveMessage(message: Map) {
val topic = message[MessageConstant.TOPIC_MESSAGE_KEY] as? String
?: throw IllegalArgumentException("Topic key is missing in the message")
val messageType = message[MessageConstant.TYPE_MESSAGE_KEY] as? String
?: throw IllegalArgumentException("Message type key is missing in the message")
@Suppress("UNCHECKED_CAST")
val data = message[MessageConstant.DATA_MESSAGE_KEY] as? Map
data?.let {
val topicStrategy = getTopicStrategy(topic)
when (messageType) {
MessageType.Upsert.name -> topicStrategy.upsert(it)
MessageType.Unpublish.name -> topicStrategy.delete(it)
MessageType.Delete.name -> topicStrategy.delete(it, softDelete = true)
else -> throw IllegalArgumentException("Unsupported action: $messageType")
}
} ?: throw IllegalArgumentException("Data key is missing or invalid in the message")
latch.countDown()
}
private fun getTopicStrategy(topic: String): TopicStrategy {
return when (topic) {
Topic.Product.name -> productStrategy
Topic.Supplier.name -> supplierStrategy
Topic.SupplierStockSchedule.name -> supplierStockScheduleStrategy
Topic.ProductCategory.name -> productCategoryStrategy
Topic.DeliveryDriver.name -> deliveryDriverStrategy
Topic.Vehicle.name -> vehicleStrategy
else -> throw IllegalArgumentException("Unsupported topic: $topic")
}
}
fun getLatch(): CountDownLatch {
return latch
}
}
Подробнее здесь: https://stackoverflow.com/questions/792 ... it-too-fas
Потребитель RabbitMQ не получит последующее сообщение, если я попытаюсь отправить его слишком быстро после получения пер ⇐ JAVA
Программисты JAVA общаются здесь
1734049272
Anonymous
У меня проблема: когда я сразу же отправляю новое сообщение своему потребителю в Kotlin после обработки последнего сообщения, он не получит и не обработает второе сообщение, но если я подожду некоторое время (например, несколько минут, не так ли? не утруждаюсь узнать точное время) и снова отправляю новое сообщение, мой потребитель получает его и обрабатывает. Я на 100% уверен, что новое сообщение будет отправлено, потому что я вижу, что график сообщений на панели управления Rabbit MQ поднимается вверх. Я хочу, чтобы мой потребитель RabbitMQ был достаточно быстрым, чтобы обрабатывать несколько сообщений, но не одновременно, одно за другим.
Вот мой RabbitMQConfig
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class RabbitMQConfig {
@Value("\${spring.rabbitmq.host}")
lateinit var rabbitmqHost: String
@Value("\${rabbitmq.queue}")
lateinit var queueName: String
@Bean
fun queue(): Queue {
return Queue(queueName)
}
@Bean
fun jsonMessageConverter(): MessageConverter {
return Jackson2JsonMessageConverter()
}
@Bean
fun listenerAdapter(receiver: RabbitMQReceiver, converter: MessageConverter): MessageListenerAdapter {
val adapter = MessageListenerAdapter(receiver, "receiveMessage")
adapter.setMessageConverter(converter)
return adapter
}
@Bean
fun container(
connectionFactory: ConnectionFactory,
listenerAdapter: MessageListenerAdapter
): SimpleMessageListenerContainer {
return SimpleMessageListenerContainer().apply {
this.connectionFactory = connectionFactory
this.setQueueNames(queueName)
this.setMessageListener(listenerAdapter)
this.setDefaultRequeueRejected(false)
}
}
}
А вот мой класс RabbitMQReceiver (мой потребитель)
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch
object MessageConstant {
const val DATA_MESSAGE_KEY = "data"
const val TOPIC_MESSAGE_KEY = "topic"
const val TYPE_MESSAGE_KEY = "type"
}
@Component
class RabbitMQReceiver(
private val productStrategy: ProductStrategy,
private val supplierStrategy: SupplierStrategy,
private val supplierStockScheduleStrategy: SupplierStockScheduleStrategy,
private val productCategoryStrategy: ProductCategoryStrategy,
private val deliveryDriverStrategy: DeliveryDriverStrategy,
private val vehicleStrategy: VehicleStrategy,
) {
private val latch = CountDownLatch(1)
fun receiveMessage(message: Map) {
val topic = message[MessageConstant.TOPIC_MESSAGE_KEY] as? String
?: throw IllegalArgumentException("Topic key is missing in the message")
val messageType = message[MessageConstant.TYPE_MESSAGE_KEY] as? String
?: throw IllegalArgumentException("Message type key is missing in the message")
@Suppress("UNCHECKED_CAST")
val data = message[MessageConstant.DATA_MESSAGE_KEY] as? Map
data?.let {
val topicStrategy = getTopicStrategy(topic)
when (messageType) {
MessageType.Upsert.name -> topicStrategy.upsert(it)
MessageType.Unpublish.name -> topicStrategy.delete(it)
MessageType.Delete.name -> topicStrategy.delete(it, softDelete = true)
else -> throw IllegalArgumentException("Unsupported action: $messageType")
}
} ?: throw IllegalArgumentException("Data key is missing or invalid in the message")
latch.countDown()
}
private fun getTopicStrategy(topic: String): TopicStrategy {
return when (topic) {
Topic.Product.name -> productStrategy
Topic.Supplier.name -> supplierStrategy
Topic.SupplierStockSchedule.name -> supplierStockScheduleStrategy
Topic.ProductCategory.name -> productCategoryStrategy
Topic.DeliveryDriver.name -> deliveryDriverStrategy
Topic.Vehicle.name -> vehicleStrategy
else -> throw IllegalArgumentException("Unsupported topic: $topic")
}
}
fun getLatch(): CountDownLatch {
return latch
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79276874/rabbitmq-consumer-doesnt-receive-subsequent-message-if-i-try-sending-it-too-fas[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия