У меня есть следующий код, использующий библиотеку ASIO C++, который предназначен для обеспечения того, чтобы сообщения не чередовались в сокете, в который они записываются (как описано здесь):
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace boost;
using tcp = asio::ip::tcp;
namespace
{
class sender : public std::enable_shared_from_this
{
public:
explicit sender(std::shared_ptr sock)
: _socket(std::move(sock))
{
}
template
auto async_send_message(asio::const_buffer buf, CompletionToken&& h)
{
// Adapted from the C++14 callback_wrapper example on the ASIO
// documentation pages. The operation is initiated by pushing the
// buffer onto the send queue, and completes once the buffer is sent.
auto init =
[me = shared_from_this()](auto handler, asio::const_buffer buf)
{
// Default to using the socket's executor to track outstanding work
auto work = asio::make_work_guard(
handler, me->_socket->get_executor());
me->async_send_message_impl(
buf,
[handler = std::move(handler),
work = std::move(work)](system::error_code ec) mutable
{
const auto alloc
= asio::get_associated_allocator(handler);
asio::dispatch(
work.get_executor(),
asio::bind_allocator(
alloc,
[handler = std::move(handler),
ec]() mutable { handler(ec); }));
});
};
// Initiate the async operation on the socket's strand
return asio::async_initiate(
init, h, buf);
}
private:
template
void async_send_message_impl(asio::const_buffer buf, CompletionHandler&& h)
{
_send_queue.emplace(buf, std::forward(h));
// Queue was empty meaning there's no active send strand, so start one.
if (_send_queue.size() == 1)
send_queued_message();
}
void send_queued_message()
{
asio::async_write(
*_socket,
_send_queue.front().first,
[me = shared_from_this()](system::error_code ec, std::size_t)
{ me->handle_send_complete(ec); });
}
void handle_send_complete(system::error_code ec)
{
auto& h = _send_queue.front().second;
const auto& ex
= asio::get_associated_executor(h, _socket->get_executor());
asio::dispatch(ex, [h = std::move(h), ec]() mutable { h(ec); });
_send_queue.pop();
// Kepp sending until no messages left to send.
if (!_send_queue.empty())
send_queued_message();
}
std::shared_ptr _socket;
std::queue>
_send_queue;
};
Я провел стресс-тестирование с помощью следующего кода. Он успешно избегает чередующейся записи, но я не думаю, что он работает правильно в многопоточной среде. В конечном итоге у меня возникают состояния гонки (демонстрируемые с помощью средства очистки потоков), когда я их не ожидаю.
У меня есть следующий код, использующий библиотеку ASIO C++, который предназначен для обеспечения того, чтобы сообщения не чередовались в сокете, в который они записываются (как описано здесь): [code]#include #include #include #include #include #include #include #include #include #include #include #include
using namespace boost; using tcp = asio::ip::tcp;
namespace {
class sender : public std::enable_shared_from_this { public: explicit sender(std::shared_ptr sock) : _socket(std::move(sock)) { }
template auto async_send_message(asio::const_buffer buf, CompletionToken&& h) { // Adapted from the C++14 callback_wrapper example on the ASIO // documentation pages. The operation is initiated by pushing the // buffer onto the send queue, and completes once the buffer is sent. auto init = [me = shared_from_this()](auto handler, asio::const_buffer buf) { // Default to using the socket's executor to track outstanding work auto work = asio::make_work_guard( handler, me->_socket->get_executor()); me->async_send_message_impl( buf, [handler = std::move(handler), work = std::move(work)](system::error_code ec) mutable { const auto alloc = asio::get_associated_allocator(handler); asio::dispatch( work.get_executor(), asio::bind_allocator( alloc, [handler = std::move(handler), ec]() mutable { handler(ec); })); }); };
// Initiate the async operation on the socket's strand return asio::async_initiate( init, h, buf); }
// Kepp sending until no messages left to send. if (!_send_queue.empty()) send_queued_message(); }
std::shared_ptr _socket; std::queue> _send_queue; }; [/code] Я провел стресс-тестирование с помощью следующего кода. Он успешно избегает чередующейся записи, но я не думаю, что он работает правильно в многопоточной среде. В конечном итоге у меня возникают состояния гонки (демонстрируемые с помощью средства очистки потоков), когда я их не ожидаю. [code]const std::string msg = R"longstring(...)longstring";