Очередь распределяется по разным потокам. Это многопоточный веб-сервер с 4+ потоками. Они могут вызывать процедуры, которые помещают значения в очередь. Затем у меня также есть запускаемый вручную поток pthread, который ищет доступные значения в очереди и пересылает их списку подписчиков.
Изначально у меня был простой оператор while цикл. Затем я прочитал о незанятом ожидании. Я изменил код очереди на следующий:
Код: Выделить всё
template
class PQueueShifting
{
public:
class TypeDataChecked {
public:
TypeData data;
bool is_valid = 0;
/* Here I have some operators that allow casting, assigning, copying, etc. to a TypeData object */
TypeDataChecked(TypeData data) : data(data), is_valid(true);
TypeDataChecked() : is_valid(false);
};
PQueueShifting()
{
this->count = 0;
}
void enqueue(TypeData item)
{
std::unique_lock lock(this->mutex);
this->data.push_back(item);
if (this->data.size() > MAX_QUEUE_SIZE)
this->data.pop_front();
else
this->count++;
this->not_empty.notify_one();
}
void interrupt()
{
std::unique_lock lock(this->mutex);
this->interrupted = true;
this->not_empty.notify_one();
}
TypeDataChecked dequeue()
{
std::unique_lock lock(this->mutex);
this->not_empty.wait(lock, [this] { return (this->count > 0 || this->interrupted); });
if (this->interrupted)
{
this->interrupted = false; // Reset this, so we don't need a resume() method.
return TypeDataChecked();
}
TypeDataChecked item = this->data.front(); this->data.pop_front();
this->count--;
// this->not_full.notify_one();
return item;
}
size_t getCount() {
return this->count;
}
private:
std::deque data;
size_t count = 0;
std::mutex mutex;
std::condition_variable not_empty;
bool interrupted = false;
// std::condition_variable not_full;
};
Код: Выделить всё
void *thread_function(void* objPtr) {
MyObject* obj = (MyObject*) objPtr;
while (true)
{
// obj -> queue is a PShiftingQueue
auto data = obj -> queue -> dequeue();
if (!data.is_valid)
continue;
// else
obj -> processData(data);
}
}
Код: Выделить всё
void MyObj::swapQueue(PShiftingQueue* newQueue)
{
auto tmp_queue = this->queue;
this->queue = newQueue;
tmp_queue->interrupt();
}
- Если я жду в методе dequeue(), я сначала меняю указатель местами, а затем принудительно завершаю ожидание старой очереди, устанавливая флаг прерывания и уведомляя об этом. Функция возвращает значение, и на следующей итерации while я смотрю на новую очередь.
- В случае, если я не жду, на следующей итерации while > итерация. Я все равно смотрю на новую очередь.
Код: Выделить всё
(SIGABRT) libc++abi: terminating due to uncaught exception of type std::__1::system_error: condition_variable wait failed: Invalid argumentКод: Выделить всё
(lldb) p __lk
(std::unique_lock &) 0x000000016fe86948: {
__m_ = 0x00000001189040f0
__owns_ = true
}
Код: Выделить всё
frame #9: 0x000000019b1d0b44 libc++.1.dylib`std::__1::condition_variable::wait(std::__1::unique_lock&) + 76
libc++.1.dylib`std::__1::condition_variable::__do_timed_wait:
-> 0x19b1d0b44 : pacibsp
0x19b1d0b48 : sub sp, sp, #0x20
0x19b1d0b4c : stp x29, x30, [sp, #0x10]
0x19b1d0b50 : add x29, sp, #0x10
Есть идеи?
Подробнее здесь: https://stackoverflow.com/questions/785 ... d-argument
Мобильная версия