Как безопасно дождаться новых данных в потоковом реакторе сервера gRPC C++ с помощью API обратного вызова?C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Как безопасно дождаться новых данных в потоковом реакторе сервера gRPC C++ с помощью API обратного вызова?

Сообщение Anonymous »

Я реализовал потоковый реактор gRPC-сервера, используя API обратного вызова C++. Вот основная идея: отдельный поток приложения генерирует данные каждые 10 мс и вызывает метод response() в реакторе. Я помещаю данные в очередь, а поток gRPC удаляет их из очереди и отправляет клиенту. Поток gRPC ожидает данных, используя std::condition_variable. На практике это работает надежно.
Однако, просматривая официальные рекомендации gRPC C++, я обнаружил следующее правило:

Реакция должна быть быстрой. Не выполняйте блокировку, длительную работу или работу с тяжелым весом, а также не спите. Это может повлиять на другие RPC в процессе.

Очевидно, что мой текущий проект нарушает эту рекомендацию, поскольку он блокируется внутри метода Work() с условной переменной.
Я пытался найти дополнительную информацию о модели внутренних потоков gRPC, но не смог найти в документации ничего убедительного. В результате я не уверен, каким потокам разрешено вызывать методы gRPC API, поэтому в целях безопасности я вызываю их только из потоков gRPC.
Одна из идей, которые я рассматривал, заключалась в использовании std::async для ожидания новых данных в отдельном потоке и уведомления реактора через grpc::Alarm, но я не смог найти информацию о том, является ли Alarm потокобезопасным в этом контексте.
Мой вопрос: каков безопасный и наиболее эффективный способ ожидания входящих данных в серверном потоковом реакторе с использованием API обратного вызова gRPC C++?
Этот ответ выглядит ненадежным, поскольку противоречит рекомендациям, упомянутым выше.
Ниже приведена упрощенная версия моей текущей реализации:

Код: Выделить всё

class EcatStreamReactor final
: public grpc::ServerWriteReactor, public IBusClient
{
public:
EcatStreamReactor(
grpc::CallbackServerContext *context,
const ecat::ControlMessage *request
)
: _request(request)
, _context(context)
{}

void set_bus_controller(std::weak_ptr bus_controller);   // Implementation omitted

/**
* Called from application thread every 10 ms
* Implements IBusClient
*/
void response(const EcatResponse &response) override
{
std::unique_lock lock(_data_mutex);
_data_queue.push(response);
_data_cv.notify_one();
}

// Called once from service handler after reactor construction
void Work()
{
EcatResponse data;
grpc::Status status;
bool has_data = false;
auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(100);
{
std::unique_lock lock(_data_mutex);
_data_cv.wait_until(lock, deadline, [this] {
return !_data_queue.empty() || _cancelled.load();
});

if (_cancelled.load()) {
status = grpc::Status::OK;
} else if (!_data_queue.empty()) {
data = _data_queue.front();
_data_queue.pop();
has_data = true;
} else {
status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED,
"No data received from bus controller");
}
}

if (has_data) {
_res = ecat_conv::convert(data);
StartWrite(&_res);
} else {
Finish(status);
}
}

void OnWriteDone(bool ok) override
{
if (ok)
Work();
else
Finish(grpc::Status::CANCELLED);
}

void OnCancel() override; // Implementation omitted

void OnDone() override;   // Implementation omitted

private:
ecat::EcatResponse _res;
std::queue _data_queue;
std::mutex _data_mutex;
std::condition_variable_any _data_cv;
std::weak_ptr _bus_controller;
std::atomic _cancelled{false};
grpc::CallbackServerContext *_context;
const ecat::ControlMessage *_request;
};
Будем признательны за любые советы или шаблоны для правильной и безопасной реализации этого.


Подробнее здесь: https://stackoverflow.com/questions/796 ... -using-the
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»