Kafka Streams отправляет пользовательские заголовки с помощью Transformer при выводе нескольких сообщенийJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Kafka Streams отправляет пользовательские заголовки с помощью Transformer при выводе нескольких сообщений

Сообщение Anonymous »

Я хочу отправить несколько сообщений в нисходящем направлении с помощью Transformer (kafkastreams dsl)

Код: Выделить всё

    private ProcessorContext context;

@Override
public void init(ProcessorContext context) {
this.context = context;
}

@Override
public KeyValue transform(String s, String msg) {
Headers headers = context.headers();
for (int i = 0; i < 2; i++) {
headers.add("test", "test".getBytes());
headers.add("meta", "test".getBytes());

context.forward("new key", msg);
}
return null;
}
В дальнейшем будет TopicNameExtractor

Код: Выделить всё

.transform(TestTransformer::new)
.to(orderTopicNameExtractor, Produced.with(Serdes.serdeFrom(String.class),
Serdes.serdeFrom(String.class)));
Там, где мне нужно использовать данные из заголовка (там будет название темы), в Kafka необходимо записать и другие заголовки.
Проблема заключается в том, что следующим шагом во втором сообщении является получение нескольких заголовков (так что будет 2 «тестовых» ключа), единственный способ — удалить их перед добавлением в цикле преобразования. Он работает локально, но как насчет потокобезопасности и высокой пропускной способности? Контекст, по-видимому, является общим. Может ли возникнуть ситуация, когда метод Remove() удалит заголовок до того, как нижестоящий экстрактор обработает сообщение?

Код: Выделить всё

headers.remove("meta");
headers.add("meta", "test".getBytes());
context.forward("new key", msg);
Есть ли более правильный способ решения этой проблемы? Единственный способ, который, как я думаю, может сработать, - это использовать интерфейс Process и отправить сообщение с помощью withHeaders(new RecordHeaders()) или просто добавить их непосредственно в сообщение


Подробнее здесь: https://stackoverflow.com/questions/792 ... ges-output
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»