Обмен заголовками, сообщения не маршрутизируются по заголовкам x-match = всеJAVA

Программисты JAVA общаются здесь
Anonymous
Обмен заголовками, сообщения не маршрутизируются по заголовкам x-match = все

Сообщение Anonymous »

Я пытаюсь настроить обмен заголовками с очередью, в которой сообщения маршрутизируются на основе заголовка получателя.
Обмен имеет тип заголовков.
На данный момент класс может подключаться к обмену и передавать сообщения в очередь.
Он также может подписаться на очередь и получать сообщения. Он также закрывает соединение всякий раз, когда соединение подписчика прерывается.
Текущая проблема заключается в том, что сообщение не маршрутизируется по значению заголовка получателя.
Для следующего класса:

Код: Выделить всё

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class MyQueue {

private final ConnectionFactory connectionFactory;
private Channel channel;

public MyQueue(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {

Map headers = new HashMap();
headers.put(RabbitMqConfig.MATCH_HEADER, message.getRecipient());
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
.headers(headers).build();

log.info("Sending message to {}", headers);

channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, "", props,
message.getMessage().getBytes(StandardCharsets.UTF_8));

log.info("RabbitMQ sent message {} to {}", message.getMessage(), message.getRecipient());
return "ok";
} catch (TimeoutException e) {
log.error("Rabbit mq timeout", e);
} catch (IOException e) {
log.error("Rabbit mq io error", e);
}
throw new UndeliverableMessageException();
}

public Flux listenMessages(String recipient) throws IOException, TimeoutException {
Connection connection = connectionFactory.newConnection();
this.channel = connection.createChannel();

// The map for the headers.
Map headers = new HashMap();
headers.put("x-match", "all");
headers.put(RabbitMqConfig.MATCH_HEADER, recipient);

final String[] consumerTag = new String[1];
Flux as = Flux.create(sink -> new MessageListener() {
{
try {
log.info("Binding to {}", headers);
channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, "",
headers);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
log.info("Subscriber {} received a message {} with headers {}", recipient, delivery.getEnvelope(),
delivery.getProperties().getHeaders());

sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};

consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,
true, deliverCallback, tag -> {
sink.complete();
});

} catch (IOException e) {
log.error("RabbitMQ IOException ", e);
}
}

});

return as.doOnCancel(() ->  {
try {
if (consumerTag[0] == null) {
log.error("RabbitMQ uncloseable subscription, consumerTag is null!");
channel.close();
return;
}
channel.basicCancel(consumerTag[0]);
channel.close();
log.info("RabbitMQ CANCEL subscription for recipient {}", recipient);
} catch (IOException | TimeoutException e) {
log.error("RabbitMQ channel close error", e);
}
});
}

interface MessageListener {

}
}
Обмен объявляется следующим вызовом

Код: Выделить всё

channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
Привязка журнала получателей:

Код: Выделить всё

Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}
Привязка 3 получателей с помощью x-match:
Изображение

Однако сообщения не сопоставляются, как если бы они были маршрутизированы случайным образом

Код: Выделить всё

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Почему x-match: all не соответствует?

Вернуться в «JAVA»