Дальнейшее расследование, похоже, указывает на то, что контрольные сигналы пропускаются, когда возникает исключение тайм-аута, и сервер RabbitMQ разрывает соединение, вызывая эффект домино из сбоев на нашей стороне. Нам еще предстоит выяснить, что вызывает тайм-аут и заблокировать все операции, связанные с RabbitMQ.
РЕДАКТИРОВАТЬ
Вот некоторые подробности, которые могут помочь.
Между обновлениями версий не было никаких изменений ни в каких настройках.
- Целевая рабочая нагрузка — более 1500 клиентов, и остается без изменений.
- ВМ, Windows Server 2022 Standard x64, 32 ГБ ОЗУ, процессор 2,3 ГГц.
- Очереди по-прежнему классические и остаются без изменений.
Вот исходное исключение, которое возникает при первом возникновении ошибки:
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean
durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.ConsumerCount(String queue)
Ошибка EasyNetQ
System.TimeoutException: The operation has timed out.
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at EasyNetQ.RabbitAdvancedBus.c__DisplayClass51_0.b__0(IModel x)
at EasyNetQ.Persistent.PersistentChannel.InvokeChannelActionAsync[TResult,TChannelAction](TChannelAction channelAction, CancellationToken cancellationToken)
at EasyNetQ.RabbitAdvancedBus.QueueDeclareAsync(String name, Action`1 configure, CancellationToken cancellationToken)
at EasyNetQ.AdvancedBusExtensions.QueueDeclare(IAdvancedBus bus, String name, Boolean durable, Boolean exclusive, Boolean autoDelete, CancellationToken cancellationToken)
Исходный потребительский код
ncChannel.QueueDeclare(queueName, true, false, false, null);
ncChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
ncChannel.QueueBind(queueName, ExchangeName, routingKey);
Channel responseChannel = Channel.CreateBounded(channelOptions);
EventHandler nextCommandHandler = async (object sender, BasicDeliverEventArgs msg) =>
{
try
{
if (msg != null && msg.Body.ToArray() != null)
{
string msgBody = Encoding.UTF8.GetString(msg.Body.ToArray());
//log
}
else
{
//log
}
await responseChannel.Writer.WriteAsync(msg);
responseChannel.Writer.TryComplete();
}
catch (Exception ex)
{
//log
}
};
consumer.Received += nextCommandHandler;
bool autoAck = true;
consumerTag = ncChannel.BasicConsume(queueName, autoAck, consumer);
Попытался изменить потребителя на базового потребителя асинхронных событий
Channel responseChannel = Channel.CreateBounded(channelOptions);
ncChannel.QueueDeclare(queueName, true, false, false, null);
ncChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
ncChannel.QueueBind(queueName, ExchangeName, routingKey);
AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(ncChannel);
AsyncEventHandler nextCommandHandler = async (object sender, BasicDeliverEventArgs msg) =>
{
try
{
if (msg != null && msg.Body.ToArray() != null)
{
string msgBody = Encoding.UTF8.GetString(msg.Body.ToArray());
//log
}
else
{
//log
}
await responseChannel.Writer.WriteAsync(msg);
responseChannel.Writer.TryComplete();
await Task.Yield();
}
catch (Exception ex)
{
//log
}
};
consumer.Received += nextCommandHandler;
bool autoAck = true;
consumerTag = ncChannel.BasicConsume(queueName, autoAck, consumer);
Потребительский код EasyNetQ
var queue = _rabbitBus.Advanced.QueueDeclare(queueName, true, false, false);
consumer = _rabbitBus.Advanced.Consume(queue, (body, properties, info) => Task.Factory.StartNew(async () =>
{
try
{
string msgBody = null;
if (body.ToArray() != null)
{
msgBody = Encoding.UTF8.GetString(body.ToArray());
//log
}
else
{
//log
}
await responseChannel.Writer.WriteAsync(msgBody);
responseChannel.Writer.TryComplete();
await Task.Yield();
}
catch (Exception ex)
{
//log
}
}));
Подробнее здесь: https://stackoverflow.com/questions/787 ... e-consumer