Состояние гонки с упорядоченной очередью отправки в ASIOC++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Состояние гонки с упорядоченной очередью отправки в ASIO

Сообщение Anonymous »

У меня есть следующий код, использующий библиотеку ASIO C++, который предназначен для обеспечения того, чтобы сообщения не чередовались в сокете, в который они записываются (как описано здесь):

Код: Выделить всё

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

using namespace boost;
using tcp = asio::ip::tcp;

namespace
{

class sender : public std::enable_shared_from_this
{
public:
explicit sender(std::shared_ptr sock)
: _socket(std::move(sock))
{
}

template 
auto async_send_message(asio::const_buffer buf, CompletionToken&& h)
{
// Adapted from the C++14 callback_wrapper example on the ASIO
// documentation pages. The operation is initiated by pushing the
// buffer onto the send queue, and completes once the buffer is sent.
auto init =
[me = shared_from_this()](auto handler, asio::const_buffer buf)
{
// Default to using the socket's executor to track outstanding work
auto work = asio::make_work_guard(
handler, me->_socket->get_executor());
me->async_send_message_impl(
buf,
[handler = std::move(handler),
work = std::move(work)](system::error_code ec) mutable
{
const auto alloc
= asio::get_associated_allocator(handler);
asio::dispatch(
work.get_executor(),
asio::bind_allocator(
alloc,
[handler = std::move(handler),
ec]() mutable { handler(ec); }));
});
};

// Initiate the async operation on the socket's strand
return asio::async_initiate(
init, h, buf);
}

private:
template 
void async_send_message_impl(asio::const_buffer buf, CompletionHandler&& h)
{
_send_queue.emplace(buf, std::forward(h));

// Queue was empty meaning there's no active send strand, so start one.
if (_send_queue.size() == 1)
send_queued_message();
}

void send_queued_message()
{
asio::async_write(
*_socket,
_send_queue.front().first,
[me = shared_from_this()](system::error_code ec, std::size_t)
{ me->handle_send_complete(ec); });
}

void handle_send_complete(system::error_code ec)
{
auto& h = _send_queue.front().second;
const auto& ex
= asio::get_associated_executor(h, _socket->get_executor());
asio::dispatch(ex, [h = std::move(h), ec]() mutable { h(ec); });
_send_queue.pop();

// Kepp sending until no messages left to send.
if (!_send_queue.empty())
send_queued_message();
}

std::shared_ptr _socket;
std::queue>
_send_queue;
};
Я провел стресс-тестирование с помощью следующего кода. Он успешно избегает чередующейся записи, но я не думаю, что он работает правильно в многопоточной среде. В конечном итоге у меня возникают состояния гонки (демонстрируемые с помощью средства очистки потоков), когда я их не ожидаю.

Код: Выделить всё

const std::string msg = R"longstring(...)longstring";

void run_connection_impl(const std::shared_ptr& conn)
{
conn->async_send_message(
asio::buffer(msg),
[conn](boost::system::error_code ec)
{
if (ec)
{
std::cerr 

Подробнее здесь: [url]https://stackoverflow.com/questions/79844885/race-condition-with-ordered-send-queue-in-asio[/url]
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»