Нарушение памяти при выполнении многопоточного кода, составленного с компилятором Intel C ++ 2025C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Нарушение памяти при выполнении многопоточного кода, составленного с компилятором Intel C ++ 2025

Сообщение Anonymous »

Я использую базовую Threadpool, найденную по адресу
https://codereview.stackexchange.com/qu ... 4444444444 /> , к которому я применил некоторые исправления /улучшение, предложенные в ответе: < /p>

Код: Выделить всё

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include 
#include 
#include "concurrent_queue.h"
#include 

class ThreadPool
{
public:
using Task = std::packaged_task;
using TaskHandle = std::future;
private:
//  The one and only instance
static ThreadPool instance_;
//  The task queue
concurrent_queue concurrent_queue_;
//  The threads
std::vector threads_;
//  Active indicator
bool is_active_;
//  Interruption indicator
bool interrupt_;
//  Thread number
static thread_local size_t my_tls_num_;

//  The function that is executed on every thread
void threadFunc(const size_t num)
{
my_tls_num_ = num;
//  "Infinite" loop, only broken on destruction
while (auto t = concurrent_queue_.pop())
{
(*t)();
}
}

//  The constructor stays private, ensuring single instance
ThreadPool() : is_active_(false), interrupt_(false) {}

public:
//  Access the instance
static ThreadPool* getInstance()
{
return &instance_;
}
//  Number of threads
size_t numThreads() const { return threads_.size(); }
//  The number of the caller thread
static size_t threadNum() { return my_tls_num_; }
//  Starter
void start(const size_t nThread = std::thread::hardware_concurrency() - 1)
{
if (!is_active_)  //  Only start once
{
threads_.reserve(nThread);

//  Launch threads on threadFunc and keep handles in a vector
for (size_t i = 0; i < nThread; i++)
{
std::thread loc = std::thread(&ThreadPool::threadFunc, this, i + 1);
threads_.push_back(std::move(loc));
}

is_active_ = true;
}
}
//dtor
~ThreadPool()
{
stop();
}
void stop()
{
if (is_active_)
{
//  Interrupt mode
interrupt_ = true;
//  Interrupt all waiting threads
concurrent_queue_.interrupt();
//  Wait for them all to join
for_each(threads_.begin(), threads_.end(), std::mem_fn(&std::thread::join));
//  Clear all threads
threads_.clear();
//  Clear the queue and reset interrupt
concurrent_queue_.clear();
concurrent_queue_.reset_interrupt();
//  Mark as inactive
is_active_ = false;
//  Reset interrupt
interrupt_ = false;
}
}
//  Forbid copies etc
ThreadPool(const ThreadPool& rhs) = delete;
ThreadPool& operator=(const ThreadPool& rhs) = delete;
ThreadPool(ThreadPool&& rhs) = delete;
ThreadPool& operator=(ThreadPool&& rhs) = delete;
//  Spawn task
template
TaskHandle spawnTask(Callable c)
{
Task t(std::move(c));
TaskHandle f = t.get_future();
concurrent_queue_.push(std::move(t));
return f;
}

//  Run queued tasks synchronously
//  while waiting on a future,
//  returns true if at least one task was run
bool activeWait(const TaskHandle&  f)
{
using namespace std::chrono_literals;
bool b = false;
//  Check whether or not the future is ready without blocking
//  by waiting 0 seconds and returning status
while (f.wait_for(0s) != std::future_status::ready)
{
//  Non blocking
if (auto t = concurrent_queue_.try_pop())
{
(*t)();
b = true;
}
else // Nothing in the queue: go to sleep
{
f.wait();
}
}
return b;
}
};

inline ThreadPool ThreadPool::instance_;
inline thread_local size_t ThreadPool::my_tls_num_ = 0;

#endif//THREAD_POOL_H
< /code>
Я закодировал простой класс консультанта: < /p>
#include 
#include "ThreadPool.h"
#include "func.h"

class Advisor
{
private:
static Advisor * pinstance_;
static std::mutex mutex_;
protected:
Advisor(const size_t nThread = std::thread::hardware_concurrency() - 1)
{
ThreadPool::getInstance()->start(nThread);
}
virtual  ~Advisor()
{
ThreadPool::getInstance()->stop();
}
public:
Advisor(const Advisor& rhs) = delete;
Advisor& operator=(const Advisor& rhs) = delete;
Advisor(Advisor&& rhs) = delete;
Advisor& operator=(Advisor&& rhs) = delete;

static Advisor* GetInstance(const size_t nThread = std::thread::hardware_concurrency() - 1);

double some_func(const size_t N)
{
return some_func_calc(N);
}
};

/**
* Static methods should be defined outside the class.
*/
Advisor* Advisor::pinstance_{ nullptr };
std::mutex Advisor::mutex_;

/**
* The first time we call GetInstance we will lock the storage location
*      and then we make sure again that the variable is null and then we
*      set the value.  RU:
*/
Advisor* Advisor::GetInstance(const size_t nThread)
{
std::lock_guard lock(mutex_);
if (pinstance_ == nullptr)
{
pinstance_ = new Advisor(nThread);
}
return pinstance_;
}
< /code>
на основе свободной функции, которая выполняет среднее, разделяя цикл для различных потоков: < /p>
inline double some_func_calc(const size_t nbPaths)
{
const std::unique_ptr rng_unique_ptr = std::make_unique();
std::vector resultMat(nbPaths);

ThreadPool::getInstance()->start(std::thread::hardware_concurrency() - 1);
ThreadPool* pool = ThreadPool::getInstance();
const size_t nThread = pool->numThreads();

std::vector unifVecs(nThread + 1);    //  +1 for the main thread ;)
for (auto& vec : unifVecs)
vec.resize(1);
std::vector paths(nThread + 1);

//  One RNG per thread
std::vector rngs(nThread + 1);
for (auto& random : rngs)
{
random = rng_unique_ptr->clone();
random->init(1);
}

#define BATCHSIZE size_t{64}

//  Reserve memory for futures
std::vector futures;

futures.reserve(nbPaths / BATCHSIZE + 1);

//  Start
size_t firstPath = 0;
size_t pathsLeft = nbPaths;

while (pathsLeft > 0)
{
size_t pathsInTask = std::min(pathsLeft, BATCHSIZE);

futures.push_back(pool->spawnTask([&, firstPath, pathsInTask]()
{
//  Inside the parallel task,
//  pick the right pre-allocated vectors
const size_t threadNum = pool->threadNum();
std::vector& unifVec = unifVecs[threadNum];
std::vector& path = paths[threadNum];

//  Get a RNG and position it correctly
auto& random = rngs[threadNum];
random->skipTo(firstPath);

//  And conduct the simulations
for (size_t i = 0; i < pathsInTask; i++)
{
//  Next Gaussian vector, dimension D
random->nextU(unifVec);
//value on path
double avg_of_elems = std::reduce(unifVec.begin(), unifVec.end()) / unifVec.size();
//store result
resultMat[firstPath + i] = avg_of_elems;
}
//  Remember tasks must return bool
return true;
}));

pathsLeft -= pathsInTask;
firstPath += pathsInTask;
}

for (auto& future : futures)
pool->activeWait(future);

double actual = std::reduce(resultMat.begin(), resultMat.end()) / resultMat.size();

return actual;
}
< /code>
на основе простого случайного генератора: < /p>
#ifndef MY_RNG_H
#define MY_RNG_H

#include 
#include 

enum RngType
{
sob = 0,
};

class my_rng
{
public:

//  Initialise with dimension simDim
virtual void init(const size_t simDim) = 0;
virtual size_t dim() const = 0;

//  Compute the next vector[simDim] of independent Uniforms or Gaussians
//  The vector is filled by the function and must be pre-allocated
virtual void nextU(std::vector& uVec) = 0;

virtual std::unique_ptr clone() const = 0;

virtual ~my_rng() = default;

//  Skip ahead
virtual void skipTo(const unsigned b) = 0;
};

#endif//MY_RNG_H
< /code>
с бетонным классом: < /p>
#ifndef SOBOL_H
#define SOBOL_H

#include 
#include "my_rng.h"

#define ONEOVER2POW32 2.3283064365387E-10

const unsigned* const* getjkDir();

class sobol : public my_rng
{
//  Dimension
size_t                      myDim;

//  State Y
std::vector            myState;

//  Current index in the sequence
unsigned                    myIndex;

//  The direction numbers listed in sobol.cpp
//  Note jkDir[i][dim] gives the i-th (0 to 31)
//      direction number of dimension dim
const unsigned* const* jkDir;

public:
size_t dim() const override
{
return myDim;
}
//  Virtual copy constructor
std::unique_ptr clone() const override
{
return std::make_unique(*this);
}

//  Initializer
void init(const size_t simDim) override
{
//  Set pointer on direction numbers
jkDir = getjkDir();

//  Dimension
myDim = simDim;
myState.resize(myDim);

//  Reset to 0
reset();
}

void reset()
{
//  Set state to 0
memset(myState.data(), 0, myDim * sizeof(unsigned));
//  Set index to 0
myIndex = 0;
}

//  Next point
void next()
{
//  Gray code, find position j
//      of rightmost zero bit of current index n
unsigned n = myIndex, j = 0;
while (n & 1)
{
n >>= 1;
++j;
}

//  Direction numbers
const unsigned* dirNums = jkDir[j];

//  XOR the appropriate direction number
//      into each component of the integer sequence
for (int i = 0; i < myDim; ++i)
{
myState[i] ^= dirNums[i];
}

//  Update count
++myIndex;
}

void nextU(std::vector& uVec) override
{
next();
transform(myState.begin(), myState.end(), uVec.begin(),
[](const unsigned long i)
{
return ONEOVER2POW32 * i;
});
}

//  Skip ahead (from 0 to b)
void skipTo(const unsigned b) override
{
//  Check skip
if (!b) return;

//  Reset Sobol to 0
reset();

//  The actual Sobol skipping algo
unsigned im = b;
unsigned two_i = 1, two_i_plus_one = 2;

unsigned i = 0;
while (two_i 

Подробнее здесь: [url]https://stackoverflow.com/questions/79691095/memory-violation-at-execution-of-multithreaded-code-compiled-with-intel-c-2025[/url]
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»