Перехват сообщения RabbitMQClient C#C#

Место общения программистов C#
Ответить
Anonymous
 Перехват сообщения RabbitMQClient C#

Сообщение Anonymous »

Я создаю службу интеграции для стороннего приложения. Это приложение использует RabbitMQ для связи. Проблема, с которой я сталкиваюсь, заключается в том, что всякий раз, когда кролик публикует сообщение, при первой отправке моя служба не подтверждает его, но во второй раз сообщение действительно оказывается перехваченным. В чем может быть проблема и возможные решения?
public class RabbitMQClient : IRabbitMQClient
{
private readonly ILogger _logger;
private readonly string _rabbitMqUrl = "amqp://localhost:localhostpassword!@localhost:5672"; //añadir conexion al appsettingsJson
private readonly IConnection _connection;
private readonly IModel _model;

public RabbitMQClient(ILogger logger)
{
//_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger;
_connection = CreateConnection();
_model = _connection.CreateModel();
}

private IConnection CreateConnection()
{
var connectionFactory = new ConnectionFactory
{
Uri = new Uri(this._rabbitMqUrl),
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
};

return connectionFactory.CreateConnection();
}

public void Publish(string queueName, string message)
{
ConfigureQueue(queueName, _model);
_model.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
}

public void Listen(string queueName, Action onMessageReceived)
{
ConfigureQueue(queueName, _model);
var consumer = new EventingBasicConsumer(_model);
consumer.Received += (sender, @event) =>
{
var body = @event.Body;
var message = Encoding.UTF8.GetString(body.ToArray());

if (!string.IsNullOrWhiteSpace(message))
{
_model.BasicAck(@event.DeliveryTag, true);
onMessageReceived?.Invoke(message);
}
};
_model.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}

public async Task ListenAsync(string queueName, Func onMessageReceivedAsync)
{
int retries = 0;
const int maxRetries = 5;
bool existeConexion = false;

while (!existeConexion && retries < maxRetries)
{
try
{
ConfigureQueue(queueName, _model);
var consumer = new EventingBasicConsumer(_model);
consumer.Received += async (sender, @event) =>
{
try
{
var body = @event.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (!string.IsNullOrWhiteSpace(message))
{
if (onMessageReceivedAsync != null)
{
await onMessageReceivedAsync(message);
}
_model.BasicAck(@event.DeliveryTag, false);
}
}
catch (Exception ex)
{
_logger.LogError($"Error processing message: {ex.Message}");
_model.BasicNack(@event.DeliveryTag, false, true);
}
};

_model.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

existeConexion = true;
_logger.LogInformation($"Consumer connected to queue '{queueName}' and listening for messages.");
}
catch (Exception ex)
{
retries++;
Console.WriteLine($"Failed to connect consumer. Retrying ({retries}/{maxRetries})... Error: {ex.Message}");
await Task.Delay(1000);
}
}

if (!existeConexion)
{
throw new Exception($"Failed to connect consumer after {maxRetries} retries.");
}
}

private void ConfigureQueue(string queueName, IModel model)
{
//TODO Modificar el exchange, este esta puesto solo para hacer pruebas
model.ExchangeDeclarePassive("app.topicfan");
_logger.LogWarning("modelmessages"+model.MessageCount(queueName).ToString());
model.BasicQos(0, 250, true);
model.QueueDeclarePassive(queueName);
}
public void Dispose()
{
_connection.Dispose();
}
}

public class Worker : BackgroundService{
private readonly ILogger _logger;
private readonly IRecurringJobManager _backgroundJobClient;
private IAPIManagementService _apiManagementService;
private IRabbitMQClient _rabbitMQClient;
private IConnection? _connection;
private IModel? _channel;
public Worker(ILogger logger ,IRecurringJobManager backgroundJobClient, IAPIManagementService apiManagementService, IRabbitMQClient rabbitMQClient)
{
_logger = logger;
_backgroundJobClient = backgroundJobClient;
_apiManagementService = apiManagementService;
_rabbitMQClient = rabbitMQClient;

}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
//while (!stoppingToken.IsCancellationRequested)
//{
// //_backgroundJobClient.AddOrUpdate(recurringJobId,() => TestClients(), Cron.Minutely, recurringJobOptions);
// //var client = await _mfClient.GetClientAsync(2);
// //IEnumerable clients = await _apiManagementService.GetClientsAsync();
// //CollectionModelEntityModelBatchDto movements = await _visualSGAClient.GetBatchesAsync(1);
// //IEnumerable steps = await _visualTrackingClient.GetStepBudgetsAsync(1);

// await Task.Delay(Timeout.Infinite, stoppingToken);

await Task.Delay(Timeout.Infinite, stoppingToken);

}

public override async Task StartAsync(CancellationToken cancellationToken)
{
await _rabbitMQClient.ListenAsync("message.thirdparyapp", async func =>
{
await OnMessageReceivedAsync(func);
});

await base.StartAsync(cancellationToken);
}

private async Task OnMessageReceivedAsync(string message)
{

_logger.LogWarning($"Received message (Async): {message}");
await Task.Delay(1000);
}
}


Подробнее здесь: https://stackoverflow.com/questions/793 ... nt-c-sharp
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»