Код: Выделить всё
package org.example;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.api.OpenTelemetry;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.Instant;
import java.util.Map;
import java.util.HashMap;
@Service
public class ConsumerService {
@Autowired
private Tracer tracer;
@Autowired
private OpenTelemetry openTelemetry;
@KafkaListener(topics = "sample-topic", groupId = "group-2")
public void consumeLinksWithRecord(ConsumerRecord record) {
String word = record.value();
String traceparent = record.headers().lastHeader("traceparent") != null
? new String(record.headers().lastHeader("traceparent").value())
: "No traceparentid header found";
// Create a span for Kafka message processing with ConsumerRecord
Span span = tracer.spanBuilder("inside-kafka")
.setAttribute("service.name", "insidekafkaservice")
.setAttribute("kafka.topic", record.topic())
.setAttribute("kafka.partition", record.partition())
.setAttribute("kafka.offset", record.offset())
.setAttribute("kafka.key", record.key() != null ? record.key() : "null")
.setAttribute("kafka.message", word)
.setAttribute("kafka.traceparent", traceparent)
.setStartTimestamp(Instant.ofEpochMilli(record.timestamp()))
.startSpan();
try (var scope = span.makeCurrent()) {
extractTraceContext(traceparent, span);
System.out.println("Received Message: " + word + " from partition: " + record.partition());
System.out.println("Trace Parent ID: " + traceparent);
// Add custom attributes to the span
span.setAttribute("message.length", word.length());
span.setAttribute("header.count", record.headers().spliterator().getExactSizeIfKnown());
try {
Thread.sleep(80);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
span.setStatus(StatusCode.ERROR, "Processing interrupted");
return;
}
span.setStatus(StatusCode.OK);
} catch (Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e);
throw e;
} finally {
span.end();
}
}
private void extractTraceContext(String traceparentId, Span span) {
try {
// Create a carrier map with the traceparentid
Map carrier = new HashMap();
carrier.put("traceparent", traceparentId);
// Extract the context using the W3C trace context propagator
TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
Context extractedContext = propagator.extract(Context.current(), carrier, new TextMapGetter() {
@Override
public String get(Map carrier, String key) {
return carrier.get(key);
}
@Override
public Iterable keys(Map carrier) {
return carrier.keySet();
}
});
// Link the extracted context to the current span
if (extractedContext != Context.current()) {
span.addLink(Span.fromContext(extractedContext).getSpanContext());
}
} catch (Exception e) {
// Log the error but don't fail the processing
System.err.println("Failed to extract trace context: " + e.getMessage());
}
}
}
[*]the producer starts their logic at 00:00
[*]the producer finishes their logic and puts the message inside Kafka at 00:01
[*]the consumer picked up the message at 00:04
the Потребитель закончил свою бизнес -логику в сообщении по телефону 00:05 < /li>
< /ul>
это означало бы, что сообщение осталось внутри Кафки от 00:01 до 00:04. src = "https://i.sstatic.net/6hldxjmb.png"/>
, но вместо этого я сейчас вижу это:
prcy. /> Как создать дополнительную трассу, представляющую время, проведенное в Кафке? < /p>
Подробнее здесь: https://stackoverflow.com/questions/796 ... -spring-ka