Я наблюдаю нестабильное поведение с потерей сообщений при попытке использовать очередь без блокировки MPMC (параллельную очередь) в реализации алгоритма взаимного исключения. В частности, я заметил следующее:
- Потери сообщений не происходит до начала процедуры завершения.
- Если критическое код раздела минимален (короткий миллисекундный сон), то процедура завершения приведет к потере сообщений, и поэтому потоки будут зависать в ожидании сокетов.
- Если код критического раздела включает в себя некоторую работу (возможно, отправка нескольких TCP-запросов в несвязанный сокет), тогда процедура завершения завершится успешно, и все узлы корректно завершат работу.
Мне не удалось придумать минимальный пример, но базовая архитектура такова:
- N< /em> узлов на N разных серверах
- Каждый узел содержит:
поток приложения (выполняет критические секции ) - N-1 потоков чтения (получает сообщения от других узлов)
- потока координатора (реализует протокол взаимного исключения)
- поток записи (отправляет сообщения другим узлам)
Что касается передачи сообщений, у нас есть:
- потоки чтения -> MPMC -> координатор (сообщения от других узлов)
- координатор -> MPMC -> писатель (сообщения для отправки другим узлам)
приложение -> MPMC -> координатор (запрос/выход критического раздела)
Приложение
Код: Выделить всё
for (int i = 0; i < 10; i++) {
client.cs_enter() // Blocks until ready
// do_some_work()
sleep(5)
client.cs_leave()
}
// wait for threads
Код: Выделить всё
enum MessageKind { REQUEST, REPLY, CS_ENTER, CS_LEAVE, DONE, TERMINATE };
struct Message {
MessageKind kind;
uint32_t node;
uint32_t timestamp;
};
Код: Выделить всё
while (true) {
std::pair tm({ 0, Message::cs_enter() });
q->wait_dequeue(tm);
// MAX node means no more messages to send
if (tm.first == std::numeric_limits::max()) break;
// map contains node_id -> socket_fd mapping
send_message((*map)[tm.first], tm.second);
}
// exit thread
Код: Выделить всё
while (true) {
Message m = recv_message(fd);
MessageKind k = m.kind;
q->enqueue(m);
if (k == TERMINATE) break;
}
// exit thread
Код: Выделить всё
while(true) {
if (this->done && this->terminated.size() == this->config->nodes.size() - 1)
break;
Message m = Message::cs_enter();
this->input_queue->wait_dequeue(m);
switch (m.kind) {
// handle message based on kind
}
}
// inform writer no more work to do
this->output_queue->enqueue({ std::numeric_limits::max(), Message::done() });
// exit thread
Разве семантика моего кода не гарантирует доставку сообщения? Точнее, гарантирует ли следующий код, что потребитель получит сообщение?
Код: Выделить всё
// Producer - thread 1
q->enqueue(message);
thread_exit()
// Consumer - thread 2
Message m;
q->wait_dequeue(m);
Подробнее здесь: https://stackoverflow.com/questions/791 ... reads-exit
Мобильная версия