Я создаю службу интеграции для стороннего приложения. Это приложение использует RabbitMQ для связи. Проблема, с которой я сталкиваюсь, заключается в том, что всякий раз, когда кролик публикует сообщение, при первой отправке моя служба не подтверждает его, но во второй раз сообщение действительно оказывается перехваченным. В чем может быть проблема и возможные решения?
public class RabbitMQClient : IRabbitMQClient
{
private readonly ILogger _logger;
private readonly string _rabbitMqUrl = "amqp://localhost:localhostpassword!@localhost:5672"; //añadir conexion al appsettingsJson
private readonly IConnection _connection;
private readonly IModel _model;
public RabbitMQClient(ILogger logger)
{
//_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger;
_connection = CreateConnection();
_model = _connection.CreateModel();
}
private IConnection CreateConnection()
{
var connectionFactory = new ConnectionFactory
{
Uri = new Uri(this._rabbitMqUrl),
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
};
return connectionFactory.CreateConnection();
}
public void Publish(string queueName, string message)
{
ConfigureQueue(queueName, _model);
_model.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
}
public void Listen(string queueName, Action onMessageReceived)
{
ConfigureQueue(queueName, _model);
var consumer = new EventingBasicConsumer(_model);
consumer.Received += (sender, @event) =>
{
var body = @event.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
if (!string.IsNullOrWhiteSpace(message))
{
_model.BasicAck(@event.DeliveryTag, true);
onMessageReceived?.Invoke(message);
}
};
_model.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
public async Task ListenAsync(string queueName, Func onMessageReceivedAsync)
{
int retries = 0;
const int maxRetries = 5;
bool existeConexion = false;
while (!existeConexion && retries < maxRetries)
{
try
{
ConfigureQueue(queueName, _model);
var consumer = new EventingBasicConsumer(_model);
consumer.Received += async (sender, @event) =>
{
try
{
var body = @event.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (!string.IsNullOrWhiteSpace(message))
{
if (onMessageReceivedAsync != null)
{
await onMessageReceivedAsync(message);
}
_model.BasicAck(@event.DeliveryTag, false);
}
}
catch (Exception ex)
{
_logger.LogError($"Error processing message: {ex.Message}");
_model.BasicNack(@event.DeliveryTag, false, true);
}
};
_model.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
existeConexion = true;
_logger.LogInformation($"Consumer connected to queue '{queueName}' and listening for messages.");
}
catch (Exception ex)
{
retries++;
Console.WriteLine($"Failed to connect consumer. Retrying ({retries}/{maxRetries})... Error: {ex.Message}");
await Task.Delay(1000);
}
}
if (!existeConexion)
{
throw new Exception($"Failed to connect consumer after {maxRetries} retries.");
}
}
private void ConfigureQueue(string queueName, IModel model)
{
//TODO Modificar el exchange, este esta puesto solo para hacer pruebas
model.ExchangeDeclarePassive("app.topicfan");
_logger.LogWarning("modelmessages"+model.MessageCount(queueName).ToString());
model.BasicQos(0, 250, true);
model.QueueDeclarePassive(queueName);
}
public void Dispose()
{
_connection.Dispose();
}
}
public class Worker : BackgroundService{
private readonly ILogger _logger;
private readonly IRecurringJobManager _backgroundJobClient;
private IAPIManagementService _apiManagementService;
private IRabbitMQClient _rabbitMQClient;
private IConnection? _connection;
private IModel? _channel;
public Worker(ILogger logger ,IRecurringJobManager backgroundJobClient, IAPIManagementService apiManagementService, IRabbitMQClient rabbitMQClient)
{
_logger = logger;
_backgroundJobClient = backgroundJobClient;
_apiManagementService = apiManagementService;
_rabbitMQClient = rabbitMQClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
//while (!stoppingToken.IsCancellationRequested)
//{
// //_backgroundJobClient.AddOrUpdate(recurringJobId,() => TestClients(), Cron.Minutely, recurringJobOptions);
// //var client = await _mfClient.GetClientAsync(2);
// //IEnumerable clients = await _apiManagementService.GetClientsAsync();
// //CollectionModelEntityModelBatchDto movements = await _visualSGAClient.GetBatchesAsync(1);
// //IEnumerable steps = await _visualTrackingClient.GetStepBudgetsAsync(1);
// await Task.Delay(Timeout.Infinite, stoppingToken);
await Task.Delay(Timeout.Infinite, stoppingToken);
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
await _rabbitMQClient.ListenAsync("message.thirdparyapp", async func =>
{
await OnMessageReceivedAsync(func);
});
await base.StartAsync(cancellationToken);
}
private async Task OnMessageReceivedAsync(string message)
{
_logger.LogWarning($"Received message (Async): {message}");
await Task.Delay(1000);
}
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... nt-c-sharp
Перехват сообщения RabbitMQClient C# ⇐ C#
Место общения программистов C#
1736500019
Anonymous
Я создаю службу интеграции для стороннего приложения. Это приложение использует RabbitMQ для связи. Проблема, с которой я сталкиваюсь, заключается в том, что всякий раз, когда кролик публикует сообщение, при первой отправке моя служба не подтверждает его, но во второй раз сообщение действительно оказывается перехваченным. В чем может быть проблема и возможные решения?
public class RabbitMQClient : IRabbitMQClient
{
private readonly ILogger _logger;
private readonly string _rabbitMqUrl = "amqp://localhost:localhostpassword!@localhost:5672"; //añadir conexion al appsettingsJson
private readonly IConnection _connection;
private readonly IModel _model;
public RabbitMQClient(ILogger logger)
{
//_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger;
_connection = CreateConnection();
_model = _connection.CreateModel();
}
private IConnection CreateConnection()
{
var connectionFactory = new ConnectionFactory
{
Uri = new Uri(this._rabbitMqUrl),
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
};
return connectionFactory.CreateConnection();
}
public void Publish(string queueName, string message)
{
ConfigureQueue(queueName, _model);
_model.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
}
public void Listen(string queueName, Action onMessageReceived)
{
ConfigureQueue(queueName, _model);
var consumer = new EventingBasicConsumer(_model);
consumer.Received += (sender, @event) =>
{
var body = @event.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
if (!string.IsNullOrWhiteSpace(message))
{
_model.BasicAck(@event.DeliveryTag, true);
onMessageReceived?.Invoke(message);
}
};
_model.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
public async Task ListenAsync(string queueName, Func onMessageReceivedAsync)
{
int retries = 0;
const int maxRetries = 5;
bool existeConexion = false;
while (!existeConexion && retries < maxRetries)
{
try
{
ConfigureQueue(queueName, _model);
var consumer = new EventingBasicConsumer(_model);
consumer.Received += async (sender, @event) =>
{
try
{
var body = @event.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (!string.IsNullOrWhiteSpace(message))
{
if (onMessageReceivedAsync != null)
{
await onMessageReceivedAsync(message);
}
_model.BasicAck(@event.DeliveryTag, false);
}
}
catch (Exception ex)
{
_logger.LogError($"Error processing message: {ex.Message}");
_model.BasicNack(@event.DeliveryTag, false, true);
}
};
_model.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
existeConexion = true;
_logger.LogInformation($"Consumer connected to queue '{queueName}' and listening for messages.");
}
catch (Exception ex)
{
retries++;
Console.WriteLine($"Failed to connect consumer. Retrying ({retries}/{maxRetries})... Error: {ex.Message}");
await Task.Delay(1000);
}
}
if (!existeConexion)
{
throw new Exception($"Failed to connect consumer after {maxRetries} retries.");
}
}
private void ConfigureQueue(string queueName, IModel model)
{
//TODO Modificar el exchange, este esta puesto solo para hacer pruebas
model.ExchangeDeclarePassive("app.topicfan");
_logger.LogWarning("modelmessages"+model.MessageCount(queueName).ToString());
model.BasicQos(0, 250, true);
model.QueueDeclarePassive(queueName);
}
public void Dispose()
{
_connection.Dispose();
}
}
public class Worker : BackgroundService{
private readonly ILogger _logger;
private readonly IRecurringJobManager _backgroundJobClient;
private IAPIManagementService _apiManagementService;
private IRabbitMQClient _rabbitMQClient;
private IConnection? _connection;
private IModel? _channel;
public Worker(ILogger logger ,IRecurringJobManager backgroundJobClient, IAPIManagementService apiManagementService, IRabbitMQClient rabbitMQClient)
{
_logger = logger;
_backgroundJobClient = backgroundJobClient;
_apiManagementService = apiManagementService;
_rabbitMQClient = rabbitMQClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
//while (!stoppingToken.IsCancellationRequested)
//{
// //_backgroundJobClient.AddOrUpdate(recurringJobId,() => TestClients(), Cron.Minutely, recurringJobOptions);
// //var client = await _mfClient.GetClientAsync(2);
// //IEnumerable clients = await _apiManagementService.GetClientsAsync();
// //CollectionModelEntityModelBatchDto movements = await _visualSGAClient.GetBatchesAsync(1);
// //IEnumerable steps = await _visualTrackingClient.GetStepBudgetsAsync(1);
// await Task.Delay(Timeout.Infinite, stoppingToken);
await Task.Delay(Timeout.Infinite, stoppingToken);
}
public override async Task StartAsync(CancellationToken cancellationToken)
{
await _rabbitMQClient.ListenAsync("message.thirdparyapp", async func =>
{
await OnMessageReceivedAsync(func);
});
await base.StartAsync(cancellationToken);
}
private async Task OnMessageReceivedAsync(string message)
{
_logger.LogWarning($"Received message (Async): {message}");
await Task.Delay(1000);
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79345108/intercept-a-message-rabbitmqclient-c-sharp[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия