Ошибка ожидания Condition_variable: неверный аргументC++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Ошибка ожидания Condition_variable: неверный аргумент

Сообщение Anonymous »

У меня есть реализация оболочки для простого std::deque, в которой я предоставляю только пару методов; в частности, у меня есть методы enqueue() и dequeue() для помещения значений в очередь и их извлечения. Пока все хорошо.
Очередь распределяется по разным потокам. Это многопоточный веб-сервер с 4+ потоками. Они могут вызывать процедуры, которые помещают значения в очередь. Затем у меня также есть запускаемый вручную поток pthread, который ищет доступные значения в очереди и пересылает их списку подписчиков.
Изначально у меня был простой оператор while цикл. Затем я прочитал о незанятом ожидании. Я изменил код очереди на следующий:

Код: Выделить всё

template 
class PQueueShifting
{
public:

class TypeDataChecked {
public:
TypeData data;
bool is_valid = 0;
/* Here I have some operators that allow casting, assigning, copying, etc. to a TypeData object */
TypeDataChecked(TypeData data) : data(data), is_valid(true);
TypeDataChecked() : is_valid(false);
};

PQueueShifting()
{
this->count = 0;
}

void enqueue(TypeData item)
{
std::unique_lock lock(this->mutex);

this->data.push_back(item);
if (this->data.size() > MAX_QUEUE_SIZE)
this->data.pop_front();
else
this->count++;

this->not_empty.notify_one();
}

void interrupt()
{
std::unique_lock lock(this->mutex);
this->interrupted = true;
this->not_empty.notify_one();
}

TypeDataChecked dequeue()
{
std::unique_lock lock(this->mutex);
this->not_empty.wait(lock, [this] { return (this->count > 0 || this->interrupted); });

if (this->interrupted)
{
this->interrupted = false;  // Reset this, so we don't need a resume() method.
return TypeDataChecked();
}

TypeDataChecked item = this->data.front(); this->data.pop_front();
this->count--;

// this->not_full.notify_one();
return item;
}

size_t getCount() {
return this->count;
}

private:
std::deque data;
size_t count = 0;
std::mutex mutex;
std::condition_variable not_empty;
bool interrupted = false;
// std::condition_variable not_full;
};
Внешне у меня есть поток, который выглядит примерно так:

Код: Выделить всё

void *thread_function(void* objPtr) {
MyObject* obj = (MyObject*) objPtr;

while (true)
{
// obj -> queue is a PShiftingQueue
auto data = obj -> queue -> dequeue();

if (!data.is_valid)
continue;

// else
obj -> processData(data);

}
}
И из другого потока мне нужно вызвать прерывание(), чтобы разорвать цикл, изменить очередь для просмотра и перезапустить просмотр очереди. Для этого я сначала ставлю obj->в очередь, а затем вызываю прерывание(), вот так:

Код: Выделить всё

void MyObj::swapQueue(PShiftingQueue* newQueue)
{
auto tmp_queue = this->queue;
this->queue = newQueue;
tmp_queue->interrupt();
}

Так:
  • Если я жду в методе dequeue(), я сначала меняю указатель местами, а затем принудительно завершаю ожидание старой очереди, устанавливая флаг прерывания и уведомляя об этом. Функция возвращает значение, и на следующей итерации while я смотрю на новую очередь.
  • В случае, если я не жду, на следующей итерации while > итерация. Я все равно смотрю на новую очередь.
Это работает; однако время от времени я получаю эту ошибку:

Код: Выделить всё

(SIGABRT) libc++abi: terminating due to uncaught exception of type std::__1::system_error: condition_variable wait failed: Invalid argument
При использовании lldb все выглядит нормально: в переменной условия void Condition_variable::wait(...) метод __lk мьютекс выглядит так:

Код: Выделить всё

(lldb) p __lk
(std::unique_lock &) 0x000000016fe86948: {
__m_ = 0x00000001189040f0
__owns_ = true
}
Это говорит о том, что блокировка действительно принадлежит потоку. На уровне сборки (ARM M1 Pro) я вижу, что после этого стек попадает в std::terminate() clang:

Код: Выделить всё

frame #9: 0x000000019b1d0b44 libc++.1.dylib`std::__1::condition_variable::wait(std::__1::unique_lock&) + 76
libc++.1.dylib`std::__1::condition_variable::__do_timed_wait:
->  0x19b1d0b44 :  pacibsp
0x19b1d0b48 :  sub    sp, sp, #0x20
0x19b1d0b4c :  stp    x29, x30, [sp, #0x10]
0x19b1d0b50 : add    x29, sp, #0x10
Я понятия не имею, как это отладить. Я думаю, что-то не так, потому что, поднимаясь вверх, я вижу, что this->interrupted = 0, тогда как оно должно было быть установлено в 1 вызовом прерывания(). Я действительно вижу сообщения (2024-05-21 18:29:49) [QUEUE ] Прерывание очереди..
Есть идеи?

Подробнее здесь: https://stackoverflow.com/questions/785 ... d-argument
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»