Как повторно подключиться к RabbitMQ через aio-pikaPython

Программы на Python
Ответить
Anonymous
 Как повторно подключиться к RabbitMQ через aio-pika

Сообщение Anonymous »

У меня есть продюсер:
class RabbitProducer:
_CONNECTION: aio_pika.robust_connection.AbstractRobustConnection | None = None
_CHANNEL: aio_pika.robust_channel.AbstractRobustChannel | None = None

def __init__(
self,
login: str = config.SETTINGS.RABBIT.LOGIN,
password: str = config.SETTINGS.RABBIT.PASSWORD,
cluster_servers: str = config.SETTINGS.RABBIT.CLUSTER_SERVERS,
heartbeat_interval: int = config.SETTINGS.RABBIT.HEARTBEAT_INTERVAL,
):
self.login = login
self.password = password
self.heartbeat_interval = heartbeat_interval
self.cluster_servers = cluster_servers.split(",")
self.cluster_addresses = self._create_connection_addresses()

def _create_connection_addresses(self):
servers = [server.split(":") for server in self.cluster_servers]
addresses = [f'amqp://{self.login}:{self.password}@{server[0]}:{server[1]}/?heartbeat={self.heartbeat_interval}'
for server in servers]
return addresses

async def _on_connection_restore(self, *args) -> None:
LOGGER.info(f"------ Connection restoring: {args} ------")

self._CONNECTION = None
self._CHANNEL = None
await self.connect()

async def _create_connection(self) -> None:
if self._CONNECTION is None:
for addr in itertools.cycle(self.cluster_addresses):
try:
self._CONNECTION = await aio_pika.connect_robust(addr)
self._CONNECTION.close_callbacks.add(self._on_connection_restore)
break
except aio_pika.exceptions.AMQPException as error:
LOGGER.warning(f"------ Rabbit producer connection error: {error} -----")
LOGGER.warning(f"------ trying to connect to the next node in the rabbitMQ cluster ------")
continue

async def _create_channel(self) -> None:
if self._CHANNEL is None:
self._CHANNEL = await self._CONNECTION.channel()
self._CHANNEL.close_callbacks.add(self._on_close_channel)

async def connect(self) -> None:
try:
await self._create_connection()
await self._create_channel()
except aio_pika.exceptions.AMQPException as error:
LOGGER.error(f"------ Rabbit producer connection error: {error} ------")
await self.disconnect()

async def publish(self, routing_key: str, message: dict) -> None:
await self._CHANNEL.default_exchange.publish(
routing_key=routing_key,
message=aio_pika.Message(body=orjson.dumps(message)),
)

RABBIT_PRODUCER = RabbitProducer()

Я не уверен, почему, когда я отключаю контейнер с узлом кролика, к которому подключен производитель, мой код не переходит в _on_connection_restore()?Когда я выключаю контейнер с работающим узлом, я получаю это в журналах. У меня есть аналогичный потребитель, в котором я могу обработать внезапную остановку узла-кролика.

Неожиданное закрытие соединения с удаленного "amqp://guest:**" ****@localhost:56721/?heartbeat=10", Connection.Close(reply_code=320, Reply_text="CONNECTION_FORCED — брокер принудительно закрывает соединение с причиной "завершение работы"")
NoneType: None

Мой потребитель, который обрабатывает стоповый узел RabbitMQ в кластере:
class RabbitMQConsumer(consumer.AbstractConsumer):
_CONNECTION: aio_pika.connection.AbstractConnection | None = None
_CHANNEL: aio_pika.channel.AbstractChannel | None = None
_QUEUE: aio_pika.queue.AbstractQueue | None = None
_PROCESS_MESSAGE_CALLBACK = None

def __init__(
self,
cluster_servers: str,
login: str,
password: str,
queue_name: str,
heartbeat_interval: int,
) -> None:
self.cluster_servers = cluster_servers.split(",")
self.login = login
self.password = password
self.heartbeat_interval = heartbeat_interval
self.queue_name = queue_name
self.cluster_addresses = self._create_connection_addresses()

def _create_connection_addresses(self):
servers = [server.split(":") for server in self.cluster_servers]
addresses = [f'amqp://{self.login}:{self.password}@{server[0]}:{server[1]}/?heartbeat={self.heartbeat_interval}'
for server in servers]
return addresses

async def _on_connection_restore(self, *args) -> None:
self._CHANNEL = None
self._QUEUE = None
self._CONNECTION = None

await self._create_connection()
await self.start()
await self.consume(self._PROCESS_MESSAGE_CALLBACK)

async def _on_close_channel(self, *args) -> None:
await self._CHANNEL.reopen()

async def _create_connection(self) -> None:
if self._CONNECTION is None:
for addr in itertools.cycle(self.cluster_addresses):
try:
# self._CONNECTION = await aio_pika.connect_robust(addr)
self._CONNECTION = await aio_pika.connect(addr)
self._CONNECTION.close_callbacks.add(self._on_connection_restore)
break
except aio_pika.exceptions.AMQPException as error:
LOGGER.warning(f"------ Rabbit producer connection error: {error} -----")
LOGGER.warning(f"------ trying to connect to the next node in the rabbitMQ cluster ------")
continue

async def start(self) -> None:
LOGGER.info("------ RabbitMQ consumer starting! ------")

try:
await self._create_connection()
await self._create_channel()
await self._create_queue()

LOGGER.info("------ RabbitMQ consumer started! ------")
except aio_pika.exceptions.AMQPException as error:
LOGGER.error(f"------ Rabbit consumer connection error: {error} ------")
await self.stop()

@staticmethod
def _wrap_message_callback(process_message_callback: typing.Callable) -> typing.Callable:
async def _message_callback(message: aio_pika.IncomingMessage):
async with message.process(requeue=True):
game_match_event = schemas.GameMatchMessage(**orjson.loads(message.body))
result = await process_message_callback(game_match_event)
return result

return _message_callback

async def consume(
self, process_message_callback: typing.Callable[[schemas.GameMatchMessage], typing.Any]
) -> None:
self._PROCESS_MESSAGE_CALLBACK = process_message_callback
message_callback = self._wrap_message_callback(self._PROCESS_MESSAGE_CALLBACK)
await self._QUEUE.consume(message_callback)
await asyncio.Future()```

I need my consumer to reconnect to the next running node if the one he was connected to before suddenly went offline


Подробнее здесь: https://stackoverflow.com/questions/790 ... a-aio-pika
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»