Клиент MQTT (MQTTNET) не успешно подписывается на темы, несмотря на подключение к EMQX BrokerC#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Клиент MQTT (MQTTNET) не успешно подписывается на темы, несмотря на подключение к EMQX Broker

Сообщение Anonymous »

Я строю сервис .NET, который подключается к брокеру EMQX с использованием MQTTnet. В то время как клиент подключается к EMQX, когда я тестирую отправку сообщений через MQTTX (клиент MQTT Client Tool), моя служба никогда не получает их. Похоже, что подписка не подписана, несмотря на использование nefbibeasync
структура кода
strong> program.cs ( Настройка службы)

Код: Выделить всё

var builder = Host.CreateApplicationBuilder(args);

// Configure services
builder.Services.AddSingleton();
builder.Services.AddHostedService();

// Other configuration...

var host = builder.Build();
await host.RunAsync();
mqttclient.cs>

Код: Выделить всё

public class MQTTClient
{
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
private readonly IManagedMqttClient _mqttClient;

public MQTTClient(ILogger logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;

_mqttClient = new MqttFactory().CreateManagedMqttClient();
_mqttClient.ConnectedAsync += async (MqttClientConnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Connected to MQTT server");
};
_mqttClient.DisconnectedAsync += async (MqttClientDisconnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Disconnected from MQTT server");
};
_mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
string connectionstring = _configuration["MQTT:Connectionstring"];
string username = _configuration["MQTT:Username"];
string password = _configuration["MQTT:Password"];
string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";

bool ws = connectionstring.StartsWith("ws");

var options = new MqttClientOptionsBuilder()
.WithClientId(mqttClientId)
.WithCredentials(username, password)
.WithTls()
.WithCleanSession();

if (ws)
options.WithWebSocketServer(connectionstring);
else
options.WithTcpServer(connectionstring);

var builtOptions = options.Build();
var mqttOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(builtOptions)
.WithAutoReconnectDelay(TimeSpan.FromSeconds(3))
.WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)
.Build();

await _mqttClient.StartAsync(mqttOptions);
}

public async Task SubscribeAsync()
{
await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]);
await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]);
}

private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var topic = eventArgs.ApplicationMessage.Topic;
_logger.LogInformation("Message received: {topic}", topic);

try
{
if (topic.EndsWith(_configuration["MQTT:Addtopic"]))
{
_logger.LogInformation("Processing single client add request");
var data = JsonSerializer.Deserialize(eventArgs.ApplicationMessage.Payload);
// Process message...
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message");
}
}
}
serviceworker.cs>

Код: Выделить всё

public class ServiceWorker : BackgroundService
{
private readonly ILogger _logger;
private readonly MQTTClient _mqttClient;

public ServiceWorker(ILogger  logger, MQTTClient mqttClient)
{
_logger = logger;
_mqttClient = mqttClient;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);

await _mqttClient.StartAsync(stoppingToken);
await _mqttClient.SubscribeAsync();

while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken);
}

await _mqttClient.StopAsync(stoppingToken);
}
}
< /code>
 Вот журналы < /h3>
Использование того, что @brits, при условии, что я смог получить выходные данные. Вот как они выглядят. < /P>

[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) > Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes) 

[*] Служба успешно подключается к EMQX (подтверждено через журналы и DASHBOARD EMQX) 
 Соединение остается стабильным < /li>
 Я могу успешно публиковать тестовые сообщения, используя MQTTX на темы, которые я пытаюсь подписаться на < /li>
 Сообщения MQTTX видны в истории MQTTX
[*] Метод моей службы HandlemesseGevived 
никогда не запускается, когда сообщения отправляются через MQTTX
< /ol>
Настройка тестирования: < /h3>

[*] Использование MQTTX для публикации Тестовые сообщения по темам < /li>
Темы в конфигурации точно соответствуют тому, что я публикую в mqttx < /li>
Сообщения успешно опубликованы (подтверждается в MQTTX, который является Подписывается на ту же тему)
Служба работает и подключается во время тестирования /> среда: < /h3>

.net 6.0 < /li>
mqttnet version = "4.1.4.563" < /li >
emqx Broker < /li>
mqttx < /li>
< /ul>
Вопрос: < /h3>
Почему моя служба не получает сообщения, которые я публикую через MQTTX? Клиент успешно подключается, но сообщения, отправляемые через MQTTX, никогда не запускают метод HandlemessageVived . Что может предотвратить прием сообщения?


Подробнее здесь: https://stackoverflow.com/questions/794 ... connection
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»