Однако, просматривая официальные рекомендации gRPC C++, я обнаружил следующее правило:
Реакция должна быть быстрой. Не выполняйте блокировку, длительную работу или работу с тяжелым весом, а также не спите. Это может повлиять на другие RPC в процессе.
Очевидно, что мой текущий проект нарушает эту рекомендацию, поскольку он блокируется внутри метода Work() с условной переменной.
Я пытался найти дополнительную информацию о модели внутренних потоков gRPC, но не смог найти в документации ничего убедительного. В результате я не уверен, каким потокам разрешено вызывать методы gRPC API, поэтому в целях безопасности я вызываю их только из потоков gRPC.
Одна из идей, которые я рассматривал, заключалась в использовании std::async для ожидания новых данных в отдельном потоке и уведомления реактора через grpc::Alarm, но я не смог найти информацию о том, является ли Alarm потокобезопасным в этом контексте.
Мой вопрос: каков безопасный и наиболее эффективный способ ожидания входящих данных в серверном потоковом реакторе с использованием API обратного вызова gRPC C++?
Этот ответ выглядит ненадежным, поскольку противоречит рекомендациям, упомянутым выше.
Ниже приведена упрощенная версия моей текущей реализации:
Код: Выделить всё
class EcatStreamReactor final
: public grpc::ServerWriteReactor, public IBusClient
{
public:
EcatStreamReactor(
grpc::CallbackServerContext *context,
const ecat::ControlMessage *request
)
: _request(request)
, _context(context)
{}
void set_bus_controller(std::weak_ptr bus_controller); // Implementation omitted
/**
* Called from application thread every 10 ms
* Implements IBusClient
*/
void response(const EcatResponse &response) override
{
std::unique_lock lock(_data_mutex);
_data_queue.push(response);
_data_cv.notify_one();
}
// Called once from service handler after reactor construction
void Work()
{
EcatResponse data;
grpc::Status status;
bool has_data = false;
auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(100);
{
std::unique_lock lock(_data_mutex);
_data_cv.wait_until(lock, deadline, [this] {
return !_data_queue.empty() || _cancelled.load();
});
if (_cancelled.load()) {
status = grpc::Status::OK;
} else if (!_data_queue.empty()) {
data = _data_queue.front();
_data_queue.pop();
has_data = true;
} else {
status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED,
"No data received from bus controller");
}
}
if (has_data) {
_res = ecat_conv::convert(data);
StartWrite(&_res);
} else {
Finish(status);
}
}
void OnWriteDone(bool ok) override
{
if (ok)
Work();
else
Finish(grpc::Status::CANCELLED);
}
void OnCancel() override; // Implementation omitted
void OnDone() override; // Implementation omitted
private:
ecat::EcatResponse _res;
std::queue _data_queue;
std::mutex _data_mutex;
std::condition_variable_any _data_cv;
std::weak_ptr _bus_controller;
std::atomic _cancelled{false};
grpc::CallbackServerContext *_context;
const ecat::ControlMessage *_request;
};
Подробнее здесь: https://stackoverflow.com/questions/796 ... -using-the
Мобильная версия