Код: Выделить всё
#pragma once
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <vector>
/// Fixed-size pool of worker threads that run queued tasks in FIFO order.
///
/// Tasks are submitted with enqueue(). The destructor sets the stop flag,
/// wakes every worker and joins them; workers only exit once the queue is
/// empty, so already-queued tasks still run before destruction completes.
class ThreadPool
{
public:
    /// Starts `numThreads` worker threads that wait for tasks.
    explicit ThreadPool(size_t numThreads);

    /// Requests shutdown and joins every worker thread.
    ~ThreadPool();

    // Worker threads capture `this`, so the pool must never be copied.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Queues the nullary callable `f` for execution on a worker thread.
    /// @return a future that yields the value returned by `f`.
    template <class F>
    auto enqueue(F&& f) -> std::future<typename std::invoke_result<F>::type>;

private:
    std::vector<std::thread> workers;         ///< Worker threads, joined in the destructor.
    std::queue<std::function<void()>> tasks;  ///< Pending tasks; guarded by queueMutex.
    std::mutex queueMutex;                    ///< Protects `tasks` and `stop`.
    std::condition_variable condition;        ///< Signalled on new work and on shutdown.
    bool stop;                                ///< Set to true when the pool is shutting down.
};
template
inline auto ThreadPool::enqueue(F&& f) -> std::future
{
using return_type = typename std::invoke_result::type;
auto task = std::make_shared(std::forward(f));
std::future res = task->get_future();
{
std::lock_guard lock(queueMutex);
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
Код: Выделить всё
#include
/// Starts `numThreads` workers. Each worker loops forever: it sleeps until a
/// task is queued or shutdown is requested, pops one task while holding the
/// queue mutex, and runs it with the mutex released.
ThreadPool::ThreadPool(size_t numThreads) :
    stop(false)
{
    workers.reserve(numThreads);
    for (size_t i = 0; i < numThreads; ++i) {
        LOG("thread 0");
        // Capture exactly what the thread needs (`this`) instead of a blanket
        // by-reference capture of the constructor's scope.
        workers.emplace_back([this]() {
            LOG("thread 1");
            for (;;) {
                LOG("thread 2");
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queueMutex);
                    condition.wait(lock, [this] {
                        return stop || !tasks.empty();
                    });
                    // Exit only when shutting down AND nothing is left to run.
                    if (stop && tasks.empty())
                        return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                // Run outside the lock so other workers can dequeue meanwhile.
                task();
            }
        });
    }
}
/// Requests shutdown, wakes all workers and blocks until each has finished.
ThreadPool::~ThreadPool()
{
    {
        // Set the flag under the mutex so a worker that is between checking
        // the predicate and going to sleep cannot miss the wake-up.
        std::lock_guard<std::mutex> guard(queueMutex);
        stop = true;
    }
    condition.notify_all();

    for (auto it = workers.begin(); it != workers.end(); ++it) {
        it->join();
    }
}
Код: Выделить всё
// Completion signalling shared by the tasks that parse_files() submits:
// each task increments completed_tasks while holding cv_mutex and then
// notifies cv; parse_files() blocks on cv until the counter equals the
// number of tasks it submitted.
std::condition_variable cv;
std::mutex cv_mutex;
size_t completed_tasks = 0;
bool process_file(const std::string& p_src, std::vector& xmls, std::vector& skipped,
std::mutex& xmls_mutex, std::mutex& skipped_mutex)
{
try {
XMLfile xml_file(static_cast(p_src));
if (xml_file.has_value()) {
std::lock_guard lock(xmls_mutex);
xmls.push_back(xml_file);
return true;
} else {
std::lock_guard lock(skipped_mutex);
skipped.push_back(p_src + " : has no value");
}
} catch (const std::exception& e) {
LOG("Error processing {}: {}", p_src, e.what());
} catch (...) {
LOG("Unknown error processing {}", p_src);
}
return false;
}
void parse_files(const std::filesystem::path& from, std::vector& xmls, std::vector& skipped)
{
std::mutex xmls_mutex;
std::mutex skipped_mutex;
xmls.clear();
skipped.clear();
LOG.clearLog();
LOG("started success!");
std::vector filtered_files;
for (const auto& entry : std::filesystem::directory_iterator(from)) {
auto p_src = entry.path().string();
try {
if (entry.file_size() < 10) {
skipped.emplace_back(p_src + " : < 10 bytes");
continue;
}
if (entry.path().extension() != ".xml") {
skipped.emplace_back(p_src + " : not .xml");
continue;
}
filtered_files.emplace_back(p_src);
} catch (const std::exception& e) {
LOG("Error processing entry {}: {}", p_src, e.what());
} catch (...) {
LOG("Unknown error processing entry {}", p_src);
}
}
LOG("skipped : {}", skipped.size());
LOG("filtered_files : {}", filtered_files.size());
ThreadPool pool(filtered_files.size());
std::vector futures;
for (const auto& p_src : filtered_files) {
futures.push_back(pool.enqueue([=, &xmls, &skipped, &xmls_mutex, &skipped_mutex]() -> bool {
bool result = process_file(p_src, xmls, skipped, xmls_mutex, skipped_mutex);
{
std::lock_guard lock(cv_mutex);
completed_tasks++;
}
cv.notify_one();
return result;
}));
}
{
std::unique_lock lock(cv_mutex);
cv.wait(lock, [&] { return completed_tasks == futures.size(); });
}
size_t counter = 0;
for (const auto& s : xmls) {
LOG("{}) {} success", counter++, s.filename());
}
counter = 0;
for (const auto& s : skipped) {
LOG("{}) {} ", counter++, s);
}
}
Код: Выделить всё
2024-11-20 00:02:08 - skipped : 13
2024-11-20 00:02:08 - filtered_files : 632
2024-11-20 00:02:10 - thread 0
2024-11-20 00:02:11 - thread 0
2024-11-20 00:02:12 - thread 0
...
(632 times thread 0)
Что у меня не получается?
Поток создаётся и получает идентификатор, но лямбда никогда не вызывается.
Пожалуйста, помогите мне понять, что я делаю не так. Спасибо!
Подробнее здесь: https://stackoverflow.com/questions/792 ... n-creation
Мобильная версия