Драйвер испытаний на топологию кафки потока - записи, созданные с помощью подавленного (), но не эминстратегии ()JAVA

Программисты JAVA общаются здесь
Anonymous
Драйвер испытаний на топологию кафки потока - записи, созданные с помощью подавленного (), но не эминстратегии ()

Сообщение Anonymous »

Я работаю над написанием модульных тестов для приложения Kafka Streams, которое использует временное окно для агрегирования сообщений и возвращаю максимальное значение, наблюдаемое в этих сообщениях в этом окне. Ниже приведено определение потока. < /P>

Код: Выделить всё

StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(inputTopicName, Consumed.with(Serdes.String(), outputValueSerde).withTimestampExtractor(new CustomTimestampExtractor()));
stream.groupBy((key, value) -> String.valueOf(value.getMessageKey()),
Grouped.with(Serdes.String(), outputValueSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(AGGREGATION_PERIOD)))
.emitStrategy(EmitStrategy.onWindowClose())
.aggregate(
CustomObject::new,
(key, value, aggregate) ->
{
// aggregation logic
},
Materialized.
as("state-store")
.withKeySerde(Serdes.String())
.withValueSerde(outputValueSerde)
.withStoreType(Materialized.StoreType.IN_MEMORY)
.withCachingDisabled()
.withRetention(Duration.ofSeconds(AGGREGATION_PERIOD * RETENTION_BUFFER_RATE))
)
.toStream(windowedKey, aggregate) -> windowedKey.key())
.to(outputTopicName, Produced.with(Serdes.String(), outputValueSerde));
< /code>
При запуске этот поток ведет себя по желанию. Тем не менее, модульный тест нет. Ниже приведен рассматриваемый тест. < /P>
    @Test
public void testStreamsValidInput()
{
// two messages within window
CustomObject value = new CustomObject("", , );
CustomObject aggregate = new CustomObject("", , );

// send message timed for after window of first two messages, advances stream time and closes previous window
CustomObject advance = new CustomObject("", , );

// send first two
inputTopic.pipeInput(aggregate);
inputTopic.pipeInput(value);

// send third to close window
inputTopic.pipeInput(advance);

CustomObject output = null;
try
{
output = outputTopic.readValue();
} catch (NoSuchElementException e)
{
fail("No message was produced to output topic!", e);
}

assertEquals(, output.getTimestamp().toString());
assertEquals(1, output.getIntegerKey());
assertEquals(2.0, output.getDoubleValueBeingAggregated());

// confirm only one message was produced to output topic
assertThrows(NoSuchElementException.class, () -> outputTopic.readValue());
}
При выполнении этого теста он не работает на outputtopic.readvalue () , потому что поток не дал значения для вывода.
< P> Я запустил реальный поток и вложил те же сообщения в теме ввода, что и в тесте. Настоящий поток вел себя как и ожидалось: после отправки третьего сообщения было закрыто первое окно, и было создано правильное совокупное сообщение. Код> to .suppress (suppressed.utilwindowclose (подавленные. bufferconfig.unbounded ())) и тест Прошел. Использование Fustress () , но не EmitStrategy () и как исправить тест, чтобы он работал с EmitStrategy () , это было бы много ценится.
заранее спасибо.

Подробнее здесь: https://stackoverflow.com/questions/794 ... ed-but-not

Вернуться в «JAVA»