Однако вызов store_offset, похоже, никогда не завершается. Я вижу журнал «Сохранение», но не журнал «Сохранено».
Код: Выделить всё
async def consume(self, callback, subscriber_name=None, offset_type: OffsetType = OffsetType.FIRST):
# -----------------------------------------
# WRAPPED CALLBACK
# -----------------------------------------
async def wrapped(msg: AMQPMessage, context: MessageContext):
if not msg:
print(f"[WARNING] Empty message at offset {context.offset}")
return
start = msg.find(b"{")
if start == -1:
print(f"[WARNING] No JSON found at offset {context.offset}: {msg}")
return
raw_json = msg[start:]
try:
data = json.loads(raw_json.decode())
except Exception as e:
print(f"[WARNING] Failed JSON decode at offset {context.offset}: {e}")
return
print(f"[Consumer] Received → {data}")
print(f"Offset of the data in the stream: {context.offset}")
# Store last offset for shutdown
self.last_offset = context.offset
if asyncio.iscoroutinefunction(callback):
await callback(data, context)
else:
callback(data, context)
# -----------------------------------------
# Convert offset
# -----------------------------------------
await self.connect_consumer()
stored_offset = None
if offset_type == OffsetType.OFFSET:
try:
stored_offset = await self.consumer.query_offset(
self.stream_name, subscriber_name=subscriber_name
)
print(f"Last stored offset for subscriber {subscriber_name}: {stored_offset}")
stored_offset+=1 #increase the offset to next value so that it will start from the new data
except OffsetNotFound:
offset_type = OffsetType.FIRST
print("No stored offset found. Starting from beginning.")
offset_spec = self._convert_offset(offset_type, stored_offset)
await self.consumer.subscribe(
stream=self.stream_name,
subscriber_name=subscriber_name,
offset_specification=offset_spec,
callback=wrapped,
)
print(f"[Consumer] Starting stream={self.stream_name}")
print(f"[Consumer] Subscribed with the name: {subscriber_name}")
# -----------------------------
# Register signal handlers
# -----------------------------
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGINT, lambda: asyncio.create_task(self.stop_consume(subscriber_name))
)
loop.add_signal_handler(
signal.SIGTERM, lambda: asyncio.create_task(self.stop_consume(subscriber_name))
)
await self.consumer.run()
# =====================================================
# STOP CONSUMER
# =====================================================
async def stop_consume(self, subscriber_name=None):
print("\n[Shutdown] Stopping consumer...")
try:
await self.consumer.stop()
except:
pass
if self.last_offset is not None:
print(f"[Shutdown] Saving last offset: {self.last_offset}")
await self.consumer.store_offset(
stream=self.stream_name,
subscriber_name=subscriber_name,
offset=self.last_offset
)
print("[Shutdown] Offset saved.")
else:
print("[Shutdown] No offset to save.")
print("[Shutdown] Exiting now.")
loop = asyncio.get_running_loop()
loop.stop()
Мой вопрос: Как лучше всего «держать» цикл событий открытым до тех пор, пока задачи очистки в моем обработчике сигнала не будут полностью завершены, а также рассказать, как работает этот поток?
Подробнее здесь: https://stackoverflow.com/questions/798 ... ng-shutdow
Мобильная версия