У меня есть класс QIODevice, который обертывает поток фонового производителя. Он потребляет данные из исходного QIODevice в потоке-производителе и предоставляет их через стандартный API QIODevice (чтение, поиск и т. д.) в основном потоке. Он считывает данные в кольцевом буфере, производитель добавляет данные в конце, а потребитель потребляет их с самого начала. Я также поддерживаю внутри этого подкруглую очередь, чтобы обеспечить быстрый поиск, когда позиция поиска находится в уже прочитанных данных. Обратите внимание, что здесь возможны только два потока - производитель и потребитель, т.е. это один производитель и один потребитель.
Мне удалось реализовать чтение без блокировки, когда данные уже находятся в буфере, и теперь пытаюсь реализовать поиск без блокировки по быстрому пути, т.е. когда позиция поиска находится в уже прочитанном буфере, и здесь я столкнулся с некоторыми проблемами. По сути, я не могу придумать решение для правильного/безопасного обновления переменной чтения слева (m_readLeft) в этой переменной быстрого поиска. И если я попытаюсь устранить m_readLeft, то поддержание m_count внутри makeSpaceForMoreReading станет затруднительным. Может ли кто-нибудь с большим опытом помочь мне в этом?
#include
#include
#include
#include
#include
#include
/**
* @brief A QIODevice that wraps a background producer thread.
* It consumes data from a source QIODevice and provides it via
* the standard QIODevice API (read, seek, etc.) asynchronously.
*/
class AsyncBufferedReader : public QIODevice
{
Q_OBJECT
public:
class Buffer
{
Q_DISABLE_COPY(Buffer)
public:
Buffer() = default;
Buffer(size_t capacity)
: m_buffer((char *) std::malloc(capacity))
, m_capacity(capacity)
{}
Buffer(Buffer &&buffer)
: m_buffer(std::exchange(buffer.m_buffer, nullptr))
, m_capacity(std::exchange(buffer.m_capacity, 0))
{}
Buffer &operator=(Buffer &&buffer)
{
if (this == &buffer)
return *this;
if (m_buffer)
std::free(m_buffer);
m_buffer = std::exchange(buffer.m_buffer, nullptr);
m_capacity = std::move(buffer.m_capacity);
return *this;
}
~Buffer()
{
if (m_buffer)
std::free(m_buffer);
}
void alloc(size_t cap)
{
if (m_capacity == 0) {
m_buffer = (char *) std::malloc(cap);
m_capacity = cap;
} else {
m_buffer = (char *)std::realloc(m_buffer, cap);
m_capacity = cap;
}
}
size_t capacity() const { return m_capacity; }
char &operator[](const size_t index)
{
assert(index >= 0 && index < m_capacity);
return m_buffer[index];
}
const char &operator[](const size_t index) const
{
assert(index >= 0 && index < m_capacity);
return m_buffer[index];
}
private:
char *m_buffer = nullptr;
size_t m_capacity = 0;
};
static qint64 idealBufferCapacity(qint64 sourceSize);
static constexpr size_t default_capacity = 2 * 1024 * 1024; // 2MB
explicit AsyncBufferedReader(QObject *parent = nullptr);
explicit AsyncBufferedReader(Buffer &&buffer, size_t capacity, QObject *parent = nullptr);
explicit AsyncBufferedReader(size_t capacity, QObject *parent = nullptr);
~AsyncBufferedReader();
/** Starts the worker thread. Takes ownership of the source device. */
bool openSource(std::unique_ptr source,
qint64 startPos = 0,
QIODevice::OpenMode openMode = QIODevice::ReadOnly);
void abort();
// QIODevice overrides
bool isSequential() const override { return false; }
qint64 size() const override;
bool seek(qint64 pos) override;
void close() override;
Buffer closeAndReleaseBuffer();
protected:
qint64 readData(char *data, qint64 maxlen) override;
qint64 writeData(const char *, qint64) override { return -1; }
private:
friend class AsyncBufferedReaderTest;
// only support producer thread and once consumer thread
void runWorker(std::unique_ptr source, qint64 startPos);
void handleSeekInWorker(QIODevice *source, qint64 ¤tPos);
void abortWorkerAndWait();
// thread safe
bool canMakeSpaceForMoreReading();
// requires mutex to be locked
bool makeSpaceForMoreReading();
mutable QMutex m_mutex;
QWaitCondition m_dataWait;
QWaitCondition m_bufferSpaceWait;
QWaitCondition m_seekFinishedWait;
QWaitCondition m_threadFinishedWait;
Buffer m_buffer;
const size_t m_capacity;
// denotes full circular queue with all the data
size_t m_head = 0;
size_t m_tail = 0;
std::atomic m_count = 0; // m_buffer ptr is only valid if count>0, only reading from m_count is thread safe, modification should be under lock
// sub circular queue of next read
// this is maintained to allow fast backward seeks
std::atomic m_readPos = 0;
std::atomic m_readLeft = 0;
bool m_workerRunning {false};
std::atomic m_aborted {false};
bool m_seekRequested{false};
std::atomic m_seekPos{0};
qint64 m_totalSourceSize = 0;
bool m_seekSuccess = false;
bool m_sourceEof = false;
};
#include "AsyncBufferedReader.h"
#include
#include
#include
#include
#include
#include
constexpr qint64 CHUNK_SIZE = 256 * 1024;
// use macro to not get quotes in qDebug output
#define formatMiB(bytes) (QString("%1 MiB").arg(QString::number(static_cast(bytes) / (1024. * 1024), 'f')).toStdString().c_str())
qint64 AsyncBufferedReader::idealBufferCapacity(qint64 sourceSize)
{
return std::clamp(sourceSize * .6,
qMin(sourceSize, 200 * 1024 * 1024),
700 * 1024 * 1024);
}
AsyncBufferedReader::AsyncBufferedReader(QObject *parent)
: AsyncBufferedReader(default_capacity, parent)
{}
AsyncBufferedReader::AsyncBufferedReader(AsyncBufferedReader::Buffer &&buffer,
size_t capacity,
QObject *parent)
: AsyncBufferedReader(capacity, parent)
{
m_buffer = std::move(buffer);
}
AsyncBufferedReader::AsyncBufferedReader(size_t capacity, QObject *parent)
: QIODevice(parent)
, m_capacity(capacity)
{
auto timer = new QTimer(this);
connect(timer, &QTimer::timeout, this, [this]() {
qDebug() start([this, src = std::move(source), startPos]() mutable {
runWorker(std::move(src), startPos);
});
return true;
}
void AsyncBufferedReader::runWorker(std::unique_ptr source, qint64 startPos)
{
auto cleanup = qScopeGuard([&] {
QMutexLocker locker(&m_mutex);
m_workerRunning = false;
m_dataWait.notify_all();
m_threadFinishedWait.notify_all();
});
if (startPos > 0 && !source->seek(startPos))
return;
if (m_buffer.capacity() < m_capacity)
m_buffer.alloc(m_capacity);
qint64 currentPos = startPos;
QMutexLocker locker(&m_mutex);
while (!m_aborted.load()) {
if (m_seekRequested) {
handleSeekInWorker(source.get(), currentPos);
continue;
}
// Wait as long as the buffer is absolutely full
while ((m_sourceEof || m_count == m_capacity) && !m_aborted && !m_seekRequested) {
m_bufferSpaceWait.wait(&m_mutex);
}
if (m_aborted || m_seekRequested || m_sourceEof)
continue;
// --- 2. Calculate Contiguous Space ---
// spaceAtTail is the linear memory available before we have to wrap
size_t spaceAtTail = m_capacity - m_tail;
// totalBufferSpace is how much we can add before hitting m_head
size_t totalBufferSpace = m_capacity - m_count;
size_t recommendedReadSize = 0;
if (m_count == 0)
recommendedReadSize = 32 * 1024;
else if (m_count < 1024 * 1024)
recommendedReadSize = 128 * 1024;
else
recommendedReadSize = 1024 * 1024;
assert(recommendedReadSize > 0);
// We can only read the smaller of:
// - Our desired chunk size
// - The linear space until the end of the vector
// - The total space remaining in the buffer
size_t toRead = std::min({recommendedReadSize, spaceAtTail, totalBufferSpace});
if (toRead == 0)
continue;
locker.unlock();
qint64 bytesRead = source->read(&m_buffer[m_tail], toRead);
locker.relock();
if (bytesRead
Подробнее здесь: https://stackoverflow.com/questions/799 ... r-consumer
Сделайте поиск внутри блокировки подкруглого буфера свободным в Producer Consumer. ⇐ C++
Программы на C++. Форум разработчиков
1772601649
Anonymous
У меня есть класс QIODevice, который обертывает поток фонового производителя. Он потребляет данные из исходного QIODevice в потоке-производителе и предоставляет их через стандартный API QIODevice (чтение, поиск и т. д.) в основном потоке. Он считывает данные в кольцевом буфере, производитель добавляет данные в конце, а потребитель потребляет их с самого начала. Я также поддерживаю внутри этого подкруглую очередь, чтобы обеспечить быстрый поиск, когда позиция поиска находится в уже прочитанных данных. Обратите внимание, что здесь возможны только два потока - производитель и потребитель, т.е. это один производитель и один потребитель.
Мне удалось реализовать чтение без блокировки, когда данные уже находятся в буфере, и теперь пытаюсь реализовать поиск без блокировки по быстрому пути, т.е. когда позиция поиска находится в уже прочитанном буфере, и здесь я столкнулся с некоторыми проблемами. По сути, я не могу придумать решение для правильного/безопасного обновления переменной чтения слева (m_readLeft) в этой переменной быстрого поиска. И если я попытаюсь устранить m_readLeft, то поддержание m_count внутри makeSpaceForMoreReading станет затруднительным. Может ли кто-нибудь с большим опытом помочь мне в этом?
#include
#include
#include
#include
#include
#include
/**
* @brief A QIODevice that wraps a background producer thread.
* It consumes data from a source QIODevice and provides it via
* the standard QIODevice API (read, seek, etc.) asynchronously.
*/
class AsyncBufferedReader : public QIODevice
{
Q_OBJECT
public:
class Buffer
{
Q_DISABLE_COPY(Buffer)
public:
Buffer() = default;
Buffer(size_t capacity)
: m_buffer((char *) std::malloc(capacity))
, m_capacity(capacity)
{}
Buffer(Buffer &&buffer)
: m_buffer(std::exchange(buffer.m_buffer, nullptr))
, m_capacity(std::exchange(buffer.m_capacity, 0))
{}
Buffer &operator=(Buffer &&buffer)
{
if (this == &buffer)
return *this;
if (m_buffer)
std::free(m_buffer);
m_buffer = std::exchange(buffer.m_buffer, nullptr);
m_capacity = std::move(buffer.m_capacity);
return *this;
}
~Buffer()
{
if (m_buffer)
std::free(m_buffer);
}
void alloc(size_t cap)
{
if (m_capacity == 0) {
m_buffer = (char *) std::malloc(cap);
m_capacity = cap;
} else {
m_buffer = (char *)std::realloc(m_buffer, cap);
m_capacity = cap;
}
}
size_t capacity() const { return m_capacity; }
char &operator[](const size_t index)
{
assert(index >= 0 && index < m_capacity);
return m_buffer[index];
}
const char &operator[](const size_t index) const
{
assert(index >= 0 && index < m_capacity);
return m_buffer[index];
}
private:
char *m_buffer = nullptr;
size_t m_capacity = 0;
};
static qint64 idealBufferCapacity(qint64 sourceSize);
static constexpr size_t default_capacity = 2 * 1024 * 1024; // 2MB
explicit AsyncBufferedReader(QObject *parent = nullptr);
explicit AsyncBufferedReader(Buffer &&buffer, size_t capacity, QObject *parent = nullptr);
explicit AsyncBufferedReader(size_t capacity, QObject *parent = nullptr);
~AsyncBufferedReader();
/** Starts the worker thread. Takes ownership of the source device. */
bool openSource(std::unique_ptr source,
qint64 startPos = 0,
QIODevice::OpenMode openMode = QIODevice::ReadOnly);
void abort();
// QIODevice overrides
bool isSequential() const override { return false; }
qint64 size() const override;
bool seek(qint64 pos) override;
void close() override;
Buffer closeAndReleaseBuffer();
protected:
qint64 readData(char *data, qint64 maxlen) override;
qint64 writeData(const char *, qint64) override { return -1; }
private:
friend class AsyncBufferedReaderTest;
// only support producer thread and once consumer thread
void runWorker(std::unique_ptr source, qint64 startPos);
void handleSeekInWorker(QIODevice *source, qint64 ¤tPos);
void abortWorkerAndWait();
// thread safe
bool canMakeSpaceForMoreReading();
// requires mutex to be locked
bool makeSpaceForMoreReading();
mutable QMutex m_mutex;
QWaitCondition m_dataWait;
QWaitCondition m_bufferSpaceWait;
QWaitCondition m_seekFinishedWait;
QWaitCondition m_threadFinishedWait;
Buffer m_buffer;
const size_t m_capacity;
// denotes full circular queue with all the data
size_t m_head = 0;
size_t m_tail = 0;
std::atomic m_count = 0; // m_buffer ptr is only valid if count>0, only reading from m_count is thread safe, modification should be under lock
// sub circular queue of next read
// this is maintained to allow fast backward seeks
std::atomic m_readPos = 0;
std::atomic m_readLeft = 0;
bool m_workerRunning {false};
std::atomic m_aborted {false};
bool m_seekRequested{false};
std::atomic m_seekPos{0};
qint64 m_totalSourceSize = 0;
bool m_seekSuccess = false;
bool m_sourceEof = false;
};
#include "AsyncBufferedReader.h"
#include
#include
#include
#include
#include
#include
constexpr qint64 CHUNK_SIZE = 256 * 1024;
// use macro to not get quotes in qDebug output
#define formatMiB(bytes) (QString("%1 MiB").arg(QString::number(static_cast(bytes) / (1024. * 1024), 'f')).toStdString().c_str())
qint64 AsyncBufferedReader::idealBufferCapacity(qint64 sourceSize)
{
return std::clamp(sourceSize * .6,
qMin(sourceSize, 200 * 1024 * 1024),
700 * 1024 * 1024);
}
AsyncBufferedReader::AsyncBufferedReader(QObject *parent)
: AsyncBufferedReader(default_capacity, parent)
{}
AsyncBufferedReader::AsyncBufferedReader(AsyncBufferedReader::Buffer &&buffer,
size_t capacity,
QObject *parent)
: AsyncBufferedReader(capacity, parent)
{
m_buffer = std::move(buffer);
}
AsyncBufferedReader::AsyncBufferedReader(size_t capacity, QObject *parent)
: QIODevice(parent)
, m_capacity(capacity)
{
auto timer = new QTimer(this);
connect(timer, &QTimer::timeout, this, [this]() {
qDebug() start([this, src = std::move(source), startPos]() mutable {
runWorker(std::move(src), startPos);
});
return true;
}
void AsyncBufferedReader::runWorker(std::unique_ptr source, qint64 startPos)
{
auto cleanup = qScopeGuard([&] {
QMutexLocker locker(&m_mutex);
m_workerRunning = false;
m_dataWait.notify_all();
m_threadFinishedWait.notify_all();
});
if (startPos > 0 && !source->seek(startPos))
return;
if (m_buffer.capacity() < m_capacity)
m_buffer.alloc(m_capacity);
qint64 currentPos = startPos;
QMutexLocker locker(&m_mutex);
while (!m_aborted.load()) {
if (m_seekRequested) {
handleSeekInWorker(source.get(), currentPos);
continue;
}
// Wait as long as the buffer is absolutely full
while ((m_sourceEof || m_count == m_capacity) && !m_aborted && !m_seekRequested) {
m_bufferSpaceWait.wait(&m_mutex);
}
if (m_aborted || m_seekRequested || m_sourceEof)
continue;
// --- 2. Calculate Contiguous Space ---
// spaceAtTail is the linear memory available before we have to wrap
size_t spaceAtTail = m_capacity - m_tail;
// totalBufferSpace is how much we can add before hitting m_head
size_t totalBufferSpace = m_capacity - m_count;
size_t recommendedReadSize = 0;
if (m_count == 0)
recommendedReadSize = 32 * 1024;
else if (m_count < 1024 * 1024)
recommendedReadSize = 128 * 1024;
else
recommendedReadSize = 1024 * 1024;
assert(recommendedReadSize > 0);
// We can only read the smaller of:
// - Our desired chunk size
// - The linear space until the end of the vector
// - The total space remaining in the buffer
size_t toRead = std::min({recommendedReadSize, spaceAtTail, totalBufferSpace});
if (toRead == 0)
continue;
locker.unlock();
qint64 bytesRead = source->read(&m_buffer[m_tail], toRead);
locker.relock();
if (bytesRead
Подробнее здесь: [url]https://stackoverflow.com/questions/79900606/make-seeking-inside-sub-circular-buffer-lock-free-in-producer-consumer[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия