Код: Выделить всё
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue transform(String s, String msg) {
Headers headers = context.headers();
for (int i = 0; i < 2; i++) {
headers.add("test", "test".getBytes());
headers.add("meta", "test".getBytes());
context.forward("new key", msg);
}
return null;
}
Код: Выделить всё
.transform(TestTransformer::new)
.to(orderTopicNameExtractor, Produced.with(Serdes.serdeFrom(String.class),
Serdes.serdeFrom(String.class)));
Проблема заключается в том, что следующим шагом во втором сообщении является получение нескольких заголовков (так что будет 2 «тестовых» ключа), единственный способ — удалить их перед добавлением в цикле преобразования. Он работает локально, но как насчет потокобезопасности и высокой пропускной способности? Контекст, по-видимому, является общим. Может ли возникнуть ситуация, когда метод Remove() удалит заголовок до того, как нижестоящий экстрактор обработает сообщение?
Код: Выделить всё
headers.remove("meta");
headers.add("meta", "test".getBytes());
context.forward("new key", msg);
Подробнее здесь: https://stackoverflow.com/questions/792 ... ges-output