Почему libarchive замедляется при одновременном использовании в параллельных потоках? [закрыто]C++

Программы на C++. Форум разработчиков
Ответить Пред. темаСлед. тема
Anonymous
 Почему libarchive замедляется при одновременном использовании в параллельных потоках? [закрыто]

Сообщение Anonymous »

Я хочу распаковать и прочитать данные нескольких файлов в .tar.gz на C++.
Сначала я использовал libarchive, так как он быстрый, но когда я пытаюсь сделайте это в 4 разных потоках, это суммирует процессы распаковки и чтения (я не измерял, был ли это процесс распаковки или чтения), и все различные файлы .tar.gz, обработанные в разных потоках, заканчиваются тем, что я жду дольше.
Например, файл распаковывается и читается за 4 секунды. Когда 4 разных потока одновременно выполняют одну и ту же работу, это занимает примерно 16 секунд. Я думаю, это из-за способа обработки libarchive.
Я пробовал Intel igzip для распаковки и microtar для чтения, но в целом он медленнее, чем libarchive, обрабатывающий один файл.
Это код, который сравнивает время одного потока и нескольких потоков (4 потока). Фактическое увеличение времени ожидания при одновременной (должной) обработке находится в этом цикле и функции в моем проекте:

Код: Выделить всё

while (archive_read_next_header(a, &entry) == ARCHIVE_OK) {
processEntry(a, entry);
}
Вот код упрощен, но все делается почти так же, как в моем проекте:
Он пробует процесс на 1.tar.gz в одиночном режиме. поток 10 раз и выводит среднее общее время и среднее время цикла ProcessEntry while.
Затем он пытается сделать то же самое на 1.tar.gz, 2.tar.gz, 3.tar .gz и 4.tar.gz одновременно с потоками и печатает то же самое.
А еще вот как я сгенерировал файлы с помощью этой команды в Linux:

Код: Выделить всё

for i in {1..101}; do dd if=/dev/urandom of=file_$i.txt bs=1M count=1 status=none; done && tar -czf 1.tar.gz file_*.txt && rm file_*.txt

Код: Выделить всё

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

std::mutex printMutex;

class ArchiveProcessor {
public:
static constexpr size_t BUFFER_SIZE = 1024 * 1024;
std::vector processBuffer;
std::vector memoryBuffer;
size_t currentPos = 0;

void processEntry(struct archive* a, struct archive_entry* entry) {
const size_t size = archive_entry_size(entry);
if (size == 0) return;

if (processBuffer.capacity() < size) {
processBuffer.reserve(size);
}
processBuffer.resize(size);

size_t totalRead = 0;
while (totalRead < size) {
const void* buff;
size_t readSize;
int64_t offset;

int r = archive_read_data_block(a, &buff, &readSize, &offset);
if (r != ARCHIVE_OK) break;

if (offset + readSize > size) break;

std::memcpy(processBuffer.data() + offset, buff, readSize);
totalRead += readSize;
}
}

std::pair processFile(const std::string& filepath) {
auto totalStartTime = std::chrono::high_resolution_clock::now();

int fd = open(filepath.c_str(), O_RDONLY);
if (fd == -1) return {-1, -1};

size_t fileSize = std::filesystem::file_size(filepath);
void* mapped = mmap(nullptr, fileSize, PROT_READ, MAP_PRIVATE, fd, 0);
if (mapped == MAP_FAILED) {
close(fd);
return {-1, -1};
}

madvise(mapped, fileSize, MADV_SEQUENTIAL | MADV_WILLNEED);

memoryBuffer = std::vector(
static_cast(mapped),
static_cast(mapped) + fileSize
);

currentPos = 0;
struct archive* a = archive_read_new();
archive_read_support_filter_gzip(a);
archive_read_support_format_tar(a);

int r = archive_read_open_memory(a, memoryBuffer.data(), memoryBuffer.size());

auto processStartTime = std::chrono::high_resolution_clock::now();

if (r == ARCHIVE_OK) {
struct archive_entry* entry;
while (archive_read_next_header(a, &entry) == ARCHIVE_OK) {
processEntry(a, entry);
}
}

auto totalDuration = std::chrono::duration_cast(
std::chrono::high_resolution_clock::now() - totalStartTime).count();

auto processDuration = std::chrono::duration_cast(
std::chrono::high_resolution_clock::now() - processStartTime).count();

archive_read_close(a);
archive_read_free(a);
munmap(mapped, fileSize);
close(fd);

return {totalDuration, processDuration};
}
};

void runSingleThread(const std::string& file, int iterations) {
std::vector totalTimes;
std::vector processTimes;
ArchiveProcessor processor;

for(int i = 0; i < iterations; i++) {
auto [total, process] = processor.processFile(file);
totalTimes.push_back(total);
processTimes.push_back(process);
}

double totalAvg = std::accumulate(totalTimes.begin(), totalTimes.end(), 0.0) / iterations;
double processAvg = std::accumulate(processTimes.begin(), processTimes.end(), 0.0) / iterations;

std::cout 

Подробнее здесь: [url]https://stackoverflow.com/questions/79250598/why-does-libarchive-slow-down-when-used-in-concurrent-threads-at-the-same-time[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • RejectedElementException при использовании ManagedBlocker во вложенных параллельных потоках
    Anonymous » » в форуме JAVA
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • RejectedExecutionException при использовании ManagedBlocker во вложенных параллельных потоках
    Anonymous » » в форуме JAVA
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • RejectedExecutionException при использовании ManagedBlocker во вложенных параллельных потоках
    Anonymous » » в форуме JAVA
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Kafka Consumer.poll занимает разное время в параллельных потоках
    Anonymous » » в форуме JAVA
    0 Ответы
    24 Просмотры
    Последнее сообщение Anonymous
  • Как обеспечить порядок обработки в параллельных потоках?
    Anonymous » » в форуме JAVA
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous

Вернуться в «C++»