Прямая буферная память Flink Job Manager исчерпывается при включении контрольных точекJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Прямая буферная память Flink Job Manager исчерпывается при включении контрольных точек

Сообщение Anonymous »

Проблема:
  • Приложение Flink выдает поток «jobmanager-io-thread-25», создавший неперехваченное исключение. java.lang.OutOfMemoryError: Прямая буферная память и завершается после работы в течение 2–3 дней.
  • Независимо от того, насколько увеличена прямая буферная память, она со временем исчерпывается (она просто остается дольше, но в конечном итоге завершается), до сих пор пробовал максимум 16 ГБ.
  • Когда контрольные точки отключены, буферная память не увеличивается и приложение работает. хорошо.
  • Пробовал все виды оптимизации или настройки, предложенные в документации, такие как инкрементальная проверка точек, состояниеChangelog, увеличение DirectBuffer и т. д., но ничего не помогло.
Stack Trace:

Код: Выделить всё

2025-11-09 17:06:56,442 ERROR org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 'jobmanager-io-thread-25' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Direct buffer memory
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:71)
at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:280)
at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74)
at java.base/java.nio.channels.Channels.writeFully(Channels.java:97)
at java.base/java.nio.channels.Channels$1.write(Channels.java:172)
at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48)
at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54)
at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88)
at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112)
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:758)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:165)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:101)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83)
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:339)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1624)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1518)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1410)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139)
Конфигурация контрольной точки:
Изображение

Структура приложения (высокий уровень):
  • Приложение настроено на Flink на AWS EMR. Использование Flink 1.20 в EMR 7.10.0.
  • Имеет 2 файловых источника S3. Читает паркетные файлы, оба неограниченные, непрерывный мониторинг с интервалом обнаружения 1 минута, новые файлы добавляются каждую минуту по новым путям гггг/мм/дд/ми/файлы.
  • обрабатывает и создает объекты из обоих потоков.
  • Объединяет их с помощью keyed-coprocess. функция с TTL 30 минут.
  • Выполнить некоторые операции сопоставления, фильтрации в объединенном потоке.
  • Снова отправить поток результатов в S3 в виде файлов паркета.
Справка:
  • Пожалуйста, помогите мне понять, почему контрольные точки постепенно потребляют такие большие буферы? Даже тогда, почему они не выпускаются?
  • Что именно сохраняется в этой буферной памяти координатором контрольных точек?
  • Как я могу решить эту проблему или применить настройку, чтобы этого не происходило?
  • Каковы мои следующие действия, чтобы попробовать и решить эту проблему?
Прямая память и размер контрольной точки:
Изображение

Утечка встроенной памяти — трассировка стека:
Изображение


Подробнее здесь: https://stackoverflow.com/questions/798 ... ng-enabled
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»