Google Pub/Sub — издатель работает. Как мне убедиться, что мой подписчик получает/подтверждает сообщения?Python

Программы на Python
Anonymous
 Google Pub/Sub — издатель работает. Как мне убедиться, что мой подписчик получает/подтверждает сообщения?

Сообщение Anonymous »

Я работаю над проектом, в котором Flask используется в качестве издателя, а подписчик Python для обработки сообщений с помощью Google Cloud Pub/Sub.
Мои настройки:
  • Локальный интерфейс Flask публикует сообщения (идентификатор элемента, местоположение, количество и т. д.).
  • Подписчик прослушивает и сохраняет их в локальную базу данных SQLite.
  • Оба прекрасно работают локально — сообщения отображаются в SQLite, а Flask возвращает успешный ответ.
  • Однако я не вижу четких доказательств того, что сообщения действительно отправляются в Pub/Sub в Google Cloud.
Я вижу небольшое увеличение количества опубликованных сообщений в разделе Pub/Sub → Темы → [my тема] → Метрики, но я не уверен, что:
  • Сообщения действительно доходят до темы,
  • Абонент правильно их получает и подтверждает,
  • Или я вижу только локальную активность.
Как я могу это подтвердить? мой издатель отправляет сообщения в тему GCP, а подписчик получает/подтверждает их? введите здесь описание изображения
  • Аутентификация осуществляется через переменную среды GOOGLE_APPLICATION_CREDENTIALS
  • Сообщения отображаются локально в моем почтовом ящике SQLite.
#Consumer
from google.cloud import pubsub_v1
import json, os

PROJECT_ID = os.getenv("GCP_PROJECT_ID", "your-project-id")
SUB_ID = os.getenv("PUBSUB_SUB", "your-subscription-name")

def callback(message: pubsub_v1.subscriber.message.Message):
try:
payload = json.loads(message.data.decode("utf-8"))
print("[RECEIVED]", payload, "pubsub_id=", message.message_id, flush=True)
# do processing/storage here (omitted)
message.ack() # ← ACK confirms successful handling
print("[ACKED]", message.message_id, flush=True)
except Exception as e:
print("[SUBSCRIBER][ERROR]", e, flush=True)
# Optionally: message.nack()

if __name__ == "__main__":
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(PROJECT_ID, SUB_ID)
print("Listening on", sub_path, flush=True)
future = subscriber.subscribe(sub_path, callback=callback)
try:
future.result()
except KeyboardInterrupt:
future.cancel()

#Subscriber
from flask import Flask, request, jsonify
from google.cloud import pubsub_v1
import json, os

PROJECT_ID = os.getenv("GCP_PROJECT_ID", "your-project-id")
TOPIC_ID = os.getenv("PUBSUB_TOPIC", "your-topic-name")

app = Flask(__name__)

# Create Pub/Sub publisher and topic path
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

@app.route("/api/publish", methods=["POST"])
def publish():
"""Publish request JSON to Pub/Sub as the message data."""
try:
payload = request.get_json(force=True) or {}
data_bytes = json.dumps(payload).encode("utf-8")

# publish and wait for server ack
msg_id = publisher.publish(topic_path, data=data_bytes).result()
print("[PUBLISH]", topic_path, "msg_id=", msg_id, flush=True)

return jsonify({"ok": True, "message_id": msg_id}), 200
except Exception as e:
print("[PUBLISH][ERROR]", e, flush=True)
return jsonify({"ok": False, "error": str(e)}), 500

if __name__ == "__main__":

app.run(host="127.0.0.1", port=5000, debug=True)


Подробнее здесь: https://stackoverflow.com/questions/798 ... ceiving-ac

Вернуться в «Python»