Пользовательский плагин pulsar Springboot 3.3.1 @PulsarListener с вопросом о политике KeySharedPolicyStickyJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Пользовательский плагин pulsar Springboot 3.3.1 @PulsarListener с вопросом о политике KeySharedPolicySticky

Сообщение Anonymous »

Я пытаюсь использовать компонент @PulsarListener Springboot 3.3.1 для использования SubscriptionType.Key_Shared для получения данных потребителя с двумя разными потребительскими службами, необходимо, чтобы потребитель имел другой ключ, см. код
produce часть:

Код: Выделить всё

@Service
@Slf4j
публичный класс PulsarDemoProductionImpl реализует PulsarDemoProduction {

Код: Выделить всё

@Autowired
private PulsarTemplate pulsarTemplate;
// @Autowired
// частный PulsarTemplate pulsarTemplateList;

Код: Выделить всё

private static final String TOPIC = "real-time-data";

@Override
public void sendPulsarData(int triggerTimes) {
//组装数据
for(int i = 0; i < triggerTimes; i++) {
RealTimeDTO realTimeDTO = loadData();
//转成json字符串
String realTimeDataJson = JSON.toJSONString(realTimeDTO);
long timeStamp = System.currentTimeMillis();
log.info("Sending timeStamp = {} real time data via Pulsar: {}",  timeStamp, realTimeDataJson);
if (i%2 == 0) {
//发送数据
pulsarTemplate.newMessage(realTimeDataJson)
.withTopic(TOPIC)
.withMessageCustomizer((message) -> message.key("data"))
.withSchema(Schema.JSON(String.class))
.sendAsync();
} else {
//发送数据
pulsarTemplate.newMessage(realTimeDataJson)
.withTopic(TOPIC)
.withMessageCustomizer((message) -> message.key("state"))
.withSchema(Schema.JSON(String.class))
.sendAsync();
}

//pulsarTemplate.sendAsync(TOPIC, realTimeDataJson, Schema.JSON(String.class));

}

log.info("Sending real time data via Pulsar");
}

private RealTimeDTO loadData() {
//生成时间的时间戳
Instant instant = Instant.now();
long mills = instant.toEpochMilli();

DecimalFormat decimalFormat = new DecimalFormat("0.00");

return RealTimeDTO.builder()
.uuid(UUID.fastUUID().toString())
.data(RealTimeData.builder().speed(decimalFormat.format(Math.random() * 10)).build())
.timestamp(String.valueOf(mills))
.build();
}
потребительская часть:

Код: Выделить всё

@Service
@Slf4j
публичный класс PulsarDemoConsumeImpl реализует PulsarDemoConsume {

Код: Выделить всё

private static final String TOPIC = "real-time-data";

@Override
@PulsarListener(topics = TOPIC,
subscriptionName = "real-time-data-consumer",
subscriptionType = SubscriptionType.Key_Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.MANUAL,
concurrency = "5",
consumerCustomizer = @KeySharedPolicy(
KeySharedPolicy.KeySharedPolicySticky.class,
ranges = {"0x00000000-0x7FFFFFFF", "0x80000000-0xFFFFFFFF"}),
ackTimeoutRedeliveryBackoff = "redeliveryBackoff")
public void handleRedeliveryBackOff(Message message, Acknowledgement acknowledgement) {
log.info("real-time-data-consumer key = {} from Pulsar: {}", message.getKey(), message.getValue());
acknowledgement.acknowledge();
}
}
однако аннотация @PulsarListener в настоящее время не задействована в свойствах KeySharedPolicy.KeySharedPolicySticky
как выполнить это требование , есть идеи
Я пытаюсь использовать ИИ, подскажите, но не получается
введите описание изображения здесь

Подробнее здесь: https://stackoverflow.com/questions/790 ... cysticky-p
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»