Невозможно получить данные от подписанного брокера MQTT.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно получить данные от подписанного брокера MQTT.

Сообщение Anonymous »

Я работаю над проектом, используя массив обонятельных датчиков Bosch BME688 и перьевую доску Adafruit ESP32 Huzzah Featherboard (комплект разработчика Bosch BME688). В настоящее время я написал программу, которая использует датчик для записи данных в реальном времени, а затем передает эти данные MQTT-брокеру Mosquitto. Эта программа работает правильно, и когда я подписываюсь на эту тему в своем терминале Mac, я получаю данные с датчиков в реальном времени. Итак, теперь, когда это сработало, я хотел попытаться построить график этих точек данных. В Jupyter Notebook я написал скрипт, который подписывается на брокера MQTT и отображает данные в режиме реального времени. Но он не работает, и когда я запускаю скрипт, ничего не происходит, а просто запрашивает следующую ячейку. И это несмотря на то, что датчик записывает данные правильно, а открытое окно терминала показывает, что данные действительно записываются правильно.
Это код, который я использовал для создания MQTT-брокера, и затем начните обработку данных. Как я упоминал ранее, это работало правильно, и я мог правильно видеть выходные данные как в терминале (подписавшись на тему MQTT), так и через последовательный монитор Arduino (я использовал Arduino IDE для написания программы и загрузки в датчик). ).

Код: Выделить всё

#include 
#include "commMux.h"

#include "mqtt_datalogger.h"
#include 
#include

#define NUM_OF_SENS    8
#define PANIC_LED   LED_BUILTIN
#define ERROR_DUR   1000

const char* ssid = "mywifiname";
const char* password = "mywifipassword";

const char* mqttServer = "MyMQTTServerAddress";
const int mqttPort = "MySensorsPort";
const char* mqttTopic = "sensorData";
const char* mqttClientName = "MyClientName";

WiFiClient espClient;
PubSubClient mqttClient(espClient);

// create MQTT logger
bme68xData sensorData[NUM_OF_SENS] = {0};
mqttDataLogger logger(&mqttClient, NUM_OF_SENS, mqttTopic);

void reconnect() {
while (!mqttClient.connected()) {
Serial.print("Attempting MQTT connection...");
if (mqttClient.connect(mqttClientName)) {
Serial.println("connected");
} else {
Serial.print("failed, rc=");
Serial.print(mqttClient.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}

void errLeds(void);

void checkBsecStatus(Bsec2 bsec);

void newDataCallback(const bme68xData data, const bsecOutputs outputs, Bsec2 bsec);

Bsec2 envSensor[NUM_OF_SENS];
comm_mux communicationSetup[NUM_OF_SENS];
uint8_t bsecMemBlock[NUM_OF_SENS][BSEC_INSTANCE_SIZE];
uint8_t sensor = 0;

void setup()
{
bsecSensor sensorList[] = {
BSEC_OUTPUT_IAQ,
BSEC_OUTPUT_RAW_TEMPERATURE,
BSEC_OUTPUT_RAW_PRESSURE,
BSEC_OUTPUT_RAW_HUMIDITY,
BSEC_OUTPUT_RAW_GAS,
BSEC_OUTPUT_STABILIZATION_STATUS,
BSEC_OUTPUT_RUN_IN_STATUS
};

Serial.begin(115200);

comm_mux_begin(Wire, SPI);
pinMode(PANIC_LED, OUTPUT);
delay(100);
while(!Serial) delay(10);

Serial.print("Connecting to ");
Serial.print(ssid);
Serial.println("...");
WiFi.begin(ssid, password);

while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}

Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());

mqttClient.setServer(mqttServer, mqttPort);
mqttClient.setBufferSize(600);
Serial.print("MQTT client buffer size: ");
Serial.println(mqttClient.getBufferSize());

reconnect();

logger.beginSensorData();
for (uint8_t i = 0; i < NUM_OF_SENS; i++)
{
communicationSetup[i] = comm_mux_set_config(Wire, SPI, i, communicationSetup[i]);

envSensor[i].allocateMemory(bsecMemBlock[i]);

if (!envSensor[i].begin(BME68X_SPI_INTF, comm_mux_read, comm_mux_write, comm_mux_delay, &communicationSetup[i]))
{
checkBsecStatus (envSensor[i]);
}

if (!envSensor[i].updateSubscription(sensorList, ARRAY_LEN(sensorList), BSEC_SAMPLE_RATE_LP))
{
checkBsecStatus (envSensor[i]);
}

envSensor[i].attachCallback(newDataCallback);

}

Serial.println("BSEC library version " + \
String(envSensor[0].version.major) + "." \
+ String(envSensor[0].version.minor) + "." \
+ String(envSensor[0].version.major_bugfix) + "." \
+ String(envSensor[0].version.minor_bugfix));
}

void loop()
{
for (sensor = 0; sensor < NUM_OF_SENS; sensor++)
{
if (!envSensor[sensor].run())
{
checkBsecStatus(envSensor[sensor]);
}
}

reconnect();
}

void errLeds(void)
{
while(1)
{
digitalWrite(PANIC_LED, HIGH);
delay(ERROR_DUR);
digitalWrite(PANIC_LED, LOW);
delay(ERROR_DUR);
}
}

void newDataCallback(const bme68xData data, const bsecOutputs outputs, Bsec2 bsec)
{
if (!outputs.nOutputs)
{
return;
}

Serial.println("BSEC outputs:\n\tsensor num = " + String(sensor));
Serial.println("\ttimestamp = " + String((int) (outputs.output[0].time_stamp / INT64_C(1000000))));
for (uint8_t i = 0;  i < outputs.nOutputs; i++)
{
const bsecData output  = outputs.output[i];
switch (output.sensor_id)
{
case BSEC_OUTPUT_IAQ:
Serial.println("\tiaq = " + String(output.signal));
Serial.println("\tiaq accuracy = " + String((int) output.accuracy));
break;
case BSEC_OUTPUT_RAW_TEMPERATURE:
Serial.println("\ttemperature = " + String(output.signal));
sensorData[sensor].temperature = output.signal;
break;
case BSEC_OUTPUT_RAW_PRESSURE:
Serial.println("\tpressure = " + String(output.signal));
sensorData[sensor].pressure = output.signal;
break;
case BSEC_OUTPUT_RAW_HUMIDITY:
Serial.println("\thumidity = " + String(output.signal));
sensorData[sensor].humidity = output.signal;
break;
case BSEC_OUTPUT_RAW_GAS:
Serial.println("\tgas resistance = " + String(output.signal));
sensorData[sensor].gas_resistance = output.signal;
break;
case BSEC_OUTPUT_STABILIZATION_STATUS:
Serial.println("\tstabilization status = " + String(output.signal));
break;
case BSEC_OUTPUT_RUN_IN_STATUS:
Serial.println("\trun in status = " + String(output.signal));
break;
default:
break;
}
}
logger.assembleAndPublishSensorData(sensor, &sensorData[sensor]);
}

void checkBsecStatus(Bsec2 bsec)
{
if (bsec.status < BSEC_OK)
{
Serial.println("BSEC error code : " + String(bsec.status));
errLeds();
}
else if (bsec.status > BSEC_OK)
{
Serial.println("BSEC warning code : " + String(bsec.status));
}

if (bsec.sensor.status < BME68X_OK)
{
Serial.println("BME68X error code : " + String(bsec.sensor.status));
errLeds();
}
else if (bsec.sensor.status > BME68X_OK)
{
Serial.println("BME68X warning code : " + String(bsec.sensor.status));
}
}
Вот результат, который я получил от этой программы:

Код: Выделить всё

{ "datapoints" : [
00:00:52.829 -> [ 0, 37272, 21.483234, 9.934442, 33.501423, 37847.425781 ],
00:00:52.829 -> [ 1, 37328, 21.446659, 9.932629, 33.433811, 16072.325195 ],
00:00:52.829 -> [ 2, 37383, 21.332052, 9.936784, 33.085041, 20506.248047 ],
00:00:52.861 -> [ 3, 37438, 21.383812, 9.933786, 33.250164, 27663.712891 ],
00:00:52.861 -> [ 4, 37493, 21.828791, 9.933929, 32.243370, 26661.111328 ],
00:00:52.861 -> [ 5, 37548, 21.818996, 9.935399, 31.534727, 22519.353516 ],
00:00:52.861 -> [ 6, 37603, 22.092115, 9.936502, 32.235497, 23791.822266 ],
00:00:52.861 -> [ 7, 37658, 22.005573, 9.934257, 32.204479, 35605.007812 ]
00:00:52.861 -> ] }

00:00:55.428 -> BSEC outputs:
00:00:55.428 ->     sensor num = 0
00:00:55.428 ->     timestamp = 40256
00:00:55.428 ->     iaq = 50.00
00:00:55.428 ->     iaq accuracy = 0
00:00:55.428 ->     temperature = 21.80
00:00:55.428 ->     pressure = 993.45
00:00:55.428 ->     humidity = 32.74
00:00:55.428 ->     gas resistance = 73478.76
00:00:55.428 ->     stabilization status = 1.00
00:00:55.460 ->     run in status = 0.00
Выходные данные BSEC — это выходные данные одного из датчиков, а массив — это полный массив выходных данных всех датчиков. Кстати, массив — единственный, который отправляется MQTT-брокеру, и он не содержит всех каналов данных. У него есть только идентификатор датчика, метка времени, давление, температура, давление, влажность и газовое сопротивление.
Итак, хотя это работает правильно, графический код, который я написал и запустил в Jupyter Notebook нет. Как я уже упоминал, когда я запускаю его, ничего не отображается на графике, фактически вывода вообще нет, и он сразу же предлагает мне перейти к следующей ячейке в блокноте. Это указывает на то, что код завершился. Но я не уверен, почему это происходит. Вот код, который у меня есть для этого:

Код: Выделить всё

import paho.mqtt.client as mqtt
import matplotlib.pyplot as plt
import numpy as np
import time
import json

mqtt_server = "MyMQTTServerAddress"
mqtt_port = "MySensorsPort"
mqtt_topic = "sensorData"

data = {}

plt.ion()
fig, ax = plt.subplots(figsize=(12, 6))
lines = [ax.plot([], [], label=f'Sensor {i}') for i in range(8)]
ax.set_title('Olfactory Sensor Readings: Gas Resistance')
ax.set_xlabel('Time (seconds)')
ax.set_ylabel('Gas Resistance (Ohms)')
plt.legend()
plt.tight_layout()

last_message_time = time.time()
no_data_timeout = 60

def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {str(rc)}")
client.subscribe(mqtt_topic)

def on_message(client, userdata, msg):
global last_message_time
try:
received_data = json.loads(msg.payload.decode())
datapoints = received_data["datapoints"]

for point in datapoints:
sensor_id = point[0]
timestamp = point[1]
gas_resistance = point[5]

if sensor_id not in data:
data[sensor_id] = {'timestamp': [], 'gas_resistance': []}

data[sensor_id]['timestamp'].append(timestamp)
data[sensor_id]['gas_resistance'].append(gas_resistance)

latest_timestamp = max([max(d['timestamp']) for d in data.values() if d['timestamp']])

for i in range(8):
if i in data:
closest_index = np.argmin(np.abs(np.array(data[i ['timestamp']) - latest_timestamp))
lines[i][0].set_data(latest_timestamp, data[i]['gas_resistance'][closest_index])

ax.set_xlim(min([min(d['timestamp']) for d in data.values() if d['timestamp']]), latest_timestamp + 10)

ax.relim()
ax.autoscale_view()

plt.draw()
plt.pause(0.01)

except Exception as e:
print(f"Error processing message: {e}")

last_message_time = time.time()

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(mqtt_server, mqtt_port, 60)

client.loop_start()

while True:
if time.time() - last_message_time > no_data_timeout:
print(f"No data received for {no_data_timeout} seconds. Exiting.")
client.loop_stop()
client.disconnect()
break

time.sleep(1)
Имейте в виду, что я это делаю именно так. Сначала я запускаю брокер, а затем подключаю датчик. Датчик записывает данные, как только он включается, и останавливается, как только он отключается. Датчик начинает записывать данные, и я могу сказать, что это правильно, потому что могу подписаться на эту тему и проверить точки данных. Я думаю запустить код Jupyter Notebook, и именно здесь возникает моя ошибка. Что странно, программа завершает работу автоматически, без кода ошибки.

Подробнее здесь: https://stackoverflow.com/questions/793 ... qtt-broker
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Невозможно отображать данные датчиков от подписанного брокера MQTT.
    Anonymous » » в форуме Python
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Unity: Как отличить игрока, подписанного анонимно от игрока, подписанного с Apple Win
    Anonymous » » в форуме C#
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous
  • Unity: Как отличить игрока, подписанного анонимно от игрока, подписанного с Apple Win
    Anonymous » » в форуме IOS
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous
  • Unity: Как отличить игрока, подписанного анонимно от игрока, подписанного с Apple Win
    Anonymous » » в форуме C#
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Unity: Как отличить игрока, подписанного анонимно от игрока, подписанного с Apple Win
    Anonymous » » в форуме IOS
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»