Сделайте поиск внутри блокировки подкруглого буфера свободным в Producer Consumer.C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Сделайте поиск внутри блокировки подкруглого буфера свободным в Producer Consumer.

Сообщение Anonymous »

У меня есть класс QIODevice, который обертывает поток фонового производителя. Он потребляет данные из исходного QIODevice в потоке-производителе и предоставляет их через стандартный API QIODevice (чтение, поиск и т. д.) в основном потоке. Он считывает данные в кольцевом буфере, производитель добавляет данные в конце, а потребитель потребляет их с самого начала. Я также поддерживаю внутри этого подкруглую очередь, чтобы обеспечить быстрый поиск, когда позиция поиска находится в уже прочитанных данных. Обратите внимание, что здесь возможны только два потока - производитель и потребитель, т.е. это один производитель и один потребитель.
Мне удалось реализовать чтение без блокировки, когда данные уже находятся в буфере, и теперь пытаюсь реализовать поиск без блокировки по быстрому пути, т.е. когда позиция поиска находится в уже прочитанном буфере, и здесь я столкнулся с некоторыми проблемами. По сути, я не могу придумать решение для правильного/безопасного обновления переменной чтения слева (m_readLeft) в этой переменной быстрого поиска. И если я попытаюсь устранить m_readLeft, то поддержание m_count внутри makeSpaceForMoreReading станет затруднительным. Может ли кто-нибудь с большим опытом помочь мне в этом?
#include
#include
#include
#include
#include
#include

/**
* @brief A QIODevice that wraps a background producer thread.
* It consumes data from a source QIODevice and provides it via
* the standard QIODevice API (read, seek, etc.) asynchronously.
*/
class AsyncBufferedReader : public QIODevice
{
Q_OBJECT
public:
class Buffer
{
Q_DISABLE_COPY(Buffer)
public:
Buffer() = default;

Buffer(size_t capacity)
: m_buffer((char *) std::malloc(capacity))
, m_capacity(capacity)
{}

Buffer(Buffer &&buffer)
: m_buffer(std::exchange(buffer.m_buffer, nullptr))
, m_capacity(std::exchange(buffer.m_capacity, 0))
{}

Buffer &operator=(Buffer &&buffer)
{
if (this == &buffer)
return *this;
if (m_buffer)
std::free(m_buffer);
m_buffer = std::exchange(buffer.m_buffer, nullptr);
m_capacity = std::move(buffer.m_capacity);
return *this;
}

~Buffer()
{
if (m_buffer)
std::free(m_buffer);
}

void alloc(size_t cap)
{
if (m_capacity == 0) {
m_buffer = (char *) std::malloc(cap);
m_capacity = cap;
} else {
m_buffer = (char *)std::realloc(m_buffer, cap);
m_capacity = cap;
}
}

size_t capacity() const { return m_capacity; }

char &operator[](const size_t index)
{
assert(index >= 0 && index < m_capacity);
return m_buffer[index];
}

const char &operator[](const size_t index) const
{
assert(index >= 0 && index < m_capacity);
return m_buffer[index];
}

private:
char *m_buffer = nullptr;
size_t m_capacity = 0;
};

static qint64 idealBufferCapacity(qint64 sourceSize);

static constexpr size_t default_capacity = 2 * 1024 * 1024; // 2MB

explicit AsyncBufferedReader(QObject *parent = nullptr);
explicit AsyncBufferedReader(Buffer &&buffer, size_t capacity, QObject *parent = nullptr);
explicit AsyncBufferedReader(size_t capacity, QObject *parent = nullptr);
~AsyncBufferedReader();

/** Starts the worker thread. Takes ownership of the source device. */
bool openSource(std::unique_ptr source,
qint64 startPos = 0,
QIODevice::OpenMode openMode = QIODevice::ReadOnly);

void abort();

// QIODevice overrides
bool isSequential() const override { return false; }
qint64 size() const override;
bool seek(qint64 pos) override;
void close() override;
Buffer closeAndReleaseBuffer();

protected:
qint64 readData(char *data, qint64 maxlen) override;
qint64 writeData(const char *, qint64) override { return -1; }

private:
friend class AsyncBufferedReaderTest;

// only support producer thread and once consumer thread
void runWorker(std::unique_ptr source, qint64 startPos);
void handleSeekInWorker(QIODevice *source, qint64 &currentPos);
void abortWorkerAndWait();

// thread safe
bool canMakeSpaceForMoreReading();

// requires mutex to be locked
bool makeSpaceForMoreReading();

mutable QMutex m_mutex;
QWaitCondition m_dataWait;
QWaitCondition m_bufferSpaceWait;
QWaitCondition m_seekFinishedWait;
QWaitCondition m_threadFinishedWait;

Buffer m_buffer;
const size_t m_capacity;

// denotes full circular queue with all the data
size_t m_head = 0;
size_t m_tail = 0;
std::atomic m_count = 0; // m_buffer ptr is only valid if count>0, only reading from m_count is thread safe, modification should be under lock

// sub circular queue of next read
// this is maintained to allow fast backward seeks
std::atomic m_readPos = 0;
std::atomic m_readLeft = 0;

bool m_workerRunning {false};
std::atomic m_aborted {false};
bool m_seekRequested{false};
std::atomic m_seekPos{0};

qint64 m_totalSourceSize = 0;
bool m_seekSuccess = false;
bool m_sourceEof = false;
};

#include "AsyncBufferedReader.h"
#include
#include
#include
#include
#include
#include

constexpr qint64 CHUNK_SIZE = 256 * 1024;

// use macro to not get quotes in qDebug output
#define formatMiB(bytes) (QString("%1 MiB").arg(QString::number(static_cast(bytes) / (1024. * 1024), 'f')).toStdString().c_str())

qint64 AsyncBufferedReader::idealBufferCapacity(qint64 sourceSize)
{
return std::clamp(sourceSize * .6,
qMin(sourceSize, 200 * 1024 * 1024),
700 * 1024 * 1024);
}

AsyncBufferedReader::AsyncBufferedReader(QObject *parent)
: AsyncBufferedReader(default_capacity, parent)
{}

AsyncBufferedReader::AsyncBufferedReader(AsyncBufferedReader::Buffer &&buffer,
size_t capacity,
QObject *parent)
: AsyncBufferedReader(capacity, parent)
{
m_buffer = std::move(buffer);
}

AsyncBufferedReader::AsyncBufferedReader(size_t capacity, QObject *parent)
: QIODevice(parent)
, m_capacity(capacity)
{
auto timer = new QTimer(this);
connect(timer, &QTimer::timeout, this, [this]() {
qDebug() start([this, src = std::move(source), startPos]() mutable {
runWorker(std::move(src), startPos);
});

return true;
}

void AsyncBufferedReader::runWorker(std::unique_ptr source, qint64 startPos)
{
auto cleanup = qScopeGuard([&] {
QMutexLocker locker(&m_mutex);
m_workerRunning = false;
m_dataWait.notify_all();
m_threadFinishedWait.notify_all();
});

if (startPos > 0 && !source->seek(startPos))
return;

if (m_buffer.capacity() < m_capacity)
m_buffer.alloc(m_capacity);

qint64 currentPos = startPos;

QMutexLocker locker(&m_mutex);
while (!m_aborted.load()) {
if (m_seekRequested) {
handleSeekInWorker(source.get(), currentPos);
continue;
}

// Wait as long as the buffer is absolutely full
while ((m_sourceEof || m_count == m_capacity) && !m_aborted && !m_seekRequested) {
m_bufferSpaceWait.wait(&m_mutex);
}

if (m_aborted || m_seekRequested || m_sourceEof)
continue;

// --- 2. Calculate Contiguous Space ---
// spaceAtTail is the linear memory available before we have to wrap
size_t spaceAtTail = m_capacity - m_tail;
// totalBufferSpace is how much we can add before hitting m_head
size_t totalBufferSpace = m_capacity - m_count;

size_t recommendedReadSize = 0;
if (m_count == 0)
recommendedReadSize = 32 * 1024;
else if (m_count < 1024 * 1024)
recommendedReadSize = 128 * 1024;
else
recommendedReadSize = 1024 * 1024;
assert(recommendedReadSize > 0);

// We can only read the smaller of:
// - Our desired chunk size
// - The linear space until the end of the vector
// - The total space remaining in the buffer
size_t toRead = std::min({recommendedReadSize, spaceAtTail, totalBufferSpace});

if (toRead == 0)
continue;

locker.unlock();
qint64 bytesRead = source->read(&m_buffer[m_tail], toRead);
locker.relock();

if (bytesRead

Подробнее здесь: https://stackoverflow.com/questions/799 ... r-consumer
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»