Создайте дополнительную трассировку, представляющую «время, проведенное внутри Кафки» с использованием Spring Kafka / OpJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Создайте дополнительную трассировку, представляющую «время, проведенное внутри Кафки» с использованием Spring Kafka / Op

Сообщение Anonymous »

Я работаю с потребителем Cafka Spring Boot, который использует OpenElemetry для трассировки.

Код: Выделить всё

package org.example;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.api.OpenTelemetry;
import org.springframework.beans.factory.annotation.Autowired;

import java.time.Instant;
import java.util.Map;
import java.util.HashMap;

@Service
public class ConsumerService {

@Autowired
private Tracer tracer;

@Autowired
private OpenTelemetry openTelemetry;

@KafkaListener(topics = "sample-topic", groupId = "group-2")
public void consumeLinksWithRecord(ConsumerRecord record) {
String word = record.value();

String traceparent = record.headers().lastHeader("traceparent") != null
? new String(record.headers().lastHeader("traceparent").value())
: "No traceparentid header found";

// Create a span for Kafka message processing with ConsumerRecord
Span span = tracer.spanBuilder("inside-kafka")
.setAttribute("service.name", "insidekafkaservice")
.setAttribute("kafka.topic", record.topic())
.setAttribute("kafka.partition", record.partition())
.setAttribute("kafka.offset", record.offset())
.setAttribute("kafka.key", record.key() != null ? record.key() : "null")
.setAttribute("kafka.message", word)
.setAttribute("kafka.traceparent", traceparent)
.setStartTimestamp(Instant.ofEpochMilli(record.timestamp()))
.startSpan();

try (var scope = span.makeCurrent()) {

extractTraceContext(traceparent, span);

System.out.println("Received Message: " + word + " from partition: " + record.partition());
System.out.println("Trace Parent ID: " + traceparent);

// Add custom attributes to the span
span.setAttribute("message.length", word.length());
span.setAttribute("header.count", record.headers().spliterator().getExactSizeIfKnown());

try {
Thread.sleep(80);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
span.setStatus(StatusCode.ERROR, "Processing interrupted");
return;
}

span.setStatus(StatusCode.OK);

} catch (Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e);
throw e;
} finally {
span.end();
}
}

private void extractTraceContext(String traceparentId, Span span) {
try {
// Create a carrier map with the traceparentid
Map carrier = new HashMap();
carrier.put("traceparent", traceparentId);

// Extract the context using the W3C trace context propagator
TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
Context extractedContext = propagator.extract(Context.current(), carrier, new TextMapGetter() {
@Override
public String get(Map carrier, String key) {
return carrier.get(key);
}

@Override
public Iterable keys(Map  carrier) {
return carrier.keySet();
}
});

// Link the extracted context to the current span
if (extractedContext != Context.current()) {
span.addLink(Span.fromContext(extractedContext).getSpanContext());
}

} catch (Exception e) {
// Log the error but don't fail the processing
System.err.println("Failed to extract trace context: " + e.getMessage());
}
}
}

For instance, if:

[*]the producer starts their logic at 00:00
[*]the producer finishes their logic and puts the message inside Kafka at 00:01
[*]the consumer picked up the message at 00:04
the Потребитель закончил свою бизнес -логику в сообщении по телефону 00:05 < /li>
< /ul>
это означало бы, что сообщение осталось внутри Кафки от 00:01 до 00:04. src = "https://i.sstatic.net/6hldxjmb.png"/>
, но вместо этого я сейчас вижу это:
prcy. /> Как создать дополнительную трассу, представляющую время, проведенное в Кафке? < /p>

Подробнее здесь: https://stackoverflow.com/questions/796 ... -spring-ka
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»