Реализовать шаблон разветвления для сообщений mqtt с использованием RabbitMQ.
Данные Интернета вещей -> обмен mqtt -> поток RabbitMq -> несколько потребителей

RabbitMQ работает локально следующим образом (5552 — для потока, 1883 — для mqtt):
Код: Выделить всё
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 -p 1883:1883 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
Код: Выделить всё
# publish
❯ mosquitto_pub -h localhost -p 1883 -t test_mqtt -m "Hello, MQTT" -u guest -P guest
# subscribe
❯ mosquitto_sub -h localhost -p 1883 -t test_mqtt -u guest -P guest
Hello, MQTT
Поток с тремя сообщениями, которые были опубликованы в нем с помощью указанной выше команды mosquitto_pub:

А вот как я подписываюсь на поток с помощью клиента rstream из официального руководства RabbitMQ:
Код: Выделить всё
import asyncio
from rstream import (
AMQPMessage,
Consumer,
ConsumerOffsetSpecification,
MessageContext,
OffsetType,
)
STREAM_NAME = "mqtt_stream"
# 5GB
STREAM_RETENTION = 5000000000
async def receive():
async with Consumer(host="localhost", username="guest", password="guest") as consumer:
async def on_message(msg: AMQPMessage, message_context: MessageContext):
print("Got message: {} from stream {}".format(msg, message_context.stream))
print("Press control + C to close")
await consumer.start()
await consumer.subscribe(
stream=STREAM_NAME,
callback=on_message,
offset_specification=ConsumerOffsetSpecification(OffsetType.LAST, None),
)
try:
await consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Closing Consumer...")
return
with asyncio.Runner() as runner:
runner.run(receive())
Код: Выделить всё
❯ python receive.py
Press control + C to close
Got message: b'\x00Sp\xc0\x06\x05B@@AC\x00Sr\xc12\x04\xa3\nx-exchange\xa1\tamq.topic\xa3\rx-routing-key\xa1\ttest_mqtt\x00Su\xa0\x0bHello, MQTT' from stream mqtt_stream
Есть ли способ, позволяющий мне не писать специальный фрагмент кода, чтобы получить чистую строку (в конечном итоге это будет json) из сообщения?
Когда я публикую в потоке с помощью rstream, тот же код выводит сообщение чистой строки...
Подробнее здесь: https://stackoverflow.com/questions/798 ... tmq-stream
Мобильная версия