У меня есть продюсер:
class RabbitProducer:
_CONNECTION: aio_pika.robust_connection.AbstractRobustConnection | None = None
_CHANNEL: aio_pika.robust_channel.AbstractRobustChannel | None = None
def __init__(
self,
login: str = config.SETTINGS.RABBIT.LOGIN,
password: str = config.SETTINGS.RABBIT.PASSWORD,
cluster_servers: str = config.SETTINGS.RABBIT.CLUSTER_SERVERS,
heartbeat_interval: int = config.SETTINGS.RABBIT.HEARTBEAT_INTERVAL,
):
self.login = login
self.password = password
self.heartbeat_interval = heartbeat_interval
self.cluster_servers = cluster_servers.split(",")
self.cluster_addresses = self._create_connection_addresses()
def _create_connection_addresses(self):
servers = [server.split(":") for server in self.cluster_servers]
addresses = [f'amqp://{self.login}:{self.password}@{server[0]}:{server[1]}/?heartbeat={self.heartbeat_interval}'
for server in servers]
return addresses
async def _on_connection_restore(self, *args) -> None:
LOGGER.info(f"------ Connection restoring: {args} ------")
self._CONNECTION = None
self._CHANNEL = None
await self.connect()
async def _create_connection(self) -> None:
if self._CONNECTION is None:
for addr in itertools.cycle(self.cluster_addresses):
try:
self._CONNECTION = await aio_pika.connect_robust(addr)
self._CONNECTION.close_callbacks.add(self._on_connection_restore)
break
except aio_pika.exceptions.AMQPException as error:
LOGGER.warning(f"------ Rabbit producer connection error: {error} -----")
LOGGER.warning(f"------ trying to connect to the next node in the rabbitMQ cluster ------")
continue
async def _create_channel(self) -> None:
if self._CHANNEL is None:
self._CHANNEL = await self._CONNECTION.channel()
self._CHANNEL.close_callbacks.add(self._on_close_channel)
async def connect(self) -> None:
try:
await self._create_connection()
await self._create_channel()
except aio_pika.exceptions.AMQPException as error:
LOGGER.error(f"------ Rabbit producer connection error: {error} ------")
await self.disconnect()
async def publish(self, routing_key: str, message: dict) -> None:
await self._CHANNEL.default_exchange.publish(
routing_key=routing_key,
message=aio_pika.Message(body=orjson.dumps(message)),
)
RABBIT_PRODUCER = RabbitProducer()
Я не уверен, почему, когда я отключаю контейнер с узлом кролика, к которому подключен производитель, мой код не переходит в _on_connection_restore()?Когда я выключаю контейнер с работающим узлом, я получаю это в журналах. У меня есть аналогичный потребитель, в котором я могу обработать внезапную остановку узла-кролика.
Неожиданное закрытие соединения с удаленного "amqp://guest:**" ****@localhost:56721/?heartbeat=10", Connection.Close(reply_code=320, Reply_text="CONNECTION_FORCED — брокер принудительно закрывает соединение с причиной "завершение работы"")
NoneType: None
Мой потребитель, который обрабатывает стоповый узел RabbitMQ в кластере:
class RabbitMQConsumer(consumer.AbstractConsumer):
_CONNECTION: aio_pika.connection.AbstractConnection | None = None
_CHANNEL: aio_pika.channel.AbstractChannel | None = None
_QUEUE: aio_pika.queue.AbstractQueue | None = None
_PROCESS_MESSAGE_CALLBACK = None
def __init__(
self,
cluster_servers: str,
login: str,
password: str,
queue_name: str,
heartbeat_interval: int,
) -> None:
self.cluster_servers = cluster_servers.split(",")
self.login = login
self.password = password
self.heartbeat_interval = heartbeat_interval
self.queue_name = queue_name
self.cluster_addresses = self._create_connection_addresses()
def _create_connection_addresses(self):
servers = [server.split(":") for server in self.cluster_servers]
addresses = [f'amqp://{self.login}:{self.password}@{server[0]}:{server[1]}/?heartbeat={self.heartbeat_interval}'
for server in servers]
return addresses
async def _on_connection_restore(self, *args) -> None:
self._CHANNEL = None
self._QUEUE = None
self._CONNECTION = None
await self._create_connection()
await self.start()
await self.consume(self._PROCESS_MESSAGE_CALLBACK)
async def _on_close_channel(self, *args) -> None:
await self._CHANNEL.reopen()
async def _create_connection(self) -> None:
if self._CONNECTION is None:
for addr in itertools.cycle(self.cluster_addresses):
try:
# self._CONNECTION = await aio_pika.connect_robust(addr)
self._CONNECTION = await aio_pika.connect(addr)
self._CONNECTION.close_callbacks.add(self._on_connection_restore)
break
except aio_pika.exceptions.AMQPException as error:
LOGGER.warning(f"------ Rabbit producer connection error: {error} -----")
LOGGER.warning(f"------ trying to connect to the next node in the rabbitMQ cluster ------")
continue
async def start(self) -> None:
LOGGER.info("------ RabbitMQ consumer starting! ------")
try:
await self._create_connection()
await self._create_channel()
await self._create_queue()
LOGGER.info("------ RabbitMQ consumer started! ------")
except aio_pika.exceptions.AMQPException as error:
LOGGER.error(f"------ Rabbit consumer connection error: {error} ------")
await self.stop()
@staticmethod
def _wrap_message_callback(process_message_callback: typing.Callable) -> typing.Callable:
async def _message_callback(message: aio_pika.IncomingMessage):
async with message.process(requeue=True):
game_match_event = schemas.GameMatchMessage(**orjson.loads(message.body))
result = await process_message_callback(game_match_event)
return result
return _message_callback
async def consume(
self, process_message_callback: typing.Callable[[schemas.GameMatchMessage], typing.Any]
) -> None:
self._PROCESS_MESSAGE_CALLBACK = process_message_callback
message_callback = self._wrap_message_callback(self._PROCESS_MESSAGE_CALLBACK)
await self._QUEUE.consume(message_callback)
await asyncio.Future()```
I need my consumer to reconnect to the next running node if the one he was connected to before suddenly went offline
Подробнее здесь: https://stackoverflow.com/questions/790 ... a-aio-pika
Как повторно подключиться к RabbitMQ через aio-pika ⇐ Python
Программы на Python
-
Anonymous
1728488142
Anonymous
У меня есть продюсер:
class RabbitProducer:
_CONNECTION: aio_pika.robust_connection.AbstractRobustConnection | None = None
_CHANNEL: aio_pika.robust_channel.AbstractRobustChannel | None = None
def __init__(
self,
login: str = config.SETTINGS.RABBIT.LOGIN,
password: str = config.SETTINGS.RABBIT.PASSWORD,
cluster_servers: str = config.SETTINGS.RABBIT.CLUSTER_SERVERS,
heartbeat_interval: int = config.SETTINGS.RABBIT.HEARTBEAT_INTERVAL,
):
self.login = login
self.password = password
self.heartbeat_interval = heartbeat_interval
self.cluster_servers = cluster_servers.split(",")
self.cluster_addresses = self._create_connection_addresses()
def _create_connection_addresses(self):
servers = [server.split(":") for server in self.cluster_servers]
addresses = [f'amqp://{self.login}:{self.password}@{server[0]}:{server[1]}/?heartbeat={self.heartbeat_interval}'
for server in servers]
return addresses
async def _on_connection_restore(self, *args) -> None:
LOGGER.info(f"------ Connection restoring: {args} ------")
self._CONNECTION = None
self._CHANNEL = None
await self.connect()
async def _create_connection(self) -> None:
if self._CONNECTION is None:
for addr in itertools.cycle(self.cluster_addresses):
try:
self._CONNECTION = await aio_pika.connect_robust(addr)
self._CONNECTION.close_callbacks.add(self._on_connection_restore)
break
except aio_pika.exceptions.AMQPException as error:
LOGGER.warning(f"------ Rabbit producer connection error: {error} -----")
LOGGER.warning(f"------ trying to connect to the next node in the rabbitMQ cluster ------")
continue
async def _create_channel(self) -> None:
if self._CHANNEL is None:
self._CHANNEL = await self._CONNECTION.channel()
self._CHANNEL.close_callbacks.add(self._on_close_channel)
async def connect(self) -> None:
try:
await self._create_connection()
await self._create_channel()
except aio_pika.exceptions.AMQPException as error:
LOGGER.error(f"------ Rabbit producer connection error: {error} ------")
await self.disconnect()
async def publish(self, routing_key: str, message: dict) -> None:
await self._CHANNEL.default_exchange.publish(
routing_key=routing_key,
message=aio_pika.Message(body=orjson.dumps(message)),
)
RABBIT_PRODUCER = RabbitProducer()
Я не уверен, почему, когда я отключаю контейнер с узлом кролика, к которому подключен производитель, мой код не переходит в _on_connection_restore()?Когда я выключаю контейнер с работающим узлом, я получаю это в журналах. У меня есть аналогичный потребитель, в котором я могу обработать внезапную остановку узла-кролика.
Неожиданное закрытие соединения с удаленного "amqp://guest:**" ****@localhost:56721/?heartbeat=10", Connection.Close(reply_code=320, Reply_text="CONNECTION_FORCED — брокер принудительно закрывает соединение с причиной "завершение работы"")
NoneType: None
Мой потребитель, который обрабатывает стоповый узел RabbitMQ в кластере:
class RabbitMQConsumer(consumer.AbstractConsumer):
_CONNECTION: aio_pika.connection.AbstractConnection | None = None
_CHANNEL: aio_pika.channel.AbstractChannel | None = None
_QUEUE: aio_pika.queue.AbstractQueue | None = None
_PROCESS_MESSAGE_CALLBACK = None
def __init__(
self,
cluster_servers: str,
login: str,
password: str,
queue_name: str,
heartbeat_interval: int,
) -> None:
self.cluster_servers = cluster_servers.split(",")
self.login = login
self.password = password
self.heartbeat_interval = heartbeat_interval
self.queue_name = queue_name
self.cluster_addresses = self._create_connection_addresses()
def _create_connection_addresses(self):
servers = [server.split(":") for server in self.cluster_servers]
addresses = [f'amqp://{self.login}:{self.password}@{server[0]}:{server[1]}/?heartbeat={self.heartbeat_interval}'
for server in servers]
return addresses
async def _on_connection_restore(self, *args) -> None:
self._CHANNEL = None
self._QUEUE = None
self._CONNECTION = None
await self._create_connection()
await self.start()
await self.consume(self._PROCESS_MESSAGE_CALLBACK)
async def _on_close_channel(self, *args) -> None:
await self._CHANNEL.reopen()
async def _create_connection(self) -> None:
if self._CONNECTION is None:
for addr in itertools.cycle(self.cluster_addresses):
try:
# self._CONNECTION = await aio_pika.connect_robust(addr)
self._CONNECTION = await aio_pika.connect(addr)
self._CONNECTION.close_callbacks.add(self._on_connection_restore)
break
except aio_pika.exceptions.AMQPException as error:
LOGGER.warning(f"------ Rabbit producer connection error: {error} -----")
LOGGER.warning(f"------ trying to connect to the next node in the rabbitMQ cluster ------")
continue
async def start(self) -> None:
LOGGER.info("------ RabbitMQ consumer starting! ------")
try:
await self._create_connection()
await self._create_channel()
await self._create_queue()
LOGGER.info("------ RabbitMQ consumer started! ------")
except aio_pika.exceptions.AMQPException as error:
LOGGER.error(f"------ Rabbit consumer connection error: {error} ------")
await self.stop()
@staticmethod
def _wrap_message_callback(process_message_callback: typing.Callable) -> typing.Callable:
async def _message_callback(message: aio_pika.IncomingMessage):
async with message.process(requeue=True):
game_match_event = schemas.GameMatchMessage(**orjson.loads(message.body))
result = await process_message_callback(game_match_event)
return result
return _message_callback
async def consume(
self, process_message_callback: typing.Callable[[schemas.GameMatchMessage], typing.Any]
) -> None:
self._PROCESS_MESSAGE_CALLBACK = process_message_callback
message_callback = self._wrap_message_callback(self._PROCESS_MESSAGE_CALLBACK)
await self._QUEUE.consume(message_callback)
await asyncio.Future()```
I need my consumer to reconnect to the next running node if the one he was connected to before suddenly went offline
Подробнее здесь: [url]https://stackoverflow.com/questions/79070969/how-to-reconnect-to-rabbitmq-via-aio-pika[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия