- Программа, создающая множество повторно доставленных сообщений
#include
#include
#include
class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
};
AMQPConsumer.cpp
#include "AMQPConsumer.h"
AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
}
AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}
void AMQPConsumer::consume(std::vector& messages)
{
try
{
AMQP::TCPChannel channel(_connection);
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);
channel.ack(deliveryTag);
if(!Manager.isWorking())
{
ev_break(_loop);
}
};
channel.setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);
do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);
double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if(remain > 0.0)
break;
} while(Manager.isEnabled());
} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}
void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}
void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}
void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}
- Программа, которая редко создает повторно доставленные сообщения.
#include
#include
#include
class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
AMQP::TcpChannel* _channel; //This is newly added
};
AMQPConsumer.cpp
#include "AMQPConsumer.h"
AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
_channel = new AMQP::TcpChannel(_connection); //This is newly added
}
AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}
void AMQPConsumer::consume(std::vector& messages)
{
try
{
//AMQP::TCPChannel channel(_connection); //Commented out
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);
_channel->ack(deliveryTag);
if(!Manager.isWorking())
{
ev_break(_loop);
}
};
_channel->setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);
do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);
double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if(remain > 0.0)
break;
} while(Manager.isEnabled());
} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}
void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}
void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}
void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}
main.cpp одинаков для двух программ.
#include "AMQPConsumer.h"
int main(void)
{
AMQPConsumer consumer;
while(true)
{
std::vector messages{};
consumer.consume(messages);
// process messages
}
return 0;
}
Подробнее здесь: https://stackoverflow.com/questions/797 ... g-amqp-cpp
Мобильная версия