У меня проблема: когда я сразу же отправляю новое сообщение своему потребителю в Kotlin после обработки последнего сообщения, он не получит и не обработает второе сообщение, но если я подожду некоторое время (например, несколько минут, не так ли? не утруждаюсь узнать точное время) и снова отправляю новое сообщение, мой потребитель получает его и обрабатывает. Я на 100% уверен, что новое сообщение будет отправлено, потому что я вижу, что график сообщений на панели управления Rabbit MQ поднимается вверх. Я хочу, чтобы мой потребитель RabbitMQ был достаточно быстрым, чтобы обрабатывать несколько сообщений, но не одновременно, одно за другим.
Вот мой RabbitMQConfig
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class RabbitMQConfig {
@Value("\${spring.rabbitmq.host}")
lateinit var rabbitmqHost: String
@Value("\${rabbitmq.queue}")
lateinit var queueName: String
@Bean
fun queue(): Queue {
return Queue(queueName)
}
@Bean
fun jsonMessageConverter(): MessageConverter {
return Jackson2JsonMessageConverter()
}
@Bean
fun listenerAdapter(receiver: RabbitMQReceiver, converter: MessageConverter): MessageListenerAdapter {
val adapter = MessageListenerAdapter(receiver, "receiveMessage")
adapter.setMessageConverter(converter)
return adapter
}
@Bean
fun container(
connectionFactory: ConnectionFactory,
listenerAdapter: MessageListenerAdapter
): SimpleMessageListenerContainer {
return SimpleMessageListenerContainer().apply {
this.connectionFactory = connectionFactory
this.setQueueNames(queueName)
this.setMessageListener(listenerAdapter)
this.setDefaultRequeueRejected(false)
}
}
}
А вот мой класс RabbitMQReceiver (мой потребитель)
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch
object MessageConstant {
const val DATA_MESSAGE_KEY = "data"
const val TOPIC_MESSAGE_KEY = "topic"
const val TYPE_MESSAGE_KEY = "type"
}
@Component
class RabbitMQReceiver(
private val productStrategy: ProductStrategy,
private val supplierStrategy: SupplierStrategy,
private val supplierStockScheduleStrategy: SupplierStockScheduleStrategy,
private val productCategoryStrategy: ProductCategoryStrategy,
private val deliveryDriverStrategy: DeliveryDriverStrategy,
private val vehicleStrategy: VehicleStrategy,
) {
private val latch = CountDownLatch(1)
fun receiveMessage(message: Map) {
val topic = message[MessageConstant.TOPIC_MESSAGE_KEY] as? String
?: throw IllegalArgumentException("Topic key is missing in the message")
val messageType = message[MessageConstant.TYPE_MESSAGE_KEY] as? String
?: throw IllegalArgumentException("Message type key is missing in the message")
@Suppress("UNCHECKED_CAST")
val data = message[MessageConstant.DATA_MESSAGE_KEY] as? Map
data?.let {
val topicStrategy = getTopicStrategy(topic)
when (messageType) {
MessageType.Upsert.name -> topicStrategy.upsert(it)
MessageType.Unpublish.name -> topicStrategy.delete(it)
MessageType.Delete.name -> topicStrategy.delete(it, softDelete = true)
else -> throw IllegalArgumentException("Unsupported action: $messageType")
}
} ?: throw IllegalArgumentException("Data key is missing or invalid in the message")
latch.countDown()
}
private fun getTopicStrategy(topic: String): TopicStrategy {
return when (topic) {
Topic.Product.name -> productStrategy
Topic.Supplier.name -> supplierStrategy
Topic.SupplierStockSchedule.name -> supplierStockScheduleStrategy
Topic.ProductCategory.name -> productCategoryStrategy
Topic.DeliveryDriver.name -> deliveryDriverStrategy
Topic.Vehicle.name -> vehicleStrategy
else -> throw IllegalArgumentException("Unsupported topic: $topic")
}
}
fun getLatch(): CountDownLatch {
return latch
}
}
Подробнее здесь: https://stackoverflow.com/questions/792 ... it-too-fas
Потребитель RabbitMQ не получит последующее сообщение, если я попытаюсь отправить его слишком быстро после получения пер ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение