У меня есть 2 приложения: Producer и Consumer. Оба в Springboot 3.
Оба общаются через Kafka. Оба имеют зависимость от микрометра.
Если я попаду в любую конечную точку покоя в Consumer через любой клиент, я смогу получить трассировку и диапазон в журналах, но не в случае потребления сообщений Kafka.
PRODUCER -->
От производителя я могу отправить трассировку и spaid через kafka
(на фабрике производителя у меня есть этот флаг - kafkaTemplate.setObservationEnabled(true);).
spring-boot-starter-actuator
org.springframework.boot
micrometer-tracing-bridge-brave
io.micrometer
KAFKA SERVER -->
используя ./kafka-console-consumer.sh, я могу видеть отправляемый заголовок к теме -

ПОТРЕБИТЕЛЬ -->
В журналах потребителей я не могу получить трассировку и диапазон в журналах. Однако в потребительском заголовке у меня есть трассировка и диапазон: Для справки см. простые журналы
Это прослушиватель -
@KafkaListener(containerFactory = "tracingKafkaConsumerFactory", topics = "tracingTopic3")
public void kafkaTracingListener(ConsumerRecords consumerRecords){
consumerRecords.forEach(consumerRecord ->{
try {
if(consumerRecord.headers()!=null) {
consumerRecord.headers().forEach(header -> {
LOGGER.info("Header key {}, Header value {}", header.key(), new String(header.value()));
});
}
TraceDTO traceDto = objectMapper.readValue((String) consumerRecord.value(), TraceDTO.class);
LOGGER.info("consumer record {}", consumerRecord);
} catch (Exception e) {
LOGGER.error("Exception :", e);
}
});
LOGGER.info("Is trace printed here? ");
}
2024-08-02T11:42:07.483+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : Header key traceparent, Header value 00-66ac78b618581c0549d46cbed2dfa9dc-9675e35b152ee946-01
2024-08-02T11:42:07.484+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : Header key __TypeId__, Header value com.srs.chargesessionmanagement.tracetest.TraceDTO
2024-08-02T11:42:07.537+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : data TraceDTO(id=11, data=eleven)
2024-08-02T11:42:07.571+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : consumer record ConsumerRecord(topic = tracingTopic3, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1722579126973, serialized key size = 3, serialized value size = 25, headers = RecordHeaders(headers = [RecordHeader(key = traceparent, value = [48, 48, 45, 54, 54, 97, 99, 55, 56, 98, 54, 49, 56, 53, 56, 49, 99, 48, 53, 52, 57, 100, 52, 54, 99, 98, 101, 100, 50, 100, 102, 97, 57, 100, 99, 45, 57, 54, 55, 53, 101, 51, 53, 98, 49, 53, 50, 101, 101, 57, 52, 54, 45, 48, 49]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 115, 114, 115, 46, 99, 104, 97, 114, 103, 101, 115, 101, 115, 115, 105, 111, 110, 109, 97, 110, 97, 103, 101, 109, 101, 110, 116, 46, 116, 114, 97, 99, 101, 116, 101, 115, 116, 46, 84, 114, 97, 99, 101, 68, 84, 79])], isReadOnly = false), key = key, value = {"id":11,"data":"eleven"})
2024-08-02T11:42:07.571+05:30 INFO 30481 --- [trace-demo] [ntainer#0-1-C-1] [ ] c.e.trace.demo.TracingDemoKafkaConsumer : Is trace printed here?
Потребительская конфигурация –
@EnableKafka
@Configuration
public class TraceDemoKafkaConsumerConfig {
@Bean
public Map tracingKafkaConsumerConfigs() {
Map props = new HashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9094");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "tracing-group-3");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public DefaultKafkaConsumerFactory tracingKafkaConsumerDefaultFactory() {
return new DefaultKafkaConsumerFactory(tracingKafkaConsumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory tracingKafkaConsumerFactory(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(tracingKafkaConsumerDefaultFactory());
factory.setConcurrency(2);
factory.setBatchListener(true);
factory.getContainerProperties().setObservationEnabled(true);
factory.getContainerProperties().setSyncCommits(true);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.valueOf("BATCH"));
return factory;
}
}
application.yaml -
management:
tracing:
sampling:
probability: 1
Подробнее здесь: https://stackoverflow.com/questions/788 ... -from-topi