В настоящее время у меня есть услуга, которая читает события из очереди на SQS и обрабатывает найденные сообщения. У меня есть параллелизм с двумя темами, которые читают одновременно в очереди для обработки сообщений. Итерация составляет одну секунду, чтобы проверить, есть ли в очереди сообщения.
Это значительно увеличило стоимость AWS. В поисках решения, теперь я хочу иметь динамическую задержку или что -то подобное, когда служба читает сообщения из очереди. Это те шаги, которые я хочу выполнить в своем методе < /p>
, если в очереди нет сообщений, увеличивают задержку, чтобы снова прочитать очередь за одну секунду. Например, сервис считывает очередь, и нет сообщений, задержка будет увеличена с 1 секунды до 2 секунд. Он будет работать так же, пока задержка не стала 60 секунд. В случае обнаружения сообщения задержка будет сброшена до 1 секунды, и проверка начнется снова с 1 секунды до 60. очередь и внутри этой проверки увеличивают задержку для чтения очереди. < /p>
Это метод, который я создал для чтения сообщений из очереди.private void createPollingStream() {
Multi.createBy().repeating()
.supplier(
() -> sqsMessagePoller.pollUDMUsages() //read messages from the queue. In case of not messages I want to increase the delay
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.onItem().transformToUniAndMerge(this::processMessagesQueue) //If the poll has messages we call to another method to process them.
.onFailure().invoke(failure ->
Log.errorf("Error processing the messages: %s", failure))
.subscribe()
.with(succ -> Log.info("Current iteration of processing message complete"),
failure -> Log.error("Failed to process message in flow", failure)
)
).withDelay(Duration.ofMillis(delay))//Initial delay of read messages from the queue. I guess it should be dynamic here.
.indefinitely()
.subscribe().with(x -> Log.info("Current iteration of processing message complete"),
failure -> Log.error("Failed to poll messages in flow", failure));
}
Подробнее здесь: https://stackoverflow.com/questions/794 ... ny-in-java