Гость
C++ ASIO работает с сопрограммами и цепями
Сообщение
Гость » 09 мар 2024, 19:34
I'm struggling to use isio::strand on coroutine.
I need to send request to a server which is mainly asynchronous, for each request I send I would like to wait to receive the ACK before performing the next request.
I have tested the following implementation
Код: Выделить всё
#include
namespace io = asio;
using tcp = io::ip::tcp;
struct message {
static constexpr std::size_t HDR_LEN = 32;
std::uint32_t id;
enum type {
REQ, ACK, KEEP_ALIVE
} type;
enum code {
OK, SYNTAX_ERROR, PROTOCOL_ERROR
} code;
std::string data;
std::string pack() const;
static message unpack( const std::string & );
};
io::awaitable send_message( tcp::socket &socket, const message &msg ) {
auto buffer = msg.pack();
co_await io::async_write( socket, io::buffer( buffer ), io::use_awaitable );
}
io::awaitable< message > recv_message( tcp::socket &socket ) {
std::string buffer( message::HDR_LEN, 0 );
co_await io::async_read( socket, io::buffer( buffer ), io::use_awaitable );
auto msg = message::unpack( buffer );
if( msg.data.size() ){
co_await io::async_read( socket, io::buffer( msg.data ), io::use_awaitable );
}
}
io::awaitable< message > send_request( tcp::socket &socket, const message &req ) {
co_await send_message( socket, req );
co_await recv_message( socket );
}
int main(int argc, char **argv) {
io::io_context ctx;
tcp::socket client( ctx );
io::strand< io::any_io_executor > strand( ctx );
io::co_spawn( ctx, [&client]() -> io::awaitable< void > {
auto ex = co_await io::this_coro::executor;
tcp::acceptor acceptor( ex, { tcp::v4(), 1234 } );
client = co_await acceptor.async_accept( io::use_awaitable );
io::steady_timer keepalive_timer( ex );
for(;;) {
keepalive_timer.expires_after( std::chrono::seconds( 5 ) );
co_await send_request( client, {
.type = message::type::KEEP_ALIVE,
.code = message::code::OK,
.data = {}
});
}
}, io::detached );
// Just for the test to see in which order req/ack are sent/received
for( std::uint32_t i = 0 ; i < 10 ; i++ ) {
std::thread([&ctx, &strand, &client, i](){
io::co_spawn( ctx, [&client, &strand, i]() -> io::awaitable{
co_await io::post( strand, io::use_awaitable ); // How to lock other threads before the full completion of this one ?
co_await send_request( client, {
.id = i,
.type = message::type::REQ,
.code = message::code::OK,
.data = std::format("Thread n°{}", i)
});
}, io::detached );
});
}
ctx.run();
return 0;
}
But the sequence of the completions are the one I expected:
Код: Выделить всё
- Send REQ 1
- Send REQ 2
- Send REQ 3
- Send REQ 4
- ...
- Recv ACK 1
- Recv ACK 2
- Recv ACK 3
- Recv ACK 4
What did I need to change to get:
Код: Выделить всё
- Send REQ 1
- Recv ACK 1
- Send REQ 2
- Recv ACK 2
- ...
Источник:
https://stackoverflow.com/questions/781 ... nd-strands
1710002086
Гость
I'm struggling to use isio::strand on coroutine. I need to send request to a server which is mainly asynchronous, for each request I send I would like to wait to receive the ACK before performing the next request. I have tested the following implementation [code]#include namespace io = asio; using tcp = io::ip::tcp; struct message { static constexpr std::size_t HDR_LEN = 32; std::uint32_t id; enum type { REQ, ACK, KEEP_ALIVE } type; enum code { OK, SYNTAX_ERROR, PROTOCOL_ERROR } code; std::string data; std::string pack() const; static message unpack( const std::string & ); }; io::awaitable send_message( tcp::socket &socket, const message &msg ) { auto buffer = msg.pack(); co_await io::async_write( socket, io::buffer( buffer ), io::use_awaitable ); } io::awaitable< message > recv_message( tcp::socket &socket ) { std::string buffer( message::HDR_LEN, 0 ); co_await io::async_read( socket, io::buffer( buffer ), io::use_awaitable ); auto msg = message::unpack( buffer ); if( msg.data.size() ){ co_await io::async_read( socket, io::buffer( msg.data ), io::use_awaitable ); } } io::awaitable< message > send_request( tcp::socket &socket, const message &req ) { co_await send_message( socket, req ); co_await recv_message( socket ); } int main(int argc, char **argv) { io::io_context ctx; tcp::socket client( ctx ); io::strand< io::any_io_executor > strand( ctx ); io::co_spawn( ctx, [&client]() -> io::awaitable< void > { auto ex = co_await io::this_coro::executor; tcp::acceptor acceptor( ex, { tcp::v4(), 1234 } ); client = co_await acceptor.async_accept( io::use_awaitable ); io::steady_timer keepalive_timer( ex ); for(;;) { keepalive_timer.expires_after( std::chrono::seconds( 5 ) ); co_await send_request( client, { .type = message::type::KEEP_ALIVE, .code = message::code::OK, .data = {} }); } }, io::detached ); // Just for the test to see in which order req/ack are sent/received for( std::uint32_t i = 0 ; i < 10 ; i++ ) { std::thread([&ctx, &strand, &client, i](){ io::co_spawn( ctx, [&client, &strand, i]() -> io::awaitable{ co_await io::post( strand, io::use_awaitable ); // How to lock other threads before the full completion of this one ? co_await send_request( client, { .id = i, .type = message::type::REQ, .code = message::code::OK, .data = std::format("Thread n°{}", i) }); }, io::detached ); }); } ctx.run(); return 0; } [/code] But the sequence of the completions are the one I expected: [code]- Send REQ 1 - Send REQ 2 - Send REQ 3 - Send REQ 4 - ... - Recv ACK 1 - Recv ACK 2 - Recv ACK 3 - Recv ACK 4 [/code] What did I need to change to get: [code]- Send REQ 1 - Recv ACK 1 - Send REQ 2 - Recv ACK 2 - ... [/code] Источник: [url]https://stackoverflow.com/questions/78132949/c-asio-deal-with-coroutine-and-strands[/url]