Потеря сообщений в очереди MPMC без блокировки при выходе потоковC++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Потеря сообщений в очереди MPMC без блокировки при выходе потоков

Сообщение Anonymous »

Проблема
Я наблюдаю нестабильное поведение с потерей сообщений при попытке использовать очередь без блокировки MPMC (параллельную очередь) в реализации алгоритма взаимного исключения. В частности, я заметил следующее:
  • Потери сообщений не происходит до начала процедуры завершения.
  • Если критическое код раздела минимален (короткий миллисекундный сон), то процедура завершения приведет к потере сообщений, и поэтому потоки будут зависать в ожидании сокетов.
  • Если код критического раздела включает в себя некоторую работу (возможно, отправка нескольких TCP-запросов в несвязанный сокет), тогда процедура завершения завершится успешно, и все узлы корректно завершат работу.
Соответствующий код/настройка
Мне не удалось придумать минимальный пример, но базовая архитектура такова:
  • N< /em> узлов на N разных серверах
  • Каждый узел содержит:

    поток приложения (выполняет критические секции )
  • N-1 потоков чтения (получает сообщения от других узлов)
  • потока координатора (реализует протокол взаимного исключения)
  • поток записи (отправляет сообщения другим узлам)

Что касается передачи сообщений, у нас есть:
  • потоки чтения -> MPMC -> координатор (сообщения от других узлов)
  • координатор -> MPMC -> писатель (сообщения для отправки другим узлам)
    приложение -> MPMC -> координатор (запрос/выход критического раздела)
Вот, что я считаю соответствующими частями кода:
Приложение

Код: Выделить всё

for (int i = 0; i < 10; i++) {
client.cs_enter()  // Blocks until ready
// do_some_work()
sleep(5)
client.cs_leave()
}

// wait for threads
Сообщение

Код: Выделить всё

enum MessageKind { REQUEST, REPLY, CS_ENTER, CS_LEAVE, DONE, TERMINATE };
struct Message {
MessageKind kind;
uint32_t node;
uint32_t timestamp;
};
Писатель

Код: Выделить всё

while (true) {
std::pair tm({ 0, Message::cs_enter() });
q->wait_dequeue(tm);

// MAX node means no more messages to send
if (tm.first == std::numeric_limits::max()) break;

// map contains node_id -> socket_fd mapping
send_message((*map)[tm.first], tm.second);
}
// exit thread
Читатели

Код: Выделить всё

while (true) {
Message m = recv_message(fd);
MessageKind k = m.kind;
q->enqueue(m);
if (k == TERMINATE) break;
}
// exit thread
Координатор

Код: Выделить всё

while(true) {
if (this->done && this->terminated.size() == this->config->nodes.size() - 1)
break;

Message m = Message::cs_enter();
this->input_queue->wait_dequeue(m);
switch (m.kind) {
// handle message based on kind
}
}
// inform writer no more work to do
this->output_queue->enqueue({ std::numeric_limits::max(), Message::done() });
// exit thread
Я совершенно не понимаю, как действовать. Произвольное увеличение времени выполнения критической секции — ненадежное решение, и я понятия не имею, насколько хорошо оно будет работать во всех средах, в которых будет тестироваться эта реализация. Я также понятия не имею, почему вообще работает увеличение времени выполнения критической секции. , поскольку он не связан с кодом протокола.
Разве семантика моего кода не гарантирует доставку сообщения? Точнее, гарантирует ли следующий код, что потребитель получит сообщение?

Код: Выделить всё

// Producer - thread 1
q->enqueue(message);
thread_exit()

// Consumer - thread 2
Message m;
q->wait_dequeue(m);
Или мне нужно написать больше, чтобы гарантировать это свойство? (Ограждения памяти и т. д.)


Подробнее здесь: https://stackoverflow.com/questions/791 ... reads-exit
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»