Код: Выделить всё
StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(inputTopicName, Consumed.with(Serdes.String(), outputValueSerde).withTimestampExtractor(new CustomTimestampExtractor()));
stream.groupBy((key, value) -> String.valueOf(value.getMessageKey()),
Grouped.with(Serdes.String(), outputValueSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(AGGREGATION_PERIOD)))
.emitStrategy(EmitStrategy.onWindowClose())
.aggregate(
CustomObject::new,
(key, value, aggregate) ->
{
// aggregation logic
},
Materialized.
as("state-store")
.withKeySerde(Serdes.String())
.withValueSerde(outputValueSerde)
.withStoreType(Materialized.StoreType.IN_MEMORY)
.withCachingDisabled()
.withRetention(Duration.ofSeconds(AGGREGATION_PERIOD * RETENTION_BUFFER_RATE))
)
.toStream(windowedKey, aggregate) -> windowedKey.key())
.to(outputTopicName, Produced.with(Serdes.String(), outputValueSerde));
< /code>
При запуске этот поток ведет себя по желанию. Тем не менее, модульный тест нет. Ниже приведен рассматриваемый тест. < /P>
@Test
public void testStreamsValidInput()
{
// two messages within window
CustomObject value = new CustomObject("", , );
CustomObject aggregate = new CustomObject("", , );
// send message timed for after window of first two messages, advances stream time and closes previous window
CustomObject advance = new CustomObject("", , );
// send first two
inputTopic.pipeInput(aggregate);
inputTopic.pipeInput(value);
// send third to close window
inputTopic.pipeInput(advance);
CustomObject output = null;
try
{
output = outputTopic.readValue();
} catch (NoSuchElementException e)
{
fail("No message was produced to output topic!", e);
}
assertEquals(, output.getTimestamp().toString());
assertEquals(1, output.getIntegerKey());
assertEquals(2.0, output.getDoubleValueBeingAggregated());
// confirm only one message was produced to output topic
assertThrows(NoSuchElementException.class, () -> outputTopic.readValue());
}
< P> Я запустил реальный поток и вложил те же сообщения в теме ввода, что и в тесте. Настоящий поток вел себя как и ожидалось: после отправки третьего сообщения было закрыто первое окно, и было создано правильное совокупное сообщение. Код> to .suppress (suppressed.utilwindowclose (подавленные. bufferconfig.unbounded ())) и тест Прошел. Использование Fustress () , но не EmitStrategy () и как исправить тест, чтобы он работал с EmitStrategy () , это было бы много ценится.
заранее спасибо.
Подробнее здесь: https://stackoverflow.com/questions/794 ... ed-but-not