Flink CDC / MySQL: communications link failure after an arbitrary duration

I am replicating a MySQL database to StarRocks using Flink CDC. The Flink job starts correctly and transfers data as expected, but after an arbitrary amount of time (anywhere from 10 minutes to 12 hours) it fails with the communications link failure shown below. I have tried increasing the Flink job's connection timeout and have used entirely different network interfaces, but I get the same error. This is frustrating because it happens during the initial snapshot phase, so the job just keeps restarting without ever fully replicating the database.

2025-09-16 16:59:15
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: SplitFetcher thread 414 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
Caused by: io.debezium.DebeziumException: Error reading MySQL variables: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:162)
at io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140)
at io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502)
at org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
...
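
The nesting of the trace above is easier to read as a cause chain. The following is a minimal sketch, assuming Python 3.9+; the helper name `cause_chain` and the abbreviated `SAMPLE` text are illustrative only and are not part of Flink or Debezium. It collects each exception class and message, outermost first, and skips the `at ...` frames.

```python
import re

# Abbreviated copy of the trace above (frames shortened for readability).
SAMPLE = """
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
Caused by: java.lang.RuntimeException: SplitFetcher thread 414 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
... 1 more
Caused by: io.debezium.DebeziumException: Error reading MySQL variables: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:162)
... 6 more
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
...
"""

# Matches "[Caused by: ]fully.qualified.SomethingException: message".
_HEADER = re.compile(r"(?:Caused by: )?([\w.$]+(?:Exception|Error)): ?(.*)")


def cause_chain(trace: str) -> list[tuple[str, str]]:
    """Return (exception class, message) pairs, outermost exception first."""
    chain = []
    for raw in trace.splitlines():
        line = raw.strip()
        # Stack frames and "... N more" markers carry no cause information.
        if line.startswith("at ") or line.startswith("..."):
            continue
        match = _HEADER.match(line)
        if match:
            chain.append((match.group(1), match.group(2)))
    return chain


if __name__ == "__main__":
    for depth, (cls, msg) in enumerate(cause_chain(SAMPLE)):
        print("  " * depth + f"{cls}: {msg}")
```

Read this way, the innermost cause is the JDBC driver's `CommunicationsException`, raised while Debezium was reading MySQL system variables at the start of a snapshot split (`SnapshotSplitReader.submitSplit`).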

Below is the setup I am using:
Source database:
  • MySQL 8.0.43 in a Podman container on a Debian 13 VM that runs on Proxmox
Flink server:
  • Debian 13
  • Flink CDC 3.4.0 and Flink 1.20.2, in a Podman container
Network:
  • NIC 1: 25Gb/s ConnectX-4 Lx cards (PCIe pass-through on the Proxmox machine)
  • NIC 2: Onboard 1Gb/s interfaces (virtio on Proxmox machine)

/opt/flink/conf/config.yaml
blob.server.port: '6124'

taskmanager.bind-host: 0.0.0.0
taskmanager.memory.process.size: 200g
taskmanager.memory.managed.size: 1g
taskmanager.memory.network.max: 1g
taskmanager.numberOfTaskSlots: 1

jobmanager.bind-host: 0.0.0.0
jobmanager.execution.failover-strategy: region
jobmanager.memory.process.size: 100g
jobmanager.rpc.address: 10.89.100.40
jobmanager.rpc.port: '6123'

query.server.port: '6125'

rest.address: 10.89.100.40
rest.bind-address: 0.0.0.0
rest.port: '8081'

parallelism.default: 1

state.backend.type: 'hashmap'

execution.checkpointing.incremental: true
execution.checkpointing.interval: 5m
execution.checkpointing.min-pause: 5m
execution.checkpointing.timeout: 1h
execution.checkpointing.storage: 'filesystem'
execution.checkpointing.dir: 'file:///opt/flink/checkpoints/'
execution.checkpointing.savepoint-dir: 'file:///opt/flink/savepoints/'

env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED

pekko.ask.timeout: 10m
heartbeat.timeout: 10m


flink_replication_job.yaml
pipeline:
  name: Sync all MySQL tables to StarRocks
  parallelism: 1

source:
  type: mysql
  hostname: 192.168.1.1
  port: 3306
  username: flink
  password: "$FLINK_MYSQL_PASSWORD"
  tables: db.\.*
  jdbc.properties.allowPublicKeyRetrieval: "true"
  server-id: 5400
  scan.incremental.snapshot.chunk.size: 65536 # 8192
  scan.binlog.newly-added-table.enabled: "true"
  connect.timeout: 1800s
  connect.max-retries: 10
  connection.pool.size: 10

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://10.89.100.50:9030
  load-url: http://10.89.100.50:8030
  username: flink
  password: "$FLINK_STARROCKS_PASSWORD"
  table.create.properties.replication_num: 1
  table.create.properties.fast_schema_evolution: "true"
  sink.properties.timeout: 1200
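
To tell a network-level drop apart from something inside Flink or the JDBC driver, the source `hostname` and `port` from the job file could be probed independently from the Flink host. This is only a sketch under assumptions: the function names `probe_once` and `probe_loop` are hypothetical, the 10-second interval is arbitrary, and a plain TCP connect checks reachability only; it does not perform the MySQL handshake or authentication.

```python
import socket
import time
from datetime import datetime


def probe_once(host: str, port: int, timeout: float = 5.0) -> tuple[bool, float, str]:
    """Attempt one TCP connect; return (ok, elapsed_seconds, error_text)."""
    start = time.monotonic()
    try:
        # create_connection resolves the host and applies the timeout to connect().
        with socket.create_connection((host, port), timeout=timeout):
            return True, time.monotonic() - start, ""
    except OSError as exc:  # covers refused, unreachable and timed-out connects
        return False, time.monotonic() - start, f"{type(exc).__name__}: {exc}"


def probe_loop(host: str, port: int, interval: float = 10.0, rounds=None) -> None:
    """Probe repeatedly and print one timestamped line per attempt.

    rounds=None means run until interrupted.
    """
    done = 0
    while rounds is None or done < rounds:
        ok, elapsed, err = probe_once(host, port)
        stamp = datetime.now().isoformat(timespec="seconds")
        status = "OK" if ok else "FAIL"
        print(f"{stamp} {status} {elapsed * 1000:.1f} ms {err}".rstrip())
        done += 1
        if rounds is None or done < rounds:
            time.sleep(interval)


if __name__ == "__main__":
    # Host and port taken from the source block of the job file above.
    probe_loop("192.168.1.1", 3306)
```

Left running alongside the job, the timestamps of any `FAIL` lines can be compared with the timestamp of the next `Communications link failure` in the Flink log.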


my.cnf
[mysqld]
skip-name-resolve
datadir = /var/lib/mysql
socket = /var/lib/mysql/mysql.sock
secure-file-priv = /var/lib/mysql-files
pid-file = /var/run/mysqld/mysqld.pid
tmpdir = /tmp

bind-address = 0.0.0.0

log_bin = ON
general_log = OFF
general_log_file = /var/lib/mysql/log_general
slow_query_log = ON
slow_query_log_file = /var/lib/mysql/log_slow_query

default_storage_engine = InnoDB
character_set_server = utf8mb4
collation_server = utf8mb4_bin
lower_case_table_names = 1

max_connections = 50

innodb_file_per_table = ON
innodb_buffer_pool_size = 92G
innodb_redo_log_capacity = 24G
innodb_flush_log_at_trx_commit = 1
innodb_flush_method = O_DIRECT
innodb_log_buffer_size = 1G
innodb_io_capacity = 10000
innodb_io_capacity_max = 20000
innodb_ddl_buffer_size = 4G
innodb_ddl_threads = 10
innodb_parallel_read_threads = 10

temptable_max_ram = 4G



More details here: https://stackoverflow.com/questions/797 ... y-duration