Приложение Flink выдает поток «jobmanager-io-thread-25», создавший неперехваченное исключение. java.lang.OutOfMemoryError: Прямая буферная память и завершается после работы в течение 2–3 дней.
Независимо от того, насколько увеличена прямая буферная память, она со временем исчерпывается (она просто остается дольше, но в конечном итоге завершается), до сих пор пробовал максимум 16 ГБ.
Когда контрольные точки отключены, буферная память не увеличивается и приложение работает. хорошо.
Пробовал все виды оптимизации или настройки, предложенные в документации, такие как инкрементальная проверка точек, состояниеChangelog, увеличение DirectBuffer и т. д., но ничего не помогло.
2025-11-09 17:06:56,442 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'jobmanager-io-thread-25' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Direct buffer memory
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:71)
at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:280)
at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74)
at java.base/java.nio.channels.Channels.writeFully(Channels.java:97)
at java.base/java.nio.channels.Channels$1.write(Channels.java:172)
at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48)
at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54)
at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88)
at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112)
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:758)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:165)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:101)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83)
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:339)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1624)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1518)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1410)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139)
Конфигурация контрольной точки:
Структура приложения (высокий уровень):
Приложение настроено на Flink на AWS EMR. Использование Flink 1.20 в EMR 7.10.0.
Имеет 2 файловых источника S3. Читает паркетные файлы, оба неограниченные, непрерывный мониторинг с интервалом обнаружения 1 минута, новые файлы добавляются каждую минуту по новым путям гггг/мм/дд/ми/файлы.
обрабатывает и создает объекты из обоих потоков.
Объединяет их с помощью keyed-coprocess. функция с TTL 30 минут.
Выполнить некоторые операции сопоставления, фильтрации в объединенном потоке.
Снова отправить поток результатов в S3 в виде файлов паркета.
Справка:
Пожалуйста, помогите мне понять, почему контрольные точки постепенно потребляют такие большие буферы? Даже тогда, почему они не выпускаются?
Что именно сохраняется в этой буферной памяти координатором контрольных точек?
Как я могу решить эту проблему или применить настройку, чтобы этого не происходило?
Каковы мои следующие действия, чтобы попробовать и решить эту проблему?
Проблема: [list] [*]Приложение Flink выдает поток «jobmanager-io-thread-25», создавший неперехваченное исключение. java.lang.OutOfMemoryError: Прямая буферная память и завершается после работы в течение 2–3 дней.
[*]Независимо от того, насколько увеличена прямая буферная память, она со временем исчерпывается (она просто остается дольше, но в конечном итоге завершается), до сих пор пробовал максимум 16 ГБ.
[*]Когда контрольные точки отключены, буферная память не увеличивается и приложение работает. хорошо.
[*]Пробовал все виды оптимизации или настройки, предложенные в документации, такие как инкрементальная проверка точек, состояниеChangelog, увеличение DirectBuffer и т. д., но ничего не помогло.
[/list] Stack Trace: [code]2025-11-09 17:06:56,442 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'jobmanager-io-thread-25' produced an uncaught exception. Stopping the process... java.lang.OutOfMemoryError: Direct buffer memory at java.base/java.nio.Bits.reserveMemory(Bits.java:175) at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118) at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:71) at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:280) at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74) at java.base/java.nio.channels.Channels.writeFully(Channels.java:97) at java.base/java.nio.channels.Channels$1.write(Channels.java:172) at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48) at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54) at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88) at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78) at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107) at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108) at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:758) at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264) at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109) at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:165) at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83) at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56) at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:101) at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88) at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:339) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1624) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1518) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1410) at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109) at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139) [/code] Конфигурация контрольной точки: [img]https://i.sstatic.net/0kvmaaVC.png[/img]
Структура приложения (высокий уровень): [list] [*]Приложение настроено на Flink на AWS EMR. Использование Flink 1.20 в EMR 7.10.0. [*]Имеет 2 файловых источника S3. Читает паркетные файлы, оба неограниченные, непрерывный мониторинг с интервалом обнаружения 1 минута, новые файлы добавляются каждую минуту по новым путям гггг/мм/дд/ми/файлы. [*]обрабатывает и создает объекты из обоих потоков. [*]Объединяет их с помощью keyed-coprocess. функция с TTL 30 минут. [*]Выполнить некоторые операции сопоставления, фильтрации в объединенном потоке. [*]Снова отправить поток результатов в S3 в виде файлов паркета. [/list] Справка: [list] [*]Пожалуйста, помогите мне понять, почему контрольные точки постепенно потребляют такие большие буферы? Даже тогда, почему они не выпускаются? [*]Что именно сохраняется в этой буферной памяти координатором контрольных точек? [*]Как я могу решить эту проблему или применить настройку, чтобы этого не происходило? [*]Каковы мои следующие действия, чтобы попробовать и решить эту проблему? [/list] Прямая память и размер контрольной точки: [img]https://i.sstatic.net/2fg7jmoM.png[/img]
Утечка встроенной памяти — трассировка стека: [img]https://i.sstatic.net/zV068Z5n.png[/img]