Spring WebFlux загружает большой объем данных, не сталкиваясь с OOMJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Spring WebFlux загружает большой объем данных, не сталкиваясь с OOM

Сообщение Anonymous »

Я новичок в Spring WebFlux, недавно я столкнулся с OOM в проекте Spring WebFlux.
Внутри этого проекта один раз в день выполняется только одно запланированное задание. p>
Задание содержит следующие действия:
Сканировать все элементы из таблицы dynamoDB (в этой таблице 1 миллион элементов и каждый элемент имеет размер 40 КБ)< /p>
Преобразуйте каждый элемент в новый объект
Загрузите эти преобразованные объекты в корзину AWS S3 как один JSON-файл
После проверки файла дампа основная причина заключается в том, что слишком много объектов Map не освобождаются. Поскольку количество элементов в таблице слишком велико, поэтому он продолжает сканировать элементы (и, я думаю, не освобождает их), и в конечном итоге область кучи перегружается и выдает OOM.
Я пробовал другие подходы к обработке данных частями с использованием Mono.buffer() или с использованием класса AtomicInteger для управления потоком, продолжающим сканирование элементов. Но ни один из них не работает.
Я ожидаю: может ли это задание продолжать обрабатывать элементы частями (например, 1000 за раз)? Например:
извлечь 1000 элементов -> преобразовать 1000 элементов -> элементы многочастной загрузки -> конец процесса фрагмента -> начать еще одну выборку 1000 элементов...
и убедитесь, что память освобождается после каждого подпроцесса.
Для воспроизведения:

Код: Выделить всё

    @Scheduled(cron = "0 0 10 * * ?")
public void job() {
Flux.range(0, 8)
.flatMap((segment) -> scanningDB(segment, 8, null))
.collectList()
.subscribe();
}

private Flux scanningDB(Integer segment, int totalSegment, Map lastKey) {
return Mono.defer(() -> {
ScanRequest scanRequest = ScanRequest.builder()
.segment(segment)
.totalSegments(totalSegment)
.tableName(TABLE_NAME)
.exclusiveStartKey(lastKey)
.build();
return Mono.fromFuture(dynamoClient.scan(scanRequest));
})
.map((scanResponse) -> {
transformAndUpload(scanResponse.items()).subscribe();
return scanResponse;
})
.filter(ScanResponse::hasLastEvaluatedKey)
.flatMapMany((scanResponse) -> scanningDB(segment, totalSegment, scanResponse.lastEvaluatedKey()));
}

private Mono transformAndUpload(List items) {
return Flux.fromIterable(items)
.map(this::transformItems)
.collectList()
.flatMap(this::uploadToS3);
}
Я пропущу часть преобразования и S3Upload, поскольку основной причиной является отсутствие слишком большого количества объектов (software.amazon.awssdk.services.dynamodb.model.AttributeValue) выпущен

Подробнее здесь: https://stackoverflow.com/questions/787 ... tering-oom
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»