Внутри этого проекта один раз в день выполняется только одно запланированное задание. p>
Задание содержит следующие действия:
Сканировать все элементы из таблицы dynamoDB (в этой таблице 1 миллион элементов и каждый элемент имеет размер 40 КБ)< /p>
Преобразуйте каждый элемент в новый объект
Загрузите эти преобразованные объекты в корзину AWS S3 как один JSON-файл
После проверки файла дампа основная причина заключается в том, что слишком много объектов Map не освобождаются. Поскольку количество элементов в таблице слишком велико, поэтому он продолжает сканировать элементы (и, я думаю, не освобождает их), и в конечном итоге область кучи перегружается и выдает OOM.
Я пробовал другие подходы к обработке данных частями с использованием Mono.buffer() или с использованием класса AtomicInteger для управления потоком, продолжающим сканирование элементов. Но ни один из них не работает.
Я ожидаю: может ли это задание продолжать обрабатывать элементы частями (например, 1000 за раз)? Например:
извлечь 1000 элементов -> преобразовать 1000 элементов -> элементы многочастной загрузки -> конец процесса фрагмента -> начать еще одну выборку 1000 элементов...
и убедитесь, что память освобождается после каждого подпроцесса.
Для воспроизведения:
Код: Выделить всё
@Scheduled(cron = "0 0 10 * * ?")
public void job() {
Flux.range(0, 8)
.flatMap((segment) -> scanningDB(segment, 8, null))
.collectList()
.subscribe();
}
private Flux scanningDB(Integer segment, int totalSegment, Map lastKey) {
return Mono.defer(() -> {
ScanRequest scanRequest = ScanRequest.builder()
.segment(segment)
.totalSegments(totalSegment)
.tableName(TABLE_NAME)
.exclusiveStartKey(lastKey)
.build();
return Mono.fromFuture(dynamoClient.scan(scanRequest));
})
.map((scanResponse) -> {
transformAndUpload(scanResponse.items()).subscribe();
return scanResponse;
})
.filter(ScanResponse::hasLastEvaluatedKey)
.flatMapMany((scanResponse) -> scanningDB(segment, totalSegment, scanResponse.lastEvaluatedKey()));
}
private Mono transformAndUpload(List items) {
return Flux.fromIterable(items)
.map(this::transformItems)
.collectList()
.flatMap(this::uploadToS3);
}
Подробнее здесь: https://stackoverflow.com/questions/787 ... tering-oom
Мобильная версия