Я выполнил две потребительские программы RabbitMQ. Один (А) — это много повторно доставленных сообщений, а другой (Б) — редко. Почему?
Разница между ними заключается в том, используется ли TcpChannel повторно или нет.
Но почему это влияет на поведение повторной доставки?
Ниже показаны различия между ними.
$ diff A/AMQPConsumer.h B/AMQPConsumer.h
19a20
> AMQP::TcpChannel* _channel; //This is newly added
$ diff A/AMQPConsumer.cpp B/AMQPConsumer.cpp
8a9
> _channel = new AMQP::TcpChannel(_connection); //This is newly added
20c21
< AMQP::TCPChannel channel(_connection);
---
> //AMQP::TCPChannel channel(_connection); //Commented out
25c26
< channel.ack(deliveryTag);
---
> _channel->ack(deliveryTag);
33c34
< channel.setQos(1);
---
> _channel->setQos(1);
main.cpp
#include "AMQPConsumer.h"
int main(void)
{
AMQPConsumer consumer;
while(true)
{
std::vector messages{};
consumer.consume(messages);
// process messages
}
return 0;
}
Полные коды (A) находятся здесь;
AMQPConsumer.h
#include
#include
#include
class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
};
AMQPConsumer.cpp
#include "AMQPConsumer.h"
AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
}
AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}
void AMQPConsumer::consume(std::vector& messages)
{
try
{
AMQP::TCPChannel channel(_connection);
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);
channel.ack(deliveryTag);
if(!Manager.isWorking())
{
ev_break(_loop);
}
};
channel.setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);
do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);
double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if(remain > 0.0)
break;
} while(Manager.isEnabled());
} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}
void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}
void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}
void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}
Полные коды (B) находятся здесь;
AMQPConsumer.h
#include
#include
#include
class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
AMQP::TcpChannel* _channel; //This is newly added
};
AMQPConsumer.cpp
#include "AMQPConsumer.h"
AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
_channel = new AMQP::TcpChannel(_connection); //This is newly added
}
AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}
void AMQPConsumer::consume(std::vector& messages)
{
try
{
//AMQP::TCPChannel channel(_connection); //Commented out
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);
_channel->ack(deliveryTag);
if(!Manager.isWorking())
{
ev_break(_loop);
}
};
_channel->setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);
do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);
double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if(remain > 0.0)
break;
} while(Manager.isEnabled());
} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}
void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}
void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}
void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}
Подробнее здесь: https://stackoverflow.com/questions/797 ... s-amqp-cpp
Почему повторное использование TcpChannel уменьшает количество повторно доставленных сообщений (AMQP-CPP) ⇐ C++
Программы на C++. Форум разработчиков
1761228243
Anonymous
Я выполнил две потребительские программы RabbitMQ. Один (А) — это много повторно доставленных сообщений, а другой (Б) — редко. Почему?
Разница между ними заключается в том, используется ли TcpChannel повторно или нет.
Но почему это влияет на поведение повторной доставки?
Ниже показаны различия между ними.
$ diff A/AMQPConsumer.h B/AMQPConsumer.h
19a20
> AMQP::TcpChannel* _channel; //This is newly added
$ diff A/AMQPConsumer.cpp B/AMQPConsumer.cpp
8a9
> _channel = new AMQP::TcpChannel(_connection); //This is newly added
20c21
< AMQP::TCPChannel channel(_connection);
---
> //AMQP::TCPChannel channel(_connection); //Commented out
25c26
< channel.ack(deliveryTag);
---
> _channel->ack(deliveryTag);
33c34
< channel.setQos(1);
---
> _channel->setQos(1);
main.cpp
#include "AMQPConsumer.h"
int main(void)
{
AMQPConsumer consumer;
while(true)
{
std::vector messages{};
consumer.consume(messages);
// process messages
}
return 0;
}
Полные коды (A) находятся здесь;
AMQPConsumer.h
#include
#include
#include
class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
};
AMQPConsumer.cpp
#include "AMQPConsumer.h"
AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
}
AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}
void AMQPConsumer::consume(std::vector& messages)
{
try
{
AMQP::TCPChannel channel(_connection);
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);
channel.ack(deliveryTag);
if(!Manager.isWorking())
{
ev_break(_loop);
}
};
channel.setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);
do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);
double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if(remain > 0.0)
break;
} while(Manager.isEnabled());
} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}
void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}
void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}
void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}
Полные коды (B) находятся здесь;
AMQPConsumer.h
#include
#include
#include
class AMQPConsumer
{
public:
AMQPConsumer();
~AMQPConsumer();
void consume(std::vector& messages);
static void timeoutCb(EV_P_ ev_timer* w, int revents);
static void errorCb(const char* message);
static voi startCb(const std::string& consumertag);
private:
std::string _queueName;
ev_timer _timer;
struct ev_loop* _loop;
AMQP::LibEvHandler* _handler;
AMQP::TcpConnection* _connection;
AMQP::TcpChannel* _channel; //This is newly added
};
AMQPConsumer.cpp
#include "AMQPConsumer.h"
AMQPConsumer::AMQPConsumer()
{
_queueName = "MY_QUEUE";
_loop = ev_loop_new(0);
_handler = new AMQP::LibEvHandler(_loop);
_connection = new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"));
_channel = new AMQP::TcpChannel(_connection); //This is newly added
}
AMQPConsumer::~AMQPConsumer()
{
ev_loop_destroy(_loop);
}
void AMQPConsumer::consume(std::vector& messages)
{
try
{
//AMQP::TCPChannel channel(_connection); //Commented out
auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
std::string request(message.body(), message.bodySize());
messages.push_back(request);
_channel->ack(deliveryTag);
if(!Manager.isWorking())
{
ev_break(_loop);
}
};
_channel->setQos(1);
channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);
do
{
ev_timer_init(&_timer, timeoutCb, 1, 0.0);
ev_timer_start(_loop, &_timer);
ev_run(_loop, 0);
double remain = ev_timer_remaining(_loop, &_timer);
ev_timer_stop(_loop, &_timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if(remain > 0.0)
break;
} while(Manager.isEnabled());
} catch(const std::exception& e)
{
ErrorLog("%s¥n", e.what());
throw;
}
}
void AMQPConsumer::errorCb(const char* message)
{
ErrorLog("consume failed %s¥n", message);
}
void AMQPConsumer::startCb(const std::string& consumertag)
{
InfoLog("consume started %s¥n", consumertag.c_str());
}
void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
ev_break(EV_A_ EVBREAK_ONE);
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79797509/why-do-reusing-tcpchannel-reduce-redelivered-messages-amqp-cpp[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия