Интеграция облачного потока Spring позволяет пользователям предоставлять реализацию Function для обработки сообщений, полученных через некоторую предварительно настроенную реализацию MQ, например kafka. Он также позволяет установить десериализатор, который будет преобразовывать массив байтов для обоих ключей и значений во что-то более полезное для приложения и перегружать десериализацию клиенту Kafka. Глядя на интеграцию облачного потока, он использует обычный клиент Kafka под капотом и напрямую взаимодействует с ConsumerRecord, а также сопоставляет дополнительные поля, такие как тема, раздел, смещение и временная метка получения, в качестве заголовков в предоставленном сообщении. реализация функции. Возникает вопрос: как можно настроить облачный поток Spring или интеграцию десериализаторов Kafka для предоставления либо записи потребителя, либо полученной метки времени в десериализатор, чтобы десериализатор мог взаимодействовать с этим значением (например, использовать его) как своего рода «метка времени создания сообщения»). Мой текущий обходной путь — сопоставить заголовок kafka_receivedTimestamp в прослушивателе, но это приводит к утечке части десериализации в потребителя, который будет выполнять фактическую обработку сообщения. Примеры кода ниже
package foo;
record Container(long id, long createdAt) {}
class ContainerDeserializer implements Deserializer {
public Container deserialize(String topic, Headers headers, byte[] data) {
return new Container(ByteBuffer.wrap(data).longValue(), 0L); // would like to read kafka_receivedTimestamp here
}
}
class ContainerConsumer implements Consumer {
public void accept(Message message) {
Container original = message.getPayload();
Container enriched = new Container(original.id(), message.getHeaders().get("kafka_receivedTimestamp", Long.class));
// do something with enriched counterpart
}
}
и в конфигурации yaml
spring:
cloud:
stream:
kafka:
bindings:
containerConsumer-in-0:
consumer:
configuration:
spring.deserializer.value.delegate.class: foo.ContainerDeserializer
Подробнее здесь: https://stackoverflow.com/questions/791 ... reams-meta
Как предоставить десериализатору метаданные ConsumerRecord или Spring Cloud Stream Kafka? ⇐ JAVA
Программисты JAVA общаются здесь
1730294673
Anonymous
Интеграция облачного потока Spring позволяет пользователям предоставлять реализацию Function для обработки сообщений, полученных через некоторую предварительно настроенную реализацию MQ, например kafka. Он также позволяет установить десериализатор, который будет преобразовывать массив байтов для обоих ключей и значений во что-то более полезное для приложения и перегружать десериализацию клиенту Kafka. Глядя на интеграцию облачного потока, он использует обычный клиент Kafka под капотом и напрямую взаимодействует с ConsumerRecord, а также сопоставляет дополнительные поля, такие как тема, раздел, смещение и временная метка получения, в качестве заголовков в предоставленном сообщении. реализация функции. Возникает вопрос: как можно настроить облачный поток Spring или интеграцию десериализаторов Kafka для предоставления либо записи потребителя, либо полученной метки времени в десериализатор, чтобы десериализатор мог взаимодействовать с этим значением (например, использовать его) как своего рода «метка времени создания сообщения»). Мой текущий обходной путь — сопоставить заголовок kafka_receivedTimestamp в прослушивателе, но это приводит к утечке части десериализации в потребителя, который будет выполнять фактическую обработку сообщения. Примеры кода ниже
package foo;
record Container(long id, long createdAt) {}
class ContainerDeserializer implements Deserializer {
public Container deserialize(String topic, Headers headers, byte[] data) {
return new Container(ByteBuffer.wrap(data).longValue(), 0L); // would like to read kafka_receivedTimestamp here
}
}
class ContainerConsumer implements Consumer {
public void accept(Message message) {
Container original = message.getPayload();
Container enriched = new Container(original.id(), message.getHeaders().get("kafka_receivedTimestamp", Long.class));
// do something with enriched counterpart
}
}
и в конфигурации yaml
spring:
cloud:
stream:
kafka:
bindings:
containerConsumer-in-0:
consumer:
configuration:
spring.deserializer.value.delegate.class: foo.ContainerDeserializer
Подробнее здесь: [url]https://stackoverflow.com/questions/79141291/how-to-expose-either-kafkas-consumerrecordk-v-or-spring-cloud-streams-meta[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия