Безопасен ли этот метод защиты в многопоточном режиме?C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Безопасен ли этот метод защиты в многопоточном режиме?

Сообщение Anonymous »

Я написал MQTT-клиент для встраиваемой системы с RTOS (понимаю не POSIX, а вообще FreeRTOS). В системе нет pthread. Поддерживаются только 32-битные атомарные инструкции.
Использование клиентом многопотоков/задач выглядит следующим образом:
  • Один поток выполняет работу по приему/подключению/подписке (он вызывает обратный вызов при получении сообщения). Он называется потоком цикла событий.
  • N потоков могут публиковать сообщения (это также может происходить в потоке цикла событий, но не в принимающем коде).
Клиент хранит указатель на реализацию (в идиоме PImpl), и эта реализация удаляется в случае ошибки. Повторное подключение к серверу создает новый экземпляр этой реализации.
В большинстве случаев использования в системе есть только один поток, поэтому я не хочу добавлять блокировку/мьютекс/семафор для защиты указателя реализации, который будет использоваться для каждого доступа в цикле событий.
Итак, чтобы обработать случай публикации в другом потоке, я рассмотрел что:
  • Потоку публикации нужен только указатель реализации для отправки (локальных данных потока) в сокет реализации. использование сокета является многопоточным, поскольку только поток публикации записывает в сокет, когда клиент не находится в состоянии ошибки.
  • Поток событий должен обнаружить ошибку и закрыть/уничтожить реализацию только тогда, когда ни один поток публикации не публикует.
  • Пока он это делает, ни один поток публикации не может публиковать
Я сделал реализация, подобная этому псевдокоду (ссылка внизу поста для реального рабочего минимального примера):

Код: Выделить всё

struct Impl
{
[...]
};

struct A
{
atomic usage_count = 0;
atomic errored = true;
Impl *      impl = nullptr;

bool exchangeUsage(int step, u32 error)
{
u32 previous = usage.load();
while (true)
{
u32 next = (previous + step);
if (usage.compare_exchange_strong(previous, next))
{
errored |= error;
break;
}
}
}

Impl * acquire()
{
if (error.load()) return nullptr; // Precheck: prevent acquiring impl if an error is already signaled
exchangeUsage(1, false);
// Postcheck: If the error happened during CAS, we need to account for it
if (errored.load()) {
exchangeUsage(-1, true); // Release our usage since we don't use the implementation anymore
return nullptr;
}
return impl;
}

void release(bool withError) { exchangeUsage(-1, withError); }

void closeIfError(bool errorHappened) {
bool inError = errored.load();
if (!inError && !errorHappened) return;

errored.store(1); // Mark the object as errored so starting from now, no publish can acquire it anymore

// Wait until the usage count is 1 again, meaning all publish thread are done with the implementation
u32 previous = usage_count.load();
while (previous != 1)
{
// Yield here
sched_yield();
previous = usage_count.load();
}
// Ok, here it's not possible for a publish to acquire anymore since it's errored
delete impl; impl = nullptr;
}
void resetState() { usage_count = 1; errored = false; }
};

A client;
// Usage is like this for publishing thread:
void publish()
{
Impl * impl = client.acquire();
if (impl) { bool error = impl->publish(...); client.release(error); }
}

// Usage is like this in the event loop thread
void eventLoop()
{
while(true) {
bool error = receiveAndCallCallback(client.impl);
if (error)
{
closeIfError(error);
client.impl = reconnect(...);
client.resetState();
}
}
}

// main() will call client.resetState() before creating threads.
Я протестировал этот код как с очистителем потоков GCC, так и с DRD valgrind, и он работает без ошибок (Valgrind жалуется только на несвязанные проблемы с printf, а не с этим кодом).
Поэтому возникает вопрос:
  • Безопасно ли использовать эту схему?
  • Заметили логику? ошибка в шаблоне?
Ссылка на минимальный воспроизводимый пример кода:
https://godbolt.org/z/er9jxnqx9

Подробнее здесь: https://stackoverflow.com/questions/797 ... hread-safe
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»