У меня есть проект с потоками Kafka, чтобы создать одну минуту свечу по цене для акций. Моя топология: < /p>
List inputTopics = new ArrayList();
inputTopics.add(tradeTopic);
Consumed CandleConsumerOptions = Consumed
.with(Serdes.String(), ProtobufSerdes.Trade())
.withTimestampExtractor(new CandleTimestampExtractor());
KTable ohlcKTable =
stream
.filter((key, Trade) -> Trade.getTradeTime() != 0 && Trade.getTradeTime() > todayMillis)
.groupByKey(
Grouped.with(Serdes.String(),ProtobufSerdes.Trade())
)
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1)),Duration.ofHours(8)))
.aggregate(
()-> ProtoClass.OHLC.newBuilder().build(),
(String key, ProtoClass.Trade trade,ProtoClass.OHLC ohlc) -> ExtendedOHLC.add(trade,key,ohlc),
Materialized.as(stateStoreName)
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(ProtobufSerdes.OHLC())
);
< /code>
и
my config: < /p>
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServer);
streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/app/kafka-stream");
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsConfiguration.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
streamsConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
< /code>
Мой сервисный уровень: < /p>
public List getRange(String symbol, long from, long to) {
Instant fromTime = Instant.ofEpochMilli(convertEpochTimeToLocal(from));
Instant toTime = Instant.ofEpochMilli(convertEpochTimeToLocal(to));
KeyQueryMetadata metadata = streams.queryMetadataForKey(stateStoreName, symbol, Serdes.String().serializer());
if (hostInfo.equals(metadata.activeHost()))
return fetchCandlesLocally(symbol, fromTime, toTime);
else
return fetchCandlesRemotelyGrpc(symbol, from, to, metadata.activeHost());
//return fetchCandlesRemotely(symbol, from, to, metadata.activeHost());
}
private List fetchCandlesLocally(String symbol, Instant fromTime, Instant toTime) {
readOnlyWindowStore = streams.store(StoreQueryParameters.fromNameAndType(stateStoreName, QueryableStoreTypes.windowStore()));
List candles = new ArrayList();
try (WindowStoreIterator
range = readOnlyWindowStore.fetch(symbol, fromTime, toTime)) {
if (range == null)
return candles;
while (range.hasNext()) {
KeyValue next = range.next();
if (next.value != null) {
Candle candle = createCandle(symbol, next.value,next.key);
candles.add(candle);
}
}
}
return candles;
}
< /code>
Моя проблема заключается в том, что у меня есть много входных данных для обработки и приблизительно 700 запросов в секунду на уровне обслуживания значительно увеличило время отклика. У меня есть 24 перегородка, и у меня есть 12 экземпляров. Почему эта проблема? И каково ваше предложение по решению этой проблемы?
Подробнее здесь: https://stackoverflow.com/questions/795 ... ka-streams
Уменьшить время отклика в потоках кафки ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
SkiaSharp — уменьшить изображение и уменьшить качество (и, следовательно, размер изображения)
Anonymous » » в форуме C# - 0 Ответы
- 33 Просмотры
-
Последнее сообщение Anonymous
-