Как запустить несколько потоков одновременно?C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Как запустить несколько потоков одновременно?

Сообщение Anonymous »

Оценивая библиотеки пулов потоков для кратковременных задач, я заметил, что все они работают значительно хуже, чем OpenMP. Основная причина, по-видимому, заключается в том, что другие библиотеки с трудом запускают несколько потоков одновременно, в то время как OpenMP каким-то образом может это сделать.
Чтобы продемонстрировать проблему, я создал упрощенный пример Parallel_for. Я запускаю 8 потоков, а затем заставляю их ждать либо с помощью std::condition_variable, либо с помощью просмотра вращения с использованием std::atomic, пока не будет получен сигнал об их запуске. Это делается для того, чтобы исключить накладные расходы на запуск потоков. Время начала и окончания каждого потока записывается в память, а затем записывается в файл для визуализации. Я также распараллеливаю тот же объем работы, используя OpenMP.
Результаты можно увидеть ниже. Потоки не начинают свою работу одновременно при использовании обычной блокировки или спин-блокировки, но при использовании OpenMP все потоки начинаются примерно в одно и то же время.
Я скомпилировал с помощью g++ -O3 -fopenmp -lm -std=c++20 main.cpp -o main и провел эксперимент на процессоре i5-10300H с 8 ядрами (4 из них «настоящие»).
Изображение

main.cpp

#include
#include
#include
#include
#include
#include
#include
#include

size_t num_threads = 8;

double sec(){
std::chrono::duration d = std::chrono::high_resolution_clock::now().time_since_epoch();
return d.count();
}

void work(){
volatile double accumulator = 0.0;
for (size_t i = 0; i < 10 * 1000; i++){
accumulator += std::sin(i);
}
}

struct LogItem {
double start, end;
size_t thread_id;
};

void write_log(const std::vector& log, const char* filename) {
FILE* f = fopen(filename, "w");
fprintf(f, "start,end,thread_id\n");
for (const auto& item : log) {
fprintf(f, "%f,%f,%zu\n", item.start, item.end, item.thread_id);
}
fclose(f);
}

void parallel_for_lock(){
std::vector threads;
std::vector log(num_threads);

std::mutex mtx;
std::condition_variable cv;
bool start_flag = false;

for (size_t thread_id = 0; thread_id < num_threads; thread_id++){
threads.emplace_back([thread_id, &log, &mtx, &cv, &start_flag]{
// wait until start
{
std::unique_lock lock(mtx);
cv.wait(lock, [&start_flag]{ return start_flag; });
}
// do work and log the time it takes
double start = sec();
work();
double end = sec();
log[thread_id] = LogItem{start, end, thread_id};
});
}

// (attempt to) start all threads at once
{
std::lock_guard lock(mtx);
start_flag = true;
}
cv.notify_all();

for (auto& thread : threads){
thread.join();
}

write_log(log, "log_lock.csv");
}

void parallel_for_spin_lock(){
std::vector threads;
std::vector log(num_threads);

std::atomic start_flag{false};

for (size_t thread_id = 0; thread_id < num_threads; thread_id++){
threads.emplace_back([thread_id, &log, &start_flag]{
while (!start_flag.load(std::memory_order_acquire));
double start = sec();
work();
double end = sec();
log[thread_id] = LogItem{start, end, thread_id};
});
}

start_flag.store(true, std::memory_order_release);

for (auto& thread : threads){
thread.join();
}

write_log(log, "log_spin_lock.csv");
}

void parallel_for_omp(){
std::vector log(num_threads);

#pragma omp parallel for
for (size_t thread_id = 0; thread_id < num_threads; thread_id++){
double start = sec();
work();
double end = sec();
log[thread_id] = LogItem{start, end, thread_id};
}

write_log(log, "log_omp.csv");
}

int main(){
// run a few times for warmup
for (size_t i = 0; i < 10; i++){
parallel_for_lock();
parallel_for_spin_lock();
parallel_for_omp();
}

return 0;
}

plot_results.py

import csv, matplotlib.pyplot as plt

def plot_log(filename):
with open(filename) as f:
rows = list(csv.DictReader(f))

thread_ids = [int(row["thread_id"]) for row in rows]

# convert to ms
start_times = [float(row["start"]) * 1e3 for row in rows]
end_times = [float(row["end"]) * 1e3 for row in rows]

# start time at 0
min_time = min(start_times)
start_times =
end_times = [e - min_time for e in end_times]

for start, end, tid in zip(start_times, end_times, thread_ids):
plt.barh(tid, end - start, left=start)

plt.xlabel("Time [ms]")
plt.ylabel("Thread ID")
plt.yticks(range(len(thread_ids)))
plt.grid(axis="y", alpha=0.5)
plt.xlim([0, 2])

def main():
plt.figure(figsize=(10, 16))
for i, name in enumerate(["lock", "spin_lock", "omp"], 1):
plt.subplot(3, 1, i)
plot_log(f"log_{name}.csv")
plt.title(name)
plt.tight_layout()
plt.show()

if __name__ == "__main__":
main()


Подробнее здесь: https://stackoverflow.com/questions/798 ... ly-at-once
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»