Почему возникают повторные доставки сообщений при использовании AMQP-CPPC++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Почему возникают повторные доставки сообщений при использовании AMQP-CPP

Сообщение Anonymous »

Я выполнил две потребительские программы RabbitMQ. Во-первых, возникает много повторно доставленных сообщений, а во-вторых, они возникают редко. Почему?
  • Программа, создающая множество повторно доставленных сообщений
AMQPConsumer.h
#include
#include
#include

class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
};

AMQPConsumer.cpp
#include "AMQPConsumer.h"

AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
}

AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}

void AMQPConsumer::consume(std::vector& messages)
{
try
{
AMQP::TCPChannel channel(_connection);
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);

channel.ack(deliveryTag);

if(!Manager.isWorking())
{
ev_break(_loop);
}
};

channel.setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);

do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);

double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));

if(remain > 0.0)
break;
} while(Manager.isEnabled());

} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}

void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}

void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}

void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}
  • Программа, которая редко создает повторно доставленные сообщения.
AMQPConsumer.h
#include
#include
#include

class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
AMQP::TcpChannel* _channel; //This is newly added
};

AMQPConsumer.cpp
#include "AMQPConsumer.h"

AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
_channel = new AMQP::TcpChannel(_connection); //This is newly added
}

AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}

void AMQPConsumer::consume(std::vector& messages)
{
try
{
//AMQP::TCPChannel channel(_connection); //Commented out
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);

_channel->ack(deliveryTag);

if(!Manager.isWorking())
{
ev_break(_loop);
}
};

_channel->setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);

do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);

double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));

if(remain > 0.0)
break;
} while(Manager.isEnabled());

} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}

void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}

void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}

void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}

main.cpp одинаков для двух программ.
#include "AMQPConsumer.h"

int main(void)
{
AMQPConsumer consumer;
while(true)
{
std::vector messages{};
consumer.consume(messages);

// process messages
}
return 0;
}


Подробнее здесь: https://stackoverflow.com/questions/797 ... g-amqp-cpp
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»