Я пытаюсь создать клиент для тестирования моего сервера grpc. На моем сервере grpc у меня есть rpc NotificationStreaming(), который передает уведомление (унарный поток). Также у меня есть несколько синхронных методов RPC (унарный-унарный).
В методе main() ниже сначала я устанавливаю соединение для потоковой передачи в отдельном процессе, а затем последовательно выполнять унарные-унарные запросы RPC. После каждого унарного-унарного запроса RPC я получаю уведомления через NOTIFICATION_QUEUE. Потоковая передача остается пустой, пока я не вызову первый унарный-унарный метод create_project(stub), поэтому я ожидаю получить первое уведомление во время этого метода.
Проблема в том, что что если я удалю Sleep(5), моя программа застрянет на этой строке.
Пожалуйста, подскажите мне, как использовать более разумный способ?
def _notification_stream(notification_queue):
with grpc.insecure_channel(settings.GRPC_PORT) as channel:
stub = main_pb2_grpc.MyAPIStub(channel)
try:
response_stream = stub.NotificationStreaming(Empty())
for notification in response_stream:
r = json_format.MessageToDict(notification, preserving_proto_field_name=True,
including_default_value_fields=True)
notification_queue.put(r['message'])
except grpc.RpcError as e:
misc.log(f"ERROR notification stream: {e}")
def notification_streaming(notification_queue):
_process = mp.Process(target=_notification_stream, daemon=True, kwargs={"notification_queue": notification_queue})
_process.start()
return _process.pid
def main():
NOTIFICATION_QUEUE = mp.Queue()
# start listening to the notification stream
notification_streaming(NOTIFICATION_QUEUE)
sleep(5)
with grpc.insecure_channel(settings.GRPC_PORT) as channel:
stub = main_pb2_grpc.MyAPIStub(channel)
create_project(stub)
while not NOTIFICATION_QUEUE.empty():
misc.log(f"\tnotification: {NOTIFICATION_QUEUE.get(block=True)}")
close_project(stub)
while not NOTIFICATION_QUEUE.empty():
misc.log(f"\tnotification: {NOTIFICATION_QUEUE.get(block=True)}")
load_project(stub)
while not NOTIFICATION_QUEUE.empty():
misc.log(f"\tnotification: {NOTIFICATION_QUEUE.get(block=True)}")
save_project(stub)
while not NOTIFICATION_QUEUE.empty():
misc.log(f"\tnotification: {NOTIFICATION_QUEUE.get(block=True)}")
...
if __name__ == '__main__':
main()
Подробнее здесь: https://stackoverflow.com/questions/723 ... -get-stuck
Потоковая передача gRPC в отдельном процессе зависает ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Двунаправленная потоковая передача gRpc зависает при большом количестве сообщений
Anonymous » » в форуме C# - 0 Ответы
- 15 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Потоковая потоковая передача Polars: Parquet Parquet на основе Shift (-1)
Anonymous » » в форуме Python - 0 Ответы
- 6 Просмотры
-
Последнее сообщение Anonymous
-
-
-
C# grpc-серверная потоковая передача RPC. Клиент получил первое сообщение только со вторым
Anonymous » » в форуме C# - 0 Ответы
- 7 Просмотры
-
Последнее сообщение Anonymous
-