Вот части моего компонента:
Класс компонента Camel (Component):
/**
 * Camel component for the {@code zipkintrace} scheme.
 *
 * <p>Builds {@link ZipkinTraceEndpoint} instances and hands each one the
 * {@link ZipkinTraceProperties} and {@link ZipkinTraceCache} supplied to this component's
 * constructor.
 */
@Component("zipkintrace")
public class ZipkinTraceComponent extends DefaultComponent {

    private final ZipkinTraceProperties zipkinTraceProperties;
    private final ZipkinTraceCache zipkinTraceCache;

    /**
     * @param context               Camel context this component belongs to
     * @param zipkinTraceProperties settings forwarded to every endpoint created here
     * @param zipkinTraceCache      cache forwarded to every endpoint created here
     */
    public ZipkinTraceComponent(
            CamelContext context, ZipkinTraceProperties zipkinTraceProperties,
            ZipkinTraceCache zipkinTraceCache
    ) {
        super(context);
        this.zipkinTraceProperties = zipkinTraceProperties;
        this.zipkinTraceCache = zipkinTraceCache;
    }

    /**
     * Creates an endpoint for the given URI.
     *
     * @param uri        full endpoint URI
     * @param remaining  URI part after the scheme; stored on the endpoint as its action
     * @param parameters URI query parameters, bound onto the endpoint via {@code setProperties}
     * @return the configured endpoint
     * @throws Exception if binding the parameters fails
     */
    @Override
    protected ZipkinTraceEndpoint createEndpoint(
            String uri, String remaining, Map<String, Object> parameters
    ) throws Exception {
        ZipkinTraceEndpoint endpoint = new ZipkinTraceEndpoint(
                uri, this, zipkinTraceProperties, zipkinTraceCache
        );
        // Bind query parameters first, then set the action taken from the URI path.
        setProperties(endpoint, parameters);
        endpoint.setAction(remaining);
        return endpoint;
    }
}
Класс Camel Endpoint
/**
 * Endpoint of the {@code zipkintrace} component (URI syntax {@code zipkintrace:action}).
 *
 * <p>Declared as producer-only: {@link #createProducer()} returns a
 * {@link ZipkinTraceProduces}, while {@link #createConsumer(Processor)} always throws.
 */
@UriEndpoint(
        firstVersion = "3.21.0",
        scheme = "zipkintrace",
        syntax = "zipkintrace:action",
        title = "zipkintrace",
        category = Category.LOG,
        producerOnly = true,
        headersClass = ZipkinTraceConstants.class
)
public class ZipkinTraceEndpoint extends DefaultEndpoint implements AsyncEndpoint {

    private final ZipkinTraceProperties zipkinTraceProperties;
    private final ZipkinTraceCache zipkinTraceCache;

    /** Required URI path part ({@code zipkintrace:<action>}). */
    @UriPath
    @Metadata(required = true)
    private String action;
    // other params

    /**
     * @param endpointUri           full URI of this endpoint
     * @param component             component that created this endpoint
     * @param zipkinTraceProperties settings passed on to the producer
     * @param zipkinTraceCache      cache passed on to the producer
     */
    public ZipkinTraceEndpoint(String endpointUri, Component component,
            ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache) {
        super(endpointUri, component);
        this.zipkinTraceProperties = zipkinTraceProperties;
        this.zipkinTraceCache = zipkinTraceCache;
    }

    /**
     * Creates a new producer bound to this endpoint.
     *
     * @return a new {@link ZipkinTraceProduces}
     */
    @Override
    public Producer createProducer() throws Exception {
        return new ZipkinTraceProduces(this, zipkinTraceProperties, zipkinTraceCache);
    }

    /**
     * Always fails, because this endpoint does not support consumers.
     *
     * @throws IllegalArgumentException on every call
     */
    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        throw new IllegalArgumentException(
                "zipkintrace has no consumer, so you cannot get any data from it");
    }
    // getters\setters
}
Класс продюсера Camel (Producer):
/**
 * Producer of the {@code zipkintrace} component.
 *
 * <p>Every path through {@link #process(Exchange, AsyncCallback)} invokes
 * {@code callback.done(true)} exactly once and returns {@code true}.
 */
public class ZipkinTraceProduces extends DefaultAsyncProducer {

    private final ZipkinTraceProperties zipkinTraceProperties;
    private final ZipkinTraceCache zipkinTraceCache;

    /**
     * @param endpoint              endpoint this producer belongs to
     * @param zipkinTraceProperties settings of the component
     * @param zipkinTraceCache      cache of the component
     */
    public ZipkinTraceProduces(ZipkinTraceEndpoint endpoint,
            ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache
    ) {
        super(endpoint);
        this.zipkinTraceProperties = zipkinTraceProperties;
        this.zipkinTraceCache = zipkinTraceCache;
    }

    /** Returns the owning endpoint narrowed to {@link ZipkinTraceEndpoint}. */
    @Override
    public ZipkinTraceEndpoint getEndpoint() {
        return (ZipkinTraceEndpoint) super.getEndpoint();
    }

    /**
     * Processes the exchange and completes the callback.
     *
     * <p>Any failure raised by the work is stored on the exchange instead of being rethrown.
     *
     * @param exchange exchange being processed
     * @param callback callback to notify on completion
     * @return always {@code true}
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (!isRunAllowed()) {
            return shutDownWithException(exchange, callback);
        }
        try {
            // useful work
        } catch (Throwable e) {
            exchange.setException(e);
        }
        // Complete the callback outside the try block: if done() itself threw while inside it,
        // the catch clause would record that failure on the exchange and call done() again.
        callback.done(true);
        return true;
    }

    /**
     * Completes the exchange when the producer is not allowed to run, setting a
     * {@link RejectedExecutionException} unless the exchange already carries an exception.
     */
    private boolean shutDownWithException(Exchange exchange, AsyncCallback callback) {
        if (isNull(exchange.getException())) {
            exchange.setException(new RejectedExecutionException("Run is not allowed"));
        }
        callback.done(true);
        return true;
    }
}
Вот график без использования моего компонента:

А это график, когда мой компонент просто логирует действие:
[img]https://i.sstatic.net/cWEGrYlg.png[/img]
Я вызываю компонент следующим образом, причём в одном и том же маршруте это может происходить несколько раз:
from("kafka:...")
    // The URI embeds ${body.traceContext}, which presumably differs per message, so each
    // exchange can resolve to a distinct endpoint URI. With the default toD caching those
    // endpoints and producers are kept in memory; cacheSize = -1 turns the cache off so the
    // instances are discarded after use.
    .toD("zipkintrace:continue?traceContext=${body.traceContext}", -1) // call my component
    .to("direct:elkOutRequestProcessingRoute")
    .to("direct:commitKafka")
    .to("zipkintrace:end") // call my component
    .end();
Подробнее здесь: https://stackoverflow.com/questions/784 ... mory-usage