структура кода
strong> program.cs ( Настройка службы)
Код: Выделить всё
var builder = Host.CreateApplicationBuilder(args);
// Configure services
builder.Services.AddSingleton();
builder.Services.AddHostedService();
// Other configuration...
var host = builder.Build();
await host.RunAsync();
Код: Выделить всё
public class MQTTClient
{
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
private readonly IManagedMqttClient _mqttClient;
public MQTTClient(ILogger logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
_mqttClient = new MqttFactory().CreateManagedMqttClient();
_mqttClient.ConnectedAsync += async (MqttClientConnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Connected to MQTT server");
};
_mqttClient.DisconnectedAsync += async (MqttClientDisconnectedEventArgs eventArgs) =>
{
_logger.LogInformation("Disconnected from MQTT server");
};
_mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
string connectionstring = _configuration["MQTT:Connectionstring"];
string username = _configuration["MQTT:Username"];
string password = _configuration["MQTT:Password"];
string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";
bool ws = connectionstring.StartsWith("ws");
var options = new MqttClientOptionsBuilder()
.WithClientId(mqttClientId)
.WithCredentials(username, password)
.WithTls()
.WithCleanSession();
if (ws)
options.WithWebSocketServer(connectionstring);
else
options.WithTcpServer(connectionstring);
var builtOptions = options.Build();
var mqttOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(builtOptions)
.WithAutoReconnectDelay(TimeSpan.FromSeconds(3))
.WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)
.Build();
await _mqttClient.StartAsync(mqttOptions);
}
public async Task SubscribeAsync()
{
await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]);
await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]);
}
private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var topic = eventArgs.ApplicationMessage.Topic;
_logger.LogInformation("Message received: {topic}", topic);
try
{
if (topic.EndsWith(_configuration["MQTT:Addtopic"]))
{
_logger.LogInformation("Processing single client add request");
var data = JsonSerializer.Deserialize(eventArgs.ApplicationMessage.Payload);
// Process message...
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message");
}
}
}
Код: Выделить всё
public class ServiceWorker : BackgroundService
{
private readonly ILogger _logger;
private readonly MQTTClient _mqttClient;
public ServiceWorker(ILogger logger, MQTTClient mqttClient)
{
_logger = logger;
_mqttClient = mqttClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);
await _mqttClient.StartAsync(stoppingToken);
await _mqttClient.SubscribeAsync();
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken);
}
await _mqttClient.StopAsync(stoppingToken);
}
}
< /code>
Вот журналы < /h3>
Использование того, что @brits, при условии, что я смог получить выходные данные. Вот как они выглядят. < /P>
[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) > Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes)
[*] Служба успешно подключается к EMQX (подтверждено через журналы и DASHBOARD EMQX)
Соединение остается стабильным < /li>
Я могу успешно публиковать тестовые сообщения, используя MQTTX на темы, которые я пытаюсь подписаться на < /li>
Сообщения MQTTX видны в истории MQTTX
[*] Метод моей службы HandlemesseGevived
< /ol>
Настройка тестирования: < /h3>
[*] Использование MQTTX для публикации Тестовые сообщения по темам < /li>
Темы в конфигурации точно соответствуют тому, что я публикую в mqttx < /li>
Сообщения успешно опубликованы (подтверждается в MQTTX, который является Подписывается на ту же тему)
Служба работает и подключается во время тестирования /> среда: < /h3>
.net 6.0 < /li>
mqttnet version = "4.1.4.563" < /li >
emqx Broker < /li>
mqttx < /li>
< /ul>
Вопрос: < /h3>
Почему моя служба не получает сообщения, которые я публикую через MQTTX? Клиент успешно подключается, но сообщения, отправляемые через MQTTX, никогда не запускают метод HandlemessageVived . Что может предотвратить прием сообщения?
Подробнее здесь: https://stackoverflow.com/questions/794 ... connection