(Boost) Asio Custom Timeout Token -Aout и пользовательская асинхронная отмена распространенияC++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 (Boost) Asio Custom Timeout Token -Aout и пользовательская асинхронная отмена распространения

Сообщение Anonymous »

Я пытаюсь реализовать протокол по гнездам TCP с использованием библиотеки ASIO (Boost). Для этого я могу создать std :: list , который хранит std :: pare предиката (чтобы определить, является ли полученное сообщение, которое мы ждем) и asio :: any_completion_handler Обработчик завершения, который запускается при получении сообщения, и соответствующий предикат верно.
Эта реализация, кажется, работает правильно, если я использую asio :: use_awaitable , но после этого работает. Первый шаг я хотел бы добавить пользовательский токен тайм -аута (т.е. Токен с операцией Asio Async, такой как ASYNC_READ_UNTIL, но если хочу использовать его для моей предыдущей пользовательской операции Async, я не полагаю, как правильно связать Слот ... < /p>
Вот код для пользовательского Timeout Token: < /p>
#include

struct timeout_provider;

// that's our completion token with the timeout attached
template
struct with_timeout {
timeout_provider * provider;
Token token;
};

// this is the timeout source
struct timeout_provider {
timeout_provider( asio::any_io_executor exec )
: timer{exec, std::chrono::steady_clock::time_point::max()}
{}

asio::steady_timer timer;

std::chrono::milliseconds timeout = std::chrono::milliseconds(10000);

asio::cancellation_slot cancellation_slot;
asio::cancellation_signal cancellation_signal;
asio::cancellation_type cancellation_type{asio::cancellation_type::terminal};

~timeout_provider() {
if (cancellation_slot.is_connected())
cancellation_slot.clear();
}

// to use it
template
auto operator()(Token && token)
{
return with_timeout{
this, std::forward(token)
};
}

// set up the timer and get ready to trigger
void arm()
{
timer.expires_after(timeout);
if (cancellation_slot.is_connected()) {

cancellation_slot.assign([this](asio::cancellation_type ct){
cancellation_signal.emit(ct);
});
}
timer.async_wait( [this](asio::error_code ec){
if (!ec) {
cancellation_signal.emit( cancellation_type );
}
});
}

};

template
struct with_timeout_binder
{
timeout_provider * provider;
Handler handler;

template
void operator()(Args && ... args) {
//cancel the time, we're done!
provider->timer.cancel();
std::move(handler)(std::forward(args)...);
}
};

namespace asio {

// This is the class to specialize when implementing a completion token.
template
struct async_result
{
//using return_type = typename async_result::return_type;

// this wrapper goes around the inner initiation, because we need to capture their cancellation slot
template
struct init_wrapper {
Initiation initiation;
timeout_provider * provider;

// the forwards to the initiation and lets us access the actual handler.
template
void operator()( Handler && handler, Args && ... args) {
auto sl = asio::get_associated_cancellation_slot(handler);
if (sl.is_connected()) {
provider->cancellation_slot = sl;
}
provider->arm();
std::move(initiation)(
with_timeout_binder{
provider,
std::forward(handler)
}, std::forward(args)...);
}
};

// the actual initiation
template
static auto initiate(Initiation && init, RawToken && token, Args && ... args) {
return async_result::initiate(
// here we wrap the initiation so we enable the above injection
init_wrapper(std::forward(init), token.provider),
std::move(token.token),
std::forward(args)...
);
}
};

// forward the other associators, such as allocator & executor
template
struct associator {
typedef typename Associator::type type;

static type get(const with_timeout_binder& b, const DefaultCandidate& c = DefaultCandidate()) noexcept {
return Associator::get(b.handler, c);
}
};

// set the slot explicitly
template
struct associated_cancellation_slot< with_timeout_binder, CancellationSlot1 > {
typedef asio::cancellation_slot type;

static type get(const with_timeout_binder& b, const CancellationSlot1& = CancellationSlot1()) noexcept {
return b.provider->cancellation_signal.slot();
}
};

}
< /code>
, а затем код для клиента TCP Socket: < /p>

class client : public std::enable_shared_from_this {
public:
using predicate = std::function;
using handler = asio::any_completion_handler;
using predicate_handler = std::pair< predicate, handler >;

client( tcp::socket socket )
: m_socket( std::move(socket) )
{
asio::co_spawn( m_socket.get_executor(), [&]() -> asio::awaitable {
std::string buffer;
for(;;) {
auto len = co_await asio::async_read_until( m_socket, asio::dynamic_buffer( buffer ), "\n", asio::use_awaitable );
auto msg = std::make_shared< std::string >( buffer.substr( 0, len ) );
for( auto it = m_predicate_handlers.begin() ; it != m_predicate_handlers.end() ; ) {
if( it->first( *msg ) ) {
asio::post( m_socket.get_executor(), [ msg, handler = std::move( it->second ) ]() mutable {
std::move( handler )( asio::error_code{}, msg );
});
it = m_predicate_handlers.erase( it );
}
else {
it++;
}
}
buffer.erase( 0, len );
}
}, asio::detached );
}

template< asio::completion_token_for CompletionToken >
auto async_wait_until( predicate pred, CompletionToken &&token ) {
return asio::async_initiate< CompletionToken, void( std::error_code, const std::shared_ptr ) >(
[&]( asio::completion_handler_for auto&& handler, predicate pred ) mutable {
m_predicate_handlers.push_back( std::make_pair(
pred,
std::forward(handler)
)
);
}, token, pred
);
}

private:
tcp::socket m_socket;
std::list< predicate_handler > m_predicate_handlers;

};
< /code>
и, наконец, основное приложение: < /p>
int main(int argc, char *argv[]) {

asio::io_context ctx;

asio::co_spawn( ctx, [&]() -> asio::awaitable {
tcp::acceptor acceptor(ctx, asio::ip::tcp::endpoint( asio::ip::tcp::v4(), 8000 ) );
for(;;) {
auto socket = co_await acceptor.async_accept( asio::use_awaitable );
auto cl = std::make_shared< client >( std::move( socket ) );
asio::co_spawn( ctx, [cl]() -> asio::awaitable {
for(;;){
auto msg = co_await cl->async_wait_until(
[]( const std::string &msg ) {
return msg.starts_with("REQUEST");
},
asio::use_awaitable
);
std::cout

Подробнее здесь: https://stackoverflow.com/questions/794 ... ion-propag
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»