Использование клиентом многопотоков/задач выглядит следующим образом:
- Один поток выполняет работу по приему/подключению/подписке (он вызывает обратный вызов при получении сообщения). Он называется потоком цикла событий.
- N потоков могут публиковать сообщения (это также может происходить в потоке цикла событий, но не в принимающем коде).
В большинстве случаев использования в системе есть только один поток, поэтому я не хочу добавлять блокировку/мьютекс/семафор для защиты указателя реализации, который будет использоваться для каждого доступа в цикле событий.
Итак, чтобы обработать случай публикации в другом потоке, я рассмотрел что:
- Потоку публикации нужен только указатель реализации для отправки (локальных данных потока) в сокет реализации. использование сокета является многопоточным, поскольку только поток публикации записывает в сокет, когда клиент не находится в состоянии ошибки.
- Поток событий должен обнаружить ошибку и закрыть/уничтожить реализацию только тогда, когда ни один поток публикации не публикует.
- Пока он это делает, ни один поток публикации не может публиковать
Код: Выделить всё
struct Impl
{
[...]
};
struct A
{
atomic usage_count = 0;
atomic errored = true;
Impl * impl = nullptr;
bool exchangeUsage(int step, u32 error)
{
u32 previous = usage.load();
while (true)
{
u32 next = (previous + step);
if (usage.compare_exchange_strong(previous, next))
{
errored |= error;
break;
}
}
}
Impl * acquire()
{
if (error.load()) return nullptr; // Precheck: prevent acquiring impl if an error is already signaled
exchangeUsage(1, false);
// Postcheck: If the error happened during CAS, we need to account for it
if (errored.load()) {
exchangeUsage(-1, true); // Release our usage since we don't use the implementation anymore
return nullptr;
}
return impl;
}
void release(bool withError) { exchangeUsage(-1, withError); }
void closeIfError(bool errorHappened) {
bool inError = errored.load();
if (!inError && !errorHappened) return;
errored.store(1); // Mark the object as errored so starting from now, no publish can acquire it anymore
// Wait until the usage count is 1 again, meaning all publish thread are done with the implementation
u32 previous = usage_count.load();
while (previous != 1)
{
// Yield here
sched_yield();
previous = usage_count.load();
}
// Ok, here it's not possible for a publish to acquire anymore since it's errored
delete impl; impl = nullptr;
}
void resetState() { usage_count = 1; errored = false; }
};
A client;
// Usage is like this for publishing thread:
void publish()
{
Impl * impl = client.acquire();
if (impl) { bool error = impl->publish(...); client.release(error); }
}
// Usage is like this in the event loop thread
void eventLoop()
{
while(true) {
bool error = receiveAndCallCallback(client.impl);
if (error)
{
closeIfError(error);
client.impl = reconnect(...);
client.resetState();
}
}
}
// main() will call client.resetState() before creating threads.
Поэтому возникает вопрос:
- Безопасно ли использовать эту схему?
- Заметили логику? ошибка в шаблоне?
https://godbolt.org/z/er9jxnqx9
Подробнее здесь: https://stackoverflow.com/questions/797 ... hread-safe
Мобильная версия