Мои настройки:
- Локальный интерфейс Flask публикует сообщения (идентификатор элемента, местоположение, количество и т. д.).
- Подписчик прослушивает и сохраняет их в локальную базу данных SQLite.
- Оба прекрасно работают локально — сообщения отображаются в SQLite, а Flask возвращает успешный ответ.
- Однако я не вижу четких доказательств того, что сообщения действительно отправляются в Pub/Sub в Google Cloud.
- Сообщения действительно доходят до темы,
- Абонент правильно их получает и подтверждает,
- Или я вижу только локальную активность.
- Аутентификация осуществляется через переменную среды GOOGLE_APPLICATION_CREDENTIALS
- Сообщения отображаются локально в моем почтовом ящике SQLite.
from google.cloud import pubsub_v1
import json, os
PROJECT_ID = os.getenv("GCP_PROJECT_ID", "your-project-id")
SUB_ID = os.getenv("PUBSUB_SUB", "your-subscription-name")
def callback(message: pubsub_v1.subscriber.message.Message):
try:
payload = json.loads(message.data.decode("utf-8"))
print("[RECEIVED]", payload, "pubsub_id=", message.message_id, flush=True)
# do processing/storage here (omitted)
message.ack() # ← ACK confirms successful handling
print("[ACKED]", message.message_id, flush=True)
except Exception as e:
print("[SUBSCRIBER][ERROR]", e, flush=True)
# Optionally: message.nack()
if __name__ == "__main__":
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(PROJECT_ID, SUB_ID)
print("Listening on", sub_path, flush=True)
future = subscriber.subscribe(sub_path, callback=callback)
try:
future.result()
except KeyboardInterrupt:
future.cancel()
#Subscriber
from flask import Flask, request, jsonify
from google.cloud import pubsub_v1
import json, os
PROJECT_ID = os.getenv("GCP_PROJECT_ID", "your-project-id")
TOPIC_ID = os.getenv("PUBSUB_TOPIC", "your-topic-name")
app = Flask(__name__)
# Create Pub/Sub publisher and topic path
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
@app.route("/api/publish", methods=["POST"])
def publish():
"""Publish request JSON to Pub/Sub as the message data."""
try:
payload = request.get_json(force=True) or {}
data_bytes = json.dumps(payload).encode("utf-8")
# publish and wait for server ack
msg_id = publisher.publish(topic_path, data=data_bytes).result()
print("[PUBLISH]", topic_path, "msg_id=", msg_id, flush=True)
return jsonify({"ok": True, "message_id": msg_id}), 200
except Exception as e:
print("[PUBLISH][ERROR]", e, flush=True)
return jsonify({"ok": False, "error": str(e)}), 500
if __name__ == "__main__":
app.run(host="127.0.0.1", port=5000, debug=True)
Подробнее здесь: https://stackoverflow.com/questions/798 ... ceiving-ac