Задача обработчика сигнала asyncio завершается до завершения ожидания во время завершения работыPython

Программы на Python
Ответить
Anonymous
 Задача обработчика сигнала asyncio завершается до завершения ожидания во время завершения работы

Сообщение Anonymous »

Я пишу потребитель Python для RabbitMQ Streams, используя асинхронную библиотеку. Я хочу сохранить смещение сообщения на потоковом сервере, когда сценарий остановлен (с помощью SIGINT/Ctrl+C).
Однако вызов store_offset, похоже, никогда не завершается. Я вижу журнал «Сохранение», но не журнал «Сохранено».

Код: Выделить всё

async def consume(self, callback, subscriber_name=None, offset_type: OffsetType = OffsetType.FIRST):

# -----------------------------------------
# WRAPPED CALLBACK
# -----------------------------------------
async def wrapped(msg: AMQPMessage, context: MessageContext):
if not msg:
print(f"[WARNING] Empty message at offset {context.offset}")
return

start = msg.find(b"{")
if start == -1:
print(f"[WARNING] No JSON found at offset {context.offset}: {msg}")
return

raw_json = msg[start:]

try:
data = json.loads(raw_json.decode())
except Exception as e:
print(f"[WARNING] Failed JSON decode at offset {context.offset}: {e}")
return

print(f"[Consumer] Received → {data}")
print(f"Offset of the data in the stream: {context.offset}")

# Store last offset for shutdown
self.last_offset = context.offset

if asyncio.iscoroutinefunction(callback):
await callback(data, context)
else:
callback(data, context)

# -----------------------------------------
# Convert offset
# -----------------------------------------
await self.connect_consumer()
stored_offset = None

if offset_type == OffsetType.OFFSET:
try:
stored_offset = await self.consumer.query_offset(
self.stream_name, subscriber_name=subscriber_name
)
print(f"Last stored offset for subscriber {subscriber_name}: {stored_offset}")
stored_offset+=1 #increase the offset to next value so that it will start from the new data
except OffsetNotFound:
offset_type = OffsetType.FIRST
print("No stored offset found. Starting from beginning.")

offset_spec = self._convert_offset(offset_type, stored_offset)

await self.consumer.subscribe(
stream=self.stream_name,
subscriber_name=subscriber_name,
offset_specification=offset_spec,
callback=wrapped,
)
print(f"[Consumer] Starting stream={self.stream_name}")
print(f"[Consumer] Subscribed with the name: {subscriber_name}")

# -----------------------------
# Register signal handlers
# -----------------------------
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGINT, lambda: asyncio.create_task(self.stop_consume(subscriber_name))
)
loop.add_signal_handler(
signal.SIGTERM, lambda: asyncio.create_task(self.stop_consume(subscriber_name))
)

await self.consumer.run()

# =====================================================
# STOP CONSUMER
# =====================================================
async def stop_consume(self, subscriber_name=None):
print("\n[Shutdown] Stopping consumer...")

try:
await self.consumer.stop()
except:
pass

if self.last_offset is not None:
print(f"[Shutdown] Saving last offset: {self.last_offset}")
await self.consumer.store_offset(
stream=self.stream_name,
subscriber_name=subscriber_name,
offset=self.last_offset
)
print("[Shutdown] Offset saved.")
else:
print("[Shutdown] No offset to save.")

print("[Shutdown] Exiting now.")
loop = asyncio.get_running_loop()
loop.stop()
Что я заметил: Кажется, что при вызове await self.consumer.stop() основной вызов self.consumer.run() завершается. Поскольку основная сопрограмма завершена, цикл событий завершается до того, как фоновая задача, созданная обработчиком сигнала, сможет завершить сетевой ввод-вывод, необходимый для store_offset.
Мой вопрос: Как лучше всего «держать» цикл событий открытым до тех пор, пока задачи очистки в моем обработчике сигнала не будут полностью завершены, а также рассказать, как работает этот поток?

Подробнее здесь: https://stackoverflow.com/questions/798 ... ng-shutdow
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»