Я пишу ProcessorSupplier для объединения n записей в одну. Для этого я использую List Serdes...
Моя проблема в том, что ArrayList всегда пуст.
Использование потоков Java 21 и Kafka 3.7. 0
Код: Выделить всё
public class KafkaTheBatcherProcessorApiApplication {
public static void main(String[] args) {
final Topology topology = new Topology();
topology.addSource( "source-node", stringSerde.deserializer(), stringSerde.deserializer(), "inputTopic");
topology.addProcessor("aggregate-records",
new BatchProcessorSupplierPersistedStore(),
"source-node");
topology.addSink( "sink-node", "outputTopic", stringSerde.serializer(), listSerde.serializer(), "aggregate-records");
Properties properties = new Properties();
try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
kafkaStreams.start();
}
}
Код: Выделить всё
class BatchProcessorSupplierPersistedStore implements ProcessorSupplier {
@Override
public Set
Подробнее здесь: [url]https://stackoverflow.com/questions/78486547/kafka-streams-list-serdes-is-always-empty[/url]
Мобильная версия