Увеличьте время отклика в потоках кафкиJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Увеличьте время отклика в потоках кафки

Сообщение Anonymous »

У меня есть проект с потоками Kafka, чтобы создать одну минуту свечу по цене для акций. Моя топология: < /p>
List inputTopics = new ArrayList();
inputTopics.add(tradeTopic);
Consumed CandleConsumerOptions = Consumed
.with(Serdes.String(), ProtobufSerdes.Trade())
.withTimestampExtractor(new CandleTimestampExtractor());

KTable ohlcKTable =
stream
.filter((key, Trade) -> Trade.getTradeTime() != 0 && Trade.getTradeTime() > todayMillis)
.groupByKey(
Grouped.with(Serdes.String(),ProtobufSerdes.Trade())
)
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1)),Duration.ofHours(8)))
.aggregate(
()-> ProtoClass.OHLC.newBuilder().build(),
(String key, ProtoClass.Trade trade,ProtoClass.OHLC ohlc) -> ExtendedOHLC.add(trade,key,ohlc),
Materialized.as(stateStoreName)
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(ProtobufSerdes.OHLC())
);
< /code>
и

my config: < /p>
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServer);
streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/app/kafka-stream");
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsConfiguration.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
streamsConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
< /code>
Мой сервисный уровень: < /p>
public List getRange(String symbol, long from, long to) {
Instant fromTime = Instant.ofEpochMilli(convertEpochTimeToLocal(from));
Instant toTime = Instant.ofEpochMilli(convertEpochTimeToLocal(to));

KeyQueryMetadata metadata = streams.queryMetadataForKey(stateStoreName, symbol, Serdes.String().serializer());
if (hostInfo.equals(metadata.activeHost()))
return fetchCandlesLocally(symbol, fromTime, toTime);
else
return fetchCandlesRemotelyGrpc(symbol, from, to, metadata.activeHost());
//return fetchCandlesRemotely(symbol, from, to, metadata.activeHost());

}
private List fetchCandlesLocally(String symbol, Instant fromTime, Instant toTime) {

readOnlyWindowStore = streams.store(StoreQueryParameters.fromNameAndType(stateStoreName, QueryableStoreTypes.windowStore()));

List candles = new ArrayList();
try (WindowStoreIterator
range = readOnlyWindowStore.fetch(symbol, fromTime, toTime)) {
if (range == null)
return candles;

while (range.hasNext()) {
KeyValue next = range.next();
if (next.value != null) {
Candle candle = createCandle(symbol, next.value,next.key);
candles.add(candle);
}
}
}
return candles;
}
< /code>
Моя проблема заключается в том, что у меня есть много входных данных для обработки и приблизительно 700 запросов в секунду на уровне обслуживания значительно увеличило время отклика. У меня есть 24 перегородка, и у меня есть 12 экземпляров. Почему эта проблема? И каково ваше предложение по решению этой проблемы?


Подробнее здесь: https://stackoverflow.com/questions/795 ... ka-streams
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Уменьшить время отклика в потоках кафки
    Anonymous » » в форуме JAVA
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Уменьшить время отклика в потоках кафки
    Anonymous » » в форуме JAVA
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Уменьшить время отклика в потоках кафки
    Anonymous » » в форуме JAVA
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Birt Report: странное время отклика
    Гость » » в форуме JAVA
    0 Ответы
    20 Просмотры
    Последнее сообщение Гость
  • Сократите время отклика для контроля качества документов с помощью Llama-2 7b (8-бит)
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»