Я пытаюсь реализовать протокол по гнездам TCP с использованием библиотеки ASIO (Boost). Для этого я могу создать std :: list , который хранит std :: pare предиката (чтобы определить, является ли полученное сообщение, которое мы ждем) и asio :: any_completion_handler Обработчик завершения, который запускается при получении сообщения, и соответствующий предикат верно.
Эта реализация, кажется, работает правильно, если я использую asio :: use_awaitable , но после этого работает. Первый шаг я хотел бы добавить пользовательский токен тайм -аута (т.е. Токен с операцией Asio Async, такой как ASYNC_READ_UNTIL, но если хочу использовать его для моей предыдущей пользовательской операции Async, я не полагаю, как правильно связать Слот ... < /p>
Вот код для пользовательского Timeout Token: < /p>
#include
struct timeout_provider;
// that's our completion token with the timeout attached
template
struct with_timeout {
timeout_provider * provider;
Token token;
};
// this is the timeout source
struct timeout_provider {
timeout_provider( asio::any_io_executor exec )
: timer{exec, std::chrono::steady_clock::time_point::max()}
{}
asio::steady_timer timer;
std::chrono::milliseconds timeout = std::chrono::milliseconds(10000);
asio::cancellation_slot cancellation_slot;
asio::cancellation_signal cancellation_signal;
asio::cancellation_type cancellation_type{asio::cancellation_type::terminal};
~timeout_provider() {
if (cancellation_slot.is_connected())
cancellation_slot.clear();
}
// to use it
template
auto operator()(Token && token)
{
return with_timeout{
this, std::forward(token)
};
}
// set up the timer and get ready to trigger
void arm()
{
timer.expires_after(timeout);
if (cancellation_slot.is_connected()) {
cancellation_slot.assign([this](asio::cancellation_type ct){
cancellation_signal.emit(ct);
});
}
timer.async_wait( [this](asio::error_code ec){
if (!ec) {
cancellation_signal.emit( cancellation_type );
}
});
}
};
template
struct with_timeout_binder
{
timeout_provider * provider;
Handler handler;
template
void operator()(Args && ... args) {
//cancel the time, we're done!
provider->timer.cancel();
std::move(handler)(std::forward(args)...);
}
};
namespace asio {
// This is the class to specialize when implementing a completion token.
template
struct async_result
{
//using return_type = typename async_result::return_type;
// this wrapper goes around the inner initiation, because we need to capture their cancellation slot
template
struct init_wrapper {
Initiation initiation;
timeout_provider * provider;
// the forwards to the initiation and lets us access the actual handler.
template
void operator()( Handler && handler, Args && ... args) {
auto sl = asio::get_associated_cancellation_slot(handler);
if (sl.is_connected()) {
provider->cancellation_slot = sl;
}
provider->arm();
std::move(initiation)(
with_timeout_binder{
provider,
std::forward(handler)
}, std::forward(args)...);
}
};
// the actual initiation
template
static auto initiate(Initiation && init, RawToken && token, Args && ... args) {
return async_result::initiate(
// here we wrap the initiation so we enable the above injection
init_wrapper(std::forward(init), token.provider),
std::move(token.token),
std::forward(args)...
);
}
};
// forward the other associators, such as allocator & executor
template
struct associator {
typedef typename Associator::type type;
static type get(const with_timeout_binder& b, const DefaultCandidate& c = DefaultCandidate()) noexcept {
return Associator::get(b.handler, c);
}
};
// set the slot explicitly
template
struct associated_cancellation_slot< with_timeout_binder, CancellationSlot1 > {
typedef asio::cancellation_slot type;
static type get(const with_timeout_binder& b, const CancellationSlot1& = CancellationSlot1()) noexcept {
return b.provider->cancellation_signal.slot();
}
};
}
< /code>
, а затем код для клиента TCP Socket: < /p>
class client : public std::enable_shared_from_this {
public:
using predicate = std::function;
using handler = asio::any_completion_handler;
using predicate_handler = std::pair< predicate, handler >;
client( tcp::socket socket )
: m_socket( std::move(socket) )
{
asio::co_spawn( m_socket.get_executor(), [&]() -> asio::awaitable {
std::string buffer;
for(;;) {
auto len = co_await asio::async_read_until( m_socket, asio::dynamic_buffer( buffer ), "\n", asio::use_awaitable );
auto msg = std::make_shared< std::string >( buffer.substr( 0, len ) );
for( auto it = m_predicate_handlers.begin() ; it != m_predicate_handlers.end() ; ) {
if( it->first( *msg ) ) {
asio::post( m_socket.get_executor(), [ msg, handler = std::move( it->second ) ]() mutable {
std::move( handler )( asio::error_code{}, msg );
});
it = m_predicate_handlers.erase( it );
}
else {
it++;
}
}
buffer.erase( 0, len );
}
}, asio::detached );
}
template< asio::completion_token_for CompletionToken >
auto async_wait_until( predicate pred, CompletionToken &&token ) {
return asio::async_initiate< CompletionToken, void( std::error_code, const std::shared_ptr ) >(
[&]( asio::completion_handler_for auto&& handler, predicate pred ) mutable {
m_predicate_handlers.push_back( std::make_pair(
pred,
std::forward(handler)
)
);
}, token, pred
);
}
private:
tcp::socket m_socket;
std::list< predicate_handler > m_predicate_handlers;
};
< /code>
и, наконец, основное приложение: < /p>
int main(int argc, char *argv[]) {
asio::io_context ctx;
asio::co_spawn( ctx, [&]() -> asio::awaitable {
tcp::acceptor acceptor(ctx, asio::ip::tcp::endpoint( asio::ip::tcp::v4(), 8000 ) );
for(;;) {
auto socket = co_await acceptor.async_accept( asio::use_awaitable );
auto cl = std::make_shared< client >( std::move( socket ) );
asio::co_spawn( ctx, [cl]() -> asio::awaitable {
for(;;){
auto msg = co_await cl->async_wait_until(
[]( const std::string &msg ) {
return msg.starts_with("REQUEST");
},
asio::use_awaitable
);
std::cout
Подробнее здесь: https://stackoverflow.com/questions/794 ... ion-propag
(Boost) Asio Custom Timeout Token -Aout и пользовательская асинхронная отмена распространения ⇐ C++
Программы на C++. Форум разработчиков
1739930035
Anonymous
Я пытаюсь реализовать протокол по гнездам TCP с использованием библиотеки ASIO (Boost). Для этого я могу создать std :: list , который хранит std :: pare предиката (чтобы определить, является ли полученное сообщение, которое мы ждем) и asio :: any_completion_handler Обработчик завершения, который запускается при получении сообщения, и соответствующий предикат верно.
Эта реализация, кажется, работает правильно, если я использую asio :: use_awaitable , но после этого работает. Первый шаг я хотел бы добавить пользовательский токен тайм -аута (т.е. Токен с операцией Asio Async, такой как ASYNC_READ_UNTIL, но если хочу использовать его для моей предыдущей пользовательской операции Async, я не полагаю, как правильно связать Слот ... < /p>
Вот код для пользовательского Timeout Token: < /p>
#include
struct timeout_provider;
// that's our completion token with the timeout attached
template
struct with_timeout {
timeout_provider * provider;
Token token;
};
// this is the timeout source
struct timeout_provider {
timeout_provider( asio::any_io_executor exec )
: timer{exec, std::chrono::steady_clock::time_point::max()}
{}
asio::steady_timer timer;
std::chrono::milliseconds timeout = std::chrono::milliseconds(10000);
asio::cancellation_slot cancellation_slot;
asio::cancellation_signal cancellation_signal;
asio::cancellation_type cancellation_type{asio::cancellation_type::terminal};
~timeout_provider() {
if (cancellation_slot.is_connected())
cancellation_slot.clear();
}
// to use it
template
auto operator()(Token && token)
{
return with_timeout{
this, std::forward(token)
};
}
// set up the timer and get ready to trigger
void arm()
{
timer.expires_after(timeout);
if (cancellation_slot.is_connected()) {
cancellation_slot.assign([this](asio::cancellation_type ct){
cancellation_signal.emit(ct);
});
}
timer.async_wait( [this](asio::error_code ec){
if (!ec) {
cancellation_signal.emit( cancellation_type );
}
});
}
};
template
struct with_timeout_binder
{
timeout_provider * provider;
Handler handler;
template
void operator()(Args && ... args) {
//cancel the time, we're done!
provider->timer.cancel();
std::move(handler)(std::forward(args)...);
}
};
namespace asio {
// This is the class to specialize when implementing a completion token.
template
struct async_result
{
//using return_type = typename async_result::return_type;
// this wrapper goes around the inner initiation, because we need to capture their cancellation slot
template
struct init_wrapper {
Initiation initiation;
timeout_provider * provider;
// the forwards to the initiation and lets us access the actual handler.
template
void operator()( Handler && handler, Args && ... args) {
auto sl = asio::get_associated_cancellation_slot(handler);
if (sl.is_connected()) {
provider->cancellation_slot = sl;
}
provider->arm();
std::move(initiation)(
with_timeout_binder{
provider,
std::forward(handler)
}, std::forward(args)...);
}
};
// the actual initiation
template
static auto initiate(Initiation && init, RawToken && token, Args && ... args) {
return async_result::initiate(
// here we wrap the initiation so we enable the above injection
init_wrapper(std::forward(init), token.provider),
std::move(token.token),
std::forward(args)...
);
}
};
// forward the other associators, such as allocator & executor
template
struct associator {
typedef typename Associator::type type;
static type get(const with_timeout_binder& b, const DefaultCandidate& c = DefaultCandidate()) noexcept {
return Associator::get(b.handler, c);
}
};
// set the slot explicitly
template
struct associated_cancellation_slot< with_timeout_binder, CancellationSlot1 > {
typedef asio::cancellation_slot type;
static type get(const with_timeout_binder& b, const CancellationSlot1& = CancellationSlot1()) noexcept {
return b.provider->cancellation_signal.slot();
}
};
}
< /code>
, а затем код для клиента TCP Socket: < /p>
class client : public std::enable_shared_from_this {
public:
using predicate = std::function;
using handler = asio::any_completion_handler;
using predicate_handler = std::pair< predicate, handler >;
client( tcp::socket socket )
: m_socket( std::move(socket) )
{
asio::co_spawn( m_socket.get_executor(), [&]() -> asio::awaitable {
std::string buffer;
for(;;) {
auto len = co_await asio::async_read_until( m_socket, asio::dynamic_buffer( buffer ), "\n", asio::use_awaitable );
auto msg = std::make_shared< std::string >( buffer.substr( 0, len ) );
for( auto it = m_predicate_handlers.begin() ; it != m_predicate_handlers.end() ; ) {
if( it->first( *msg ) ) {
asio::post( m_socket.get_executor(), [ msg, handler = std::move( it->second ) ]() mutable {
std::move( handler )( asio::error_code{}, msg );
});
it = m_predicate_handlers.erase( it );
}
else {
it++;
}
}
buffer.erase( 0, len );
}
}, asio::detached );
}
template< asio::completion_token_for CompletionToken >
auto async_wait_until( predicate pred, CompletionToken &&token ) {
return asio::async_initiate< CompletionToken, void( std::error_code, const std::shared_ptr ) >(
[&]( asio::completion_handler_for auto&& handler, predicate pred ) mutable {
m_predicate_handlers.push_back( std::make_pair(
pred,
std::forward(handler)
)
);
}, token, pred
);
}
private:
tcp::socket m_socket;
std::list< predicate_handler > m_predicate_handlers;
};
< /code>
и, наконец, основное приложение: < /p>
int main(int argc, char *argv[]) {
asio::io_context ctx;
asio::co_spawn( ctx, [&]() -> asio::awaitable {
tcp::acceptor acceptor(ctx, asio::ip::tcp::endpoint( asio::ip::tcp::v4(), 8000 ) );
for(;;) {
auto socket = co_await acceptor.async_accept( asio::use_awaitable );
auto cl = std::make_shared< client >( std::move( socket ) );
asio::co_spawn( ctx, [cl]() -> asio::awaitable {
for(;;){
auto msg = co_await cl->async_wait_until(
[]( const std::string &msg ) {
return msg.starts_with("REQUEST");
},
asio::use_awaitable
);
std::cout
Подробнее здесь: [url]https://stackoverflow.com/questions/79445152/boost-asio-custom-timeout-token-and-custom-async-operation-cancellation-propag[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия