MySQL-репликация | BinLogStreamReader не возобновляет работу с последней позиции (log_file, log_pos не работает)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 MySQL-репликация | BinLogStreamReader не возобновляет работу с последней позиции (log_file, log_pos не работает)

Сообщение Anonymous »

Я столкнулся с проблемой: BinLogStreamReader не возобновляет работу с последней позиции.
У меня жестко закодирована позиция 1146, но процесс занимает 875 > позиция тоже. Поток журнала должен возобновиться с позиции >=1146.

Код: Выделить всё

    stream = BinLogStreamReader(
connection_settings=config,
server_id=1,
only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
is_mariadb=True,
blocking=True,
resume_stream=True,
# log_file=log_file,
# log_pos=log_pos,
log_file="mysqld-bin.000003",
log_pos=1146,
)
Версии:
Версия MariaDB: 10.3.39
Версия пакета: mysql- replication=="0.45.1" (Работает с версией MariaDB 10.3.39)
my.cnf:

Код: Выделить всё

[mysqld]
log-bin
binlog-format=ROW
server-id=1
binlog_row_image=FULL
Журналы процессов:

Код: Выделить всё

INFO:root:Resuming from mysqld-bin.000003:1146
INFO:root:Starting BinLogStreamReader with log_file=mysqld-bin.000003 and log_pos=1146
DEBUG:root:BinLogStreamReader initialized with log_file=mysqld-bin.000003 and log_pos=1146
INFO:root: -> Insert into mydatabase.example_table: {'id': 1, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 34, 17), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:875
INFO:root: -> Insert into mydatabase.example_table: {'id': 2, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 56, 39), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:1146
Код Python:

Код: Выделить всё

import time
# mysql-replication=="0.45.1"
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
from pymysqlreplication.event import RotateEvent
import logging

# Database connection parameters
config = {
'host': 'localhost',
'port': 3307,
'user': 'root',
'password': 'rootpassword',
'database': 'mydatabase'
}

# Setup logging
logging.basicConfig(level=logging.DEBUG)

def save_binlog_position(log_file, log_pos):
with open("binlog_position.txt", "w") as f:
f.write(f"{log_file},{log_pos}")
logging.info(f"Saved logs: {log_file}:{log_pos}")

def load_binlog_position():
try:
with open("binlog_position.txt", "r") as f:
log_file, log_pos = f.read().strip().split(',')
logging.info(f"Resuming from {log_file}:{log_pos}")
return log_file, int(log_pos)
except FileNotFoundError:
logging.info("Starting from beginning")
return None, None

def get_values_from_logs(stream):
for event in stream:
table_name = event.table
if isinstance(event, WriteRowsEvent):
for row in event.rows:
row['values'].update({"_deleted": False})
logging.info(f" -> Insert into {event.schema}.{event.table}: {row['values']}")
elif isinstance(event, UpdateRowsEvent):
for row in event.rows:
row['after_values'].update({"_deleted": False})
logging.info(
f" -> Update {event.schema}.{event.table}: {row['before_values']} -> {row['after_values']}")
elif isinstance(event, DeleteRowsEvent):
for row in event.rows:
row['values'].update({"_deleted": True})
logging.info(f" ->  Delete from {event.schema}.{event.table}: {row['values']}")
log_file, log_pos = stream.log_file, stream.log_pos
save_binlog_position(log_file, log_pos)

# Function to parse binary log events
def parse_binlog_events():
log_file, log_pos = load_binlog_position()

logging.info(f"Starting BinLogStreamReader with log_file={log_file} and log_pos={log_pos}")

stream = BinLogStreamReader(
connection_settings=config,
server_id=1,
only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
is_mariadb=True,
blocking=True,
resume_stream=True,
# log_file=log_file,
# log_pos=log_pos,
log_file="mysqld-bin.000003",
log_pos=1146,
)

logging.debug(f"BinLogStreamReader initialized with log_file={stream.log_file} and log_pos={stream.log_pos}")
get_values_from_logs(stream)
stream.close()

if __name__ == "__main__":
parse_binlog_events()
Docker-compose:

Код: Выделить всё

services:
mariadb:
image: mariadb:10.3.39
ports:
- "3307:3306"
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_DATABASE: mydatabase
MYSQL_USER: myuser
MYSQL_PASSWORD: mypassword
volumes:
- ./mariadb/my.cnf:/etc/mysql/conf.d/my.cnf
Кто-нибудь может помочь? Заранее спасибо!

Подробнее здесь: https://stackoverflow.com/questions/786 ... tion-log-f
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • React Native File File File File Error Type Ошибка ошибки
    Anonymous » » в форуме Android
    0 Ответы
    58 Просмотры
    Последнее сообщение Anonymous
  • React Native File File File File Error Type Ошибка ошибки
    Anonymous » » в форуме Android
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • React Native File File File File Error Type Ошибка ошибки
    Anonymous » » в форуме Android
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Поле добавления POS в проблеме pos.config
    Anonymous » » в форуме Python
    0 Ответы
    46 Просмотры
    Последнее сообщение Anonymous
  • Есть ли разница между «file.readlines()», «list(file)» и «file.read().splitlines(True)»?
    Anonymous » » в форуме Python
    0 Ответы
    157 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»