Как использовать сообщения, маршрутизируемые из плагина RabbitMQ MQTT, через потоки RabbitMQPython

Программы на Python
Ответить
Anonymous
 Как использовать сообщения, маршрутизируемые из плагина RabbitMQ MQTT, через потоки RabbitMQ

Сообщение Anonymous »

Цель:
Реализовать шаблон разветвления для сообщений mqtt с использованием RabbitMQ.
Данные Интернета вещей -> обмен mqtt -> поток RabbitMq -> несколько потребителей
Изображение

RabbitMQ работает локально следующим образом (5552 — для потока, 1883 — для mqtt):

Код: Выделить всё

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 -p 1883:1883  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
Я публикую и подтверждаю, что могу использовать данные mqtt следующим образом:

Код: Выделить всё

# publish
❯ mosquitto_pub -h localhost -p 1883 -t test_mqtt -m "Hello, MQTT" -u guest -P guest
# subscribe
❯ mosquitto_sub  -h localhost -p 1883 -t test_mqtt -u guest -P guest
Hello, MQTT
Однако я хочу подписаться на поток rmq, а не на mqtt, чтобы получать данные в виде разветвления.
Поток с тремя сообщениями, которые были опубликованы в нем с помощью указанной выше команды mosquitto_pub:
Изображение

А вот как я подписываюсь на поток с помощью клиента rstream из официального руководства RabbitMQ:

Код: Выделить всё

import asyncio

from rstream import (
AMQPMessage,
Consumer,
ConsumerOffsetSpecification,
MessageContext,
OffsetType,
)

STREAM_NAME = "mqtt_stream"
# 5GB
STREAM_RETENTION = 5000000000

async def receive():
async with Consumer(host="localhost", username="guest", password="guest") as consumer:

async def on_message(msg: AMQPMessage, message_context: MessageContext):
print("Got message: {} from stream {}".format(msg, message_context.stream))

print("Press control + C to close")
await consumer.start()
await consumer.subscribe(
stream=STREAM_NAME,
callback=on_message,
offset_specification=ConsumerOffsetSpecification(OffsetType.LAST, None),
)
try:
await consumer.run()
except (KeyboardInterrupt, asyncio.CancelledError):
print("Closing Consumer...")
return

with asyncio.Runner() as runner:
runner.run(receive())
Наконец, в консоли я вижу свое сообщение, но сопровождаемое каким-то двоичным объектом:

Код: Выделить всё

❯ python receive.py
Press control + C to close
Got message: b'\x00Sp\xc0\x06\x05B@@AC\x00Sr\xc12\x04\xa3\nx-exchange\xa1\tamq.topic\xa3\rx-routing-key\xa1\ttest_mqtt\x00Su\xa0\x0bHello, MQTT' from stream mqtt_stream
Вопрос:
Есть ли способ, позволяющий мне не писать специальный фрагмент кода, чтобы получить чистую строку (в конечном итоге это будет json) из сообщения?
Когда я публикую в потоке с помощью rstream, тот же код выводит сообщение чистой строки...

Подробнее здесь: https://stackoverflow.com/questions/798 ... tmq-stream
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»