Я пытался написать круговую очередь без блокировки (кольцевой буфер) для многопрофильного, многопонаноженного (mpmc) использования. Теперь, прежде чем ненавистники заправочников колесо спускаются на меня, я хотел бы предотвратить это, заявив о моем академическом любопытство в решении этой проблемы, а не просто чтобы получить чью-то хорошо проверяемую очередь, работая с репо.front, Back имеет тип std :: atomic и представляют индекс передней и задней части круговой очереди соответственно.
[*] container - это обычный старый массив указателей размера _capacity для шаблона dataType t объектов в памяти
[*] meta_data - массив типаstd::atomic где метадат - тип данных struct, определяемый следующим образом:
struct MetaDatum{
bool occupied;
unsigned int counter;
};
[*] Цель занявшего - указать, имеет ли соответствующее местоположение массива данные или нет, и цель счетчика состоит в том, чтобы предотвратить проблему «ABA» путем изменений версий в соответствии с соответствующим местоположением массива. is_always_lock_free и Assert производится в начале Main для обеспечения этой гарантии.
_capacity-это неатомник без знака int , который представляет количество элементов, которые мы хотим, чтобы выступить в любой момент плюс. Выбор «плюс один» будет объяснен в ближайшее время. POP и if (Back+1)%_ емкость evall Front нам не разрешено нажимать . Последняя идиосинкразия состоит в том, чтобы избежать проблематичного сценария различия между «полностью заполненной» и «полностью пустой» очередью в отсутствие размера , поскольку в обоих сценариях Front и Back совпадает. Так что вместо этого мы просто никогда не позволяем нити нажимать, если индекс задней части находится за пределами переднего индекса. Вот почему мы также устанавливаем _capacity на способность пользователя, плюс один, чтобы дать пользователю очередь их деньги. Бесконечная петля на других. Это та часть, которую я не смог отладить, несмотря на обширные устранения неполадок. Вот полный код: < /p>
'mpmcqueue.h'
#include
#include
#include
#include
#include
#include
struct MetaDatum{
unsigned int counter;
bool occupied;
};
template
struct MPMCQueue{
private:
T** container;
std::atomic* meta_data;
std::atomic front;
std::atomic back;
size_t _capacity;
public:
MPMCQueue(size_t capacity);
bool try_push(T* ptr);
T* pop();
};
< /code>
'mpmcqueue.cpp'
#include "mpmcqueue.h"
/* Public member functions */
template
MPMCQueue::MPMCQueue(size_t capacity): _capacity(capacity+1), front(0), back(0){
container = (T**) new T*[_capacity];
meta_data = new std::atomic[_capacity];
for (int i=0; itrue gets through
MetaDatum comparison_meta_datum = old_meta_datum;
comparison_meta_datum.occupied = false;
MetaDatum new_meta_datum = MetaDatum{old_counter+1, true};
if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
continue;
}
// push the value non-atomically because there is no other thread competing here
container[old_index] = ptr;
// update the back index without CAS because only the thread that succeeded in the above CAS should reach here
back.store((old_index+1)%_capacity);
return true;
}
}
template
T* MPMCQueue::pop(){
// only pop if front index is not back index
while (true){
// get front index
unsigned int old_index = front.load();
MetaDatum old_meta_datum = meta_data[old_index].load();
unsigned int old_counter = old_meta_datum.counter;
// verify that the queue is not empty
if (back.load() == old_index){
return nullptr;
}
// perform a CAS on the array location such that only the thread that succeeds in changing true->false gets through
MetaDatum comparison_meta_datum = old_meta_datum;
comparison_meta_datum.occupied = true;
MetaDatum new_meta_datum = MetaDatum{old_counter+1, false};
if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
continue;
}
// pop the value non-atomically because there is no other thread competing here
T* ptr = container[old_index];
// update the front index without CAS because only the thread that succeeded in the above CAS should reach here
front.store((old_index+1)%_capacity);
return ptr;
}
}
/* Testing */
std::atomic counter{1};
std::atomic sum{0};
std::atomic out_sum{0};
size_t max_counter = 1000;
std::atomic done{false};
void keep_popping(MPMCQueue& MPMCQueue){
int* next_ptr = MPMCQueue.pop();
while (!done || next_ptr!=nullptr){
if (next_ptr == nullptr){
next_ptr = MPMCQueue.pop();
continue;
}
out_sum.fetch_add(*next_ptr);
delete next_ptr;
next_ptr = MPMCQueue.pop();
}
}
void keep_pushing(MPMCQueue& MPMCQueue){
while (true){
size_t old_counter = counter.fetch_add(1);
if (old_counter > max_counter){
done.store(true);
break;
}
sum.fetch_add(old_counter);
int* x = new int(old_counter);
while(!MPMCQueue.try_push(x));
}
}
int main(){
assert(std::atomic{}.is_always_lock_free == true); // must be guaranteed lock free on this system for the ring buffer to work
assert(std::atomic{}.is_always_lock_free == true);
int num_threads = 8;
size_t capacity = 10;
MPMCQueue MPMCQueue{capacity};
std::vector threadpool{};
for (int i=0; i
Подробнее здесь: https://stackoverflow.com/questions/795 ... e-with-cas
Проектирование беспилосовой круговой шейки с CAS ⇐ C++
Программы на C++. Форум разработчиков
-
Anonymous
1743649362
Anonymous
Я пытался написать круговую очередь без блокировки (кольцевой буфер) для многопрофильного, многопонаноженного (mpmc) использования. Теперь, прежде чем ненавистники заправочников колесо спускаются на меня, я хотел бы предотвратить это, заявив о моем академическом любопытство в решении этой проблемы, а не просто чтобы получить чью-то хорошо проверяемую очередь, работая с репо.front, Back имеет тип std :: atomic и представляют индекс передней и задней части круговой очереди соответственно.
[*] container - это обычный старый массив указателей размера _capacity для шаблона dataType t объектов в памяти
[*] meta_data - массив типаstd::atomic где метадат - тип данных struct, определяемый следующим образом:
struct MetaDatum{
bool occupied;
unsigned int counter;
};
[*] Цель занявшего - указать, имеет ли соответствующее местоположение массива данные или нет, и цель счетчика состоит в том, чтобы предотвратить проблему «ABA» путем изменений версий в соответствии с соответствующим местоположением массива. is_always_lock_free и Assert производится в начале Main для обеспечения этой гарантии.
_capacity-это неатомник без знака int , который представляет количество элементов, которые мы хотим, чтобы выступить в любой момент плюс. Выбор «плюс один» будет объяснен в ближайшее время. POP и if (Back+1)%_ емкость evall Front нам не разрешено нажимать . Последняя идиосинкразия состоит в том, чтобы избежать проблематичного сценария различия между «полностью заполненной» и «полностью пустой» очередью в отсутствие размера , поскольку в обоих сценариях Front и Back совпадает. Так что вместо этого мы просто никогда не позволяем нити нажимать, если индекс задней части находится за пределами переднего индекса. Вот почему мы также устанавливаем _capacity на способность пользователя, плюс один, чтобы дать пользователю очередь их деньги. Бесконечная петля на других. Это та часть, которую я не смог отладить, несмотря на обширные устранения неполадок. Вот полный код: < /p>
'mpmcqueue.h'
#include
#include
#include
#include
#include
#include
struct MetaDatum{
unsigned int counter;
bool occupied;
};
template
struct MPMCQueue{
private:
T** container;
std::atomic* meta_data;
std::atomic front;
std::atomic back;
size_t _capacity;
public:
MPMCQueue(size_t capacity);
bool try_push(T* ptr);
T* pop();
};
< /code>
'mpmcqueue.cpp'
#include "mpmcqueue.h"
/* Public member functions */
template
MPMCQueue::MPMCQueue(size_t capacity): _capacity(capacity+1), front(0), back(0){
container = (T**) new T*[_capacity];
meta_data = new std::atomic[_capacity];
for (int i=0; itrue gets through
MetaDatum comparison_meta_datum = old_meta_datum;
comparison_meta_datum.occupied = false;
MetaDatum new_meta_datum = MetaDatum{old_counter+1, true};
if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
continue;
}
// push the value non-atomically because there is no other thread competing here
container[old_index] = ptr;
// update the back index without CAS because only the thread that succeeded in the above CAS should reach here
back.store((old_index+1)%_capacity);
return true;
}
}
template
T* MPMCQueue::pop(){
// only pop if front index is not back index
while (true){
// get front index
unsigned int old_index = front.load();
MetaDatum old_meta_datum = meta_data[old_index].load();
unsigned int old_counter = old_meta_datum.counter;
// verify that the queue is not empty
if (back.load() == old_index){
return nullptr;
}
// perform a CAS on the array location such that only the thread that succeeds in changing true->false gets through
MetaDatum comparison_meta_datum = old_meta_datum;
comparison_meta_datum.occupied = true;
MetaDatum new_meta_datum = MetaDatum{old_counter+1, false};
if (!meta_data[old_index].compare_exchange_strong(comparison_meta_datum, new_meta_datum)){
continue;
}
// pop the value non-atomically because there is no other thread competing here
T* ptr = container[old_index];
// update the front index without CAS because only the thread that succeeded in the above CAS should reach here
front.store((old_index+1)%_capacity);
return ptr;
}
}
/* Testing */
std::atomic counter{1};
std::atomic sum{0};
std::atomic out_sum{0};
size_t max_counter = 1000;
std::atomic done{false};
void keep_popping(MPMCQueue& MPMCQueue){
int* next_ptr = MPMCQueue.pop();
while (!done || next_ptr!=nullptr){
if (next_ptr == nullptr){
next_ptr = MPMCQueue.pop();
continue;
}
out_sum.fetch_add(*next_ptr);
delete next_ptr;
next_ptr = MPMCQueue.pop();
}
}
void keep_pushing(MPMCQueue& MPMCQueue){
while (true){
size_t old_counter = counter.fetch_add(1);
if (old_counter > max_counter){
done.store(true);
break;
}
sum.fetch_add(old_counter);
int* x = new int(old_counter);
while(!MPMCQueue.try_push(x));
}
}
int main(){
assert(std::atomic{}.is_always_lock_free == true); // must be guaranteed lock free on this system for the ring buffer to work
assert(std::atomic{}.is_always_lock_free == true);
int num_threads = 8;
size_t capacity = 10;
MPMCQueue MPMCQueue{capacity};
std::vector threadpool{};
for (int i=0; i
Подробнее здесь: [url]https://stackoverflow.com/questions/79551314/designing-a-lock-free-circular-queue-with-cas[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия