Параллельная очередь может быть реализована с помощью read_index и write_index? ⇐ C++
-
Anonymous
Параллельная очередь может быть реализована с помощью read_index и write_index?
Я хочу реализовать параллельную очередь на основе индекса чтения и подписки на запись, read_index и write_index являются атомарными.
Метод Offer пытается получить индекс записи путем конкуренции и поставить точку в индексе массива. Метод опроса пытается получить индекс чтения путем конкуренции и получить точку из индекса массива.
Когда я вызываю методы предложения и опроса в многопоточной среде, метод опроса получает один и тот же результат указателя в двух или более потоках. Возникла одновременная проблема, где что-то не так с этими кодами?
read_index всегда меньше, чем write_index, когда read_index равен write_index, poll метод ничего не сделает.
Есть 12 производителей и 1 потребитель.
#ifndef PAGE_ENGINE_CONCURRENT_QUEUE_H #define PAGE_ENGINE_CONCURRENT_QUEUE_H #include "атомарный" класс ConcurrentQueue { публика: внутренняя емкость; std::atomic *array; std::atomic read_index = {0}; std::atomic write_index = {0}; void init (int p_capacity) { это->емкость = p_capacity; this->array = static_cast(malloc(p_capacity * 8)); } недействительное предложение (Задача *значение) { int index = write_index.fetch_add(1); массив [индекс% (емкость)] = значение; } void poll(Task **value) { for (; read_index < write_index;) { int old_read_index = this->read_index; if (read_index.compare_exchange_weak(old_read_index, old_read_index + 1, std::memory_order_acquire)) { значение [0] = массив [old_read_index % (емкость)]; перерыв; } } } }; #endif //PAGE_ENGINE_CONCURRENT_QUEUE_H
Я хочу реализовать параллельную очередь на основе индекса чтения и подписки на запись, read_index и write_index являются атомарными.
Метод Offer пытается получить индекс записи путем конкуренции и поставить точку в индексе массива. Метод опроса пытается получить индекс чтения путем конкуренции и получить точку из индекса массива.
Когда я вызываю методы предложения и опроса в многопоточной среде, метод опроса получает один и тот же результат указателя в двух или более потоках. Возникла одновременная проблема, где что-то не так с этими кодами?
read_index всегда меньше, чем write_index, когда read_index равен write_index, poll метод ничего не сделает.
Есть 12 производителей и 1 потребитель.
#ifndef PAGE_ENGINE_CONCURRENT_QUEUE_H #define PAGE_ENGINE_CONCURRENT_QUEUE_H #include "атомарный" класс ConcurrentQueue { публика: внутренняя емкость; std::atomic *array; std::atomic read_index = {0}; std::atomic write_index = {0}; void init (int p_capacity) { это->емкость = p_capacity; this->array = static_cast(malloc(p_capacity * 8)); } недействительное предложение (Задача *значение) { int index = write_index.fetch_add(1); массив [индекс% (емкость)] = значение; } void poll(Task **value) { for (; read_index < write_index;) { int old_read_index = this->read_index; if (read_index.compare_exchange_weak(old_read_index, old_read_index + 1, std::memory_order_acquire)) { значение [0] = массив [old_read_index % (емкость)]; перерыв; } } } }; #endif //PAGE_ENGINE_CONCURRENT_QUEUE_H
Мобильная версия