Я строю A c ++ grpc client , используя API обратного вызова . Клиент отправляет много асинхронных RPC, ожидает, что все они завершат, а затем немедленно выходят. 0). < /P>
Добавление короткого сна после последнего RPC помогает, но я хотел бы избежать произвольных задержек и чисто и детерминированно отключаться. GRPC по -прежнему выполняет внутреннюю очистку потока, когда все обратные вызовы завершены? /> зависимости < /em> < /p>
grpc версия 1.50.1 < /li>
Googletest < /li>
< /ul>
Пример кода ниже был запущен с помощью -gtest_repeat = 1000 < /ul>. Повторите.#include "helloworld.grpc.pb.h"
#include
#include
#include
#include
#include
#include
#include
#include
namespace {
class PendingRpcCounter
{
public:
void increment()
{
{
std::lock_guard lock{m_mutex};
++m_pendingCalls;
}
m_conditionVariable.notify_all();
}
void decrement()
{
{
std::lock_guard lock{m_mutex};
--m_pendingCalls;
}
m_conditionVariable.notify_all();
}
void waitForZeroPendingCalls()
{
std::unique_lock lock{m_mutex};
m_conditionVariable.wait(lock, [this]() {
return m_pendingCalls == 0;
});
}
private:
std::mutex m_mutex{};
std::condition_variable m_conditionVariable{};
std::size_t m_pendingCalls{0};
};
struct RpcData
{
grpc::ClientContext context{};
helloworld::HelloRequest request{};
helloworld::HelloReply response{};
};
TEST(CallbackApiShutdownTest, BasicTest)
{
auto channel = grpc::CreateChannel("localhost:51000", grpc::InsecureChannelCredentials());
auto stub = helloworld::Greeter::NewStub(channel);
auto pendingRpcCounter = PendingRpcCounter{};
static constexpr std::size_t NUMBER_OF_CALLS = 1000;
for (std::size_t i = 0; i < NUMBER_OF_CALLS; ++i)
{
auto rpcData = std::make_shared();
pendingRpcCounter.increment();
stub->async()->SayHello(
&rpcData->context, &rpcData->request, &rpcData->response, [rpcData, &pendingRpcCounter](grpc::Status) {
pendingRpcCounter.decrement();
}
);
}
pendingRpcCounter.waitForZeroPendingCalls();
// Adding a sleep here seems to help avoid repeated-run issues:
// std::this_thread::sleep_for(std::chrono::seconds(1));
}
} // namespace
< /code>
error & stacktrace < /h2>
E0714 20:48:21.357772481 1162770 ref_counted.h:183] assertion failed: prior > 0
PID 1162689 - core
TID 1162770:
#0 0x0000776e4ade4b2c pthread_kill@@GLIBC_2.34
#1 0x0000776e4ad8b27e raise
#2 0x0000776e4ad6e8ff abort
#3 0x000057106583cf9d grpc_core::RefCount::Unref(grpc_core::DebugLocation const&, char const*)
#4 0x0000571065b2dbca grpc_cq_internal_unref(grpc_completion_queue*, char const*, char const*, int)
#5 0x0000571065b2fc89 cq_next(grpc_completion_queue*, gpr_timespec, void*)
#6 0x0000571065b3005e grpc_completion_queue_next
#7 0x000057106567ac57 grpc::(anonymous namespace)::CallbackAlternativeCQ::Ref()::{lambda(void*)#1}::operator()(void*) const
#8 0x000057106567ad64 grpc::(anonymous namespace)::CallbackAlternativeCQ::Ref()::{lambda(void*)#1}::_FUN(void*)
#9 0x0000571065e3688c grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::operator()(void*) const
#10 0x0000571065e368d9 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::_FUN(void*)
#11 0x0000776e4ade2aa4 start_thread
#12 0x0000776e4ae6fc3c __clone3
TID 1162764:
#0 0x0000776e4ae6d25d syscall
#1 0x0000571065e8f84d absl::lts_20230802::synchronization_internal::FutexImpl::WaitAbsoluteTimeout(std::atomic*, int, timespec const*)
#2 0x0000571065e8f4bd absl::lts_20230802::synchronization_internal::FutexWaiter::WaitUntil(std::atomic*, int, absl::lts_20230802::synchronization_internal::KernelTimeout)
#3 0x0000571065e8f605 absl::lts_20230802::synchronization_internal::FutexWaiter::Wait(absl::lts_20230802::synchronization_internal::KernelTimeout)
#4 0x0000571065e8f2c0 AbslInternalPerThreadSemWait_lts_20230802
#5 0x0000571065e8d98a absl::lts_20230802::synchronization_internal::PerThreadSem::Wait(absl::lts_20230802::synchronization_internal::KernelTimeout)
#6 0x0000571065e850f3 absl::lts_20230802::Mutex::DecrementSynchSem(absl::lts_20230802::Mutex*, absl::lts_20230802::base_internal::PerThreadSynch*, absl::lts_20230802::synchronization_internal::KernelTimeout)
#7 0x0000571065e8c863 absl::lts_20230802::CondVar::WaitCommon(absl::lts_20230802::Mutex*, absl::lts_20230802::synchronization_internal::KernelTimeout)
#8 0x0000571065e8caa9 absl::lts_20230802::CondVar::WaitWithDeadline(absl::lts_20230802::Mutex*, absl::lts_20230802::Time)
#9 0x0000571065e33139 gpr_cv_wait
#10 0x0000571065acacb3 wait_until(grpc_core::Timestamp)
#11 0x0000571065acae42 timer_main_loop()
#12 0x0000571065acaf6a timer_thread(void*)
#13 0x0000571065e3688c grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::operator()(void*) const
#14 0x0000571065e368d9 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::_FUN(void*)
#15 0x0000776e4ade2aa4 start_thread
#16 0x0000776e4ae6fc3c __clone3
TID 1162762:
#0 0x0000571065864a2c grpc_core::BitSet::is_set(int) const
#1 0x000057106586a269 _ZNK9grpc_core5TableIJNS_15metadata_detail5ValueINS_17LbCostBinMetadataEvEENS2_INS_17GrpcStatusContextEvEENS2_INS_15LbTokenMetadataEvEENS2_INS_19GrpcTagsBinMetadataEvEENS2_INS_20GrpcTraceBinMetadataEvEENS2_INS_26GrpcServerStatsBinMetadataEvEENS2_INS_30EndpointLoadMetricsBinMetadataEvEENS2_INS_12HostMetadataEvEENS2_INS_19GrpcMessageMetadataEvEENS2_INS_17UserAgentMetadataEvEENS2_INS_21HttpAuthorityMetadataEvEENS2_INS_16HttpPathMetadataEvEENS2_INS_10PeerStringEvEENS2_INS_19GrpcTimeoutMetadataEvEENS2_INS_25GrpcLbClientStatsMetadataEvEENS2_INS_27GrpcRetryPushbackMsMetadataEvEENS2_INS_27GrpcInternalEncodingRequestEvEENS2_INS_20GrpcEncodingMetadataEvEENS2_INS_18HttpStatusMetadataEvEENS2_INS_31GrpcPreviousRpcAttemptsMetadataEvEENS2_INS_18GrpcStatusMetadataEvEENS2_INS_12WaitForReadyEvEENS2_INS_19ContentTypeMetadataEvEENS2_INS_18HttpSchemeMetadataEvEENS2_INS_22GrpcStreamNetworkStateEvEENS2_INS_18HttpMethodMetadataEvEENS2_INS_26GrpcAcceptEncodingMetadataEvEENS2_INS_10TeMetadataEvEEEE3hasILm0EEENSt9enable_ifIXltT_Lm28EEbE4typeEv
#2 0x0000571065932ada _ZN9grpc_core5TableIJNS_15metadata_detail5ValueINS_17LbCostBinMetadataEvEENS2_INS_17GrpcStatusContextEvEENS2_INS_15LbTokenMetadataEvEENS2_INS_19GrpcTagsBinMetadataEvEENS2_INS_20GrpcTraceBinMetadataEvEENS2_INS_26GrpcServerStatsBinMetadataEvEENS2_INS_30EndpointLoadMetricsBinMetadataEvEENS2_INS_12HostMetadataEvEENS2_INS_19GrpcMessageMetadataEvEENS2_INS_17UserAgentMetadataEvEENS2_INS_21HttpAuthorityMetadataEvEENS2_INS_16HttpPathMetadataEvEENS2_INS_10PeerStringEvEENS2_INS_19GrpcTimeoutMetadataEvEENS2_INS_25GrpcLbClientStatsMetadataEvEENS2_INS_27GrpcRetryPushbackMsMetadataEvEENS2_INS_27GrpcInternalEncodingRequestEvEENS2_INS_20GrpcEncodingMetadataEvEENS2_INS_18HttpStatusMetadataEvEENS2_INS_31GrpcPreviousRpcAttemptsMetadataEvEENS2_INS_18GrpcStatusMetadataEvEENS2_INS_12WaitForReadyEvEENS2_INS_19ContentTypeMetadataEvEENS2_INS_18HttpSchemeMetadataEvEENS2_INS_22GrpcStreamNetworkStateEvEENS2_INS_18HttpMethodMetadataEvEENS2_INS_26GrpcAcceptEncodingMetadataEvEENS2_INS_10TeMetadataEvEEEE3getILm0EEEPNS_12table_detail15TypeIndexStructIXT_EJS4_S6_S8_SA_SC_SE_SG_SI_SK_SM_SO_SQ_SS_SU_SW_SY_S10_S12_S14_S16_S18_S1A_S1C_S1E_S1G_S1I_S1K_S1M_EE4TypeEv
#3 0x00005710659319f7 _ZN9grpc_core5TableIJNS_15metadata_detail5ValueINS_17LbCostBinMetadataEvEENS2_INS_17GrpcStatusContextEvEENS2_INS_15LbTokenMetadataEvEENS2_INS_19GrpcTagsBinMetadataEvEENS2_INS_20GrpcTraceBinMetadataEvEENS2_INS_26GrpcServerStatsBinMetadataEvEENS2_INS_30EndpointLoadMetricsBinMetadataEvEENS2_INS_12HostMetadataEvEENS2_INS_19GrpcMessageMetadataEvEENS2_INS_17UserAgentMetadataEvEENS2_INS_21HttpAuthorityMetadataEvEENS2_INS_16HttpPathMetadataEvEENS2_INS_10PeerStringEvEENS2_INS_19GrpcTimeoutMetadataEvEENS2_INS_25GrpcLbClientStatsMetadataEvEENS2_INS_27GrpcRetryPushbackMsMetadataEvEENS2_INS_27GrpcInternalEncodingRequestEvEENS2_INS_20GrpcEncodingMetadataEvEENS2_INS_18HttpStatusMetadataEvEENS2_INS_31GrpcPreviousRpcAttemptsMetadataEvEENS2_INS_18GrpcStatusMetadataEvEENS2_INS_12WaitForReadyEvEENS2_INS_19ContentTypeMetadataEvEENS2_INS_18HttpSchemeMetadataEvEENS2_INS_22GrpcStreamNetworkStateEvEENS2_INS_18HttpMethodMetadataEvEENS2_INS_26GrpcAcceptEncodingMetadataEvEENS2_INS_10TeMetadataEvEEEE8DestructIJLm0ELm1ELm2ELm3ELm4ELm5ELm6ELm7ELm8ELm9ELm10ELm11ELm12ELm13ELm14ELm15ELm16ELm17ELm18ELm19ELm20ELm21ELm22ELm23ELm24ELm25ELm26ELm27EEEEvN4absl12lts_2023080216integer_sequenceImJXspT_EEEE
#4 0x000057106593127e grpc_core::Table::~Table()
#5 0x0000571065930636 grpc_core::MetadataMap::~MetadataMap()
#6 0x000057106592f53a grpc_metadata_batch::~grpc_metadata_batch()
#7 0x0000571065b1f49b grpc_core::FilterStackCall::~FilterStackCall()
#8 0x0000571065b17254 grpc_core::FilterStackCall::ReleaseCall(void*, absl::lts_20230802::Status)
#9 0x0000571065aa9c38 exec_ctx_run(grpc_closure*)
#10 0x0000571065aa9d9f grpc_core::ExecCtx::Flush()
#11 0x0000571065aaa673 grpc_core::Executor::RunClosures(char const*, grpc_closure_list)
#12 0x0000571065aab0e1 grpc_core::Executor::ThreadMain(void*)
#13 0x0000571065e3688c grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::operator()(void*) const
#14 0x0000571065e368d9 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::_FUN(void*)
#15 0x0000776e4ade2aa4 start_thread
#16 0x0000776e4ae6fc3c __clone3
TID 1162763:
#0 0x0000776e4ae6d25d syscall
#1 0x0000571065e8f84d absl::lts_20230802::synchronization_internal::FutexImpl::WaitAbsoluteTimeout(std::atomic*, int, timespec const*)
#2 0x0000571065e8f7e0 absl::lts_20230802::synchronization_internal::FutexImpl::Wait(std::atomic*, int)
#3 0x0000571065e8f43e absl::lts_20230802::synchronization_internal::FutexWaiter::WaitUntil(std::atomic*, int, absl::lts_20230802::synchronization_internal::KernelTimeout)
#4 0x0000571065e8f605 absl::lts_20230802::synchronization_internal::FutexWaiter::Wait(absl::lts_20230802::synchronization_internal::KernelTimeout)
#5 0x0000571065e8f2c0 AbslInternalPerThreadSemWait_lts_20230802
#6 0x0000571065e8d98a absl::lts_20230802::synchronization_internal::PerThreadSem::Wait(absl::lts_20230802::synchronization_internal::KernelTimeout)
#7 0x0000571065e850f3 absl::lts_20230802::Mutex::DecrementSynchSem(absl::lts_20230802::Mutex*, absl::lts_20230802::base_internal::PerThreadSynch*, absl::lts_20230802::synchronization_internal::KernelTimeout)
#8 0x0000571065e8c863 absl::lts_20230802::CondVar::WaitCommon(absl::lts_20230802::Mutex*, absl::lts_20230802::synchronization_internal::KernelTimeout)
#9 0x0000571065e8caef absl::lts_20230802::CondVar::Wait(absl::lts_20230802::Mutex*)
#10 0x0000571065e330bf gpr_cv_wait
#11 0x0000571065aaaf09 grpc_core::Executor::ThreadMain(void*)
#12 0x0000571065e3688c grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::operator()(void*) const
#13 0x0000571065e368d9 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::_FUN(void*)
#14 0x0000776e4ade2aa4 start_thread
#15 0x0000776e4ae6fc3c __clone3
TID 1162776:
#0 0x0000776e4ade285e start_thread
#1 0x0000776e4ae6fc3c __clone3
TID 1162689:
#0 0x0000776e4ae6fc2d __clone3
#1 0x0000776e4ade2550 create_thread
#2 0x0000776e4ade31a5 pthread_create@GLIBC_2.2.5
#3 0x0000571065e36c0b grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)
#4 0x0000571065e36e6d grpc_core::Thread::Thread(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)
#5 0x000057106567ba5a decltype (::new ((void*)(0)) grpc_core::Thread((declval)(), (declval)(), (declval)())) std::construct_at(grpc_core::Thread*, char const (&) [15], grpc::(anonymous namespace)::CallbackAlternativeCQ::Ref()::{lambda(void*)#1}&&, grpc::CompletionQueue*&)
#6 0x000057106567b7ba void std::vector::_M_realloc_insert(__gnu_cxx::__normal_iterator, char const (&) [15], grpc::(anonymous namespace)::CallbackAlternativeCQ::Ref()::{lambda(void*)#1}&&, grpc::CompletionQueue*&)
#7 0x000057106567b619 grpc_core::Thread& std::vector::emplace_back(char const (&) [15], grpc::(anonymous namespace)::CallbackAlternativeCQ::Ref()::{lambda(void*)#1}&&, grpc::CompletionQueue*&)
#8 0x000057106567ae53 grpc::(anonymous namespace)::CallbackAlternativeCQ::Ref()
#9 0x000057106567b470 grpc::CompletionQueue::CallbackAlternativeCQ()
#10 0x0000571065678815 grpc::Channel::CallbackCQ()
#11 0x0000571065651591 grpc::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl(grpc::ChannelInterface*, grpc::internal::RpcMethod const&, grpc::ClientContext*, google::protobuf::MessageLite const*, google::protobuf::MessageLite*, std::function)
#12 0x000057106564e60b void grpc::internal::CallbackUnaryCall(grpc::ChannelInterface*, grpc::internal::RpcMethod const&, grpc::ClientContext*, helloworld::HelloRequest const*, helloworld::HelloReply*, std::function)
#13 0x000057106564417f helloworld::Greeter::Stub::async::SayHello(grpc::ClientContext*, helloworld::HelloRequest const*, helloworld::HelloReply*, std::function)
#14 0x000057106563d62b (anonymous namespace)::CallbackApiShutdownTest_BasicTest_Test::TestBody()
#15 0x00005710663b933f void testing::internal::HandleSehExceptionsInMethodIfSupported(testing::Test*, void (testing::Test::*)(), char const*)
#16 0x00005710663b20dd void testing::internal::HandleExceptionsInMethodIfSupported(testing::Test*, void (testing::Test::*)(), char const*)
#17 0x000057106638be84 testing::Test::Run()
#18 0x000057106638c9b3 testing::TestInfo::Run()
#19 0x000057106638d3c8 testing::TestSuite::Run()
#20 0x000057106639d075 testing::internal::UnitTestImpl::RunAllTests()
#21 0x00005710663ba8f0 bool testing::internal::HandleSehExceptionsInMethodIfSupported(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*)
#22 0x00005710663b3413 bool testing::internal::HandleExceptionsInMethodIfSupported(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*)
#23 0x000057106639b509 testing::UnitTest::Run()
#24 0x0000571066377958 RUN_ALL_TESTS()
#25 0x0000571066377940 main
#26 0x0000776e4ad701ca __libc_start_call_main
#27 0x0000776e4ad7028b __libc_start_main@@GLIBC_2.34
#28 0x000057106563d025 _start
TID 1162775:
#0 0x0000776e4ae6d25d syscall
#1 0x0000571065e8f84d absl::lts_20230802::synchronization_internal::FutexImpl::WaitAbsoluteTimeout(std::atomic*, int, timespec const*)
#2 0x0000571065e8f7e0 absl::lts_20230802::synchronization_internal::FutexImpl::Wait(std::atomic*, int)
#3 0x0000571065e8f43e absl::lts_20230802::synchronization_internal::FutexWaiter::WaitUntil(std::atomic*, int, absl::lts_20230802::synchronization_internal::KernelTimeout)
#4 0x0000571065e8f605 absl::lts_20230802::synchronization_internal::FutexWaiter::Wait(absl::lts_20230802::synchronization_internal::KernelTimeout)
#5 0x0000571065e8f2c0 AbslInternalPerThreadSemWait_lts_20230802
#6 0x0000571065e8d98a absl::lts_20230802::synchronization_internal::PerThreadSem::Wait(absl::lts_20230802::synchronization_internal::KernelTimeout)
#7 0x0000571065e850f3 absl::lts_20230802::Mutex::DecrementSynchSem(absl::lts_20230802::Mutex*, absl::lts_20230802::base_internal::PerThreadSynch*, absl::lts_20230802::synchronization_internal::KernelTimeout)
#8 0x0000571065e8c863 absl::lts_20230802::CondVar::WaitCommon(absl::lts_20230802::Mutex*, absl::lts_20230802::synchronization_internal::KernelTimeout)
#9 0x0000571065e8caef absl::lts_20230802::CondVar::Wait(absl::lts_20230802::Mutex*)
#10 0x0000571065e330bf gpr_cv_wait
#11 0x0000571065e3683d grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::operator()(void*) const
#12 0x0000571065e368d9 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::{lambda(void*)#1}::_FUN(void*)
#13 0x0000776e4ade2aa4 start_thread
#14 0x0000776e4ae6fc3c __clone3
Подробнее здесь: https://stackoverflow.com/questions/797 ... llback-api
Когда безопасно отказаться от GRPC STUB и Channel в C ++ (API обратного вызова)? ⇐ C++
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Когда безопасно отказаться от GRPC STUB и Channel в C ++ (API обратного вызова)?
Anonymous » » в форуме C++ - 0 Ответы
- 5 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Когда безопасно отказаться от GRPC STUB и Channel в C ++ (API обратного вызова)?
Anonymous » » в форуме C++ - 0 Ответы
- 5 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Когда безопасно отказаться от GRPC STUB и Channel в C ++ (API обратного вызова)?
Anonymous » » в форуме C++ - 0 Ответы
- 6 Просмотры
-
Последнее сообщение Anonymous
-