Как справиться с некорректным отключением на сервере Python grpc при использовании потокового ответа?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как справиться с некорректным отключением на сервере Python grpc при использовании потокового ответа?

Сообщение Anonymous »

Я разрабатываю RPC с двунаправленной потоковой передачей и тестирую его с помощью простого клиента. Я обнаружил, что сервер не работает должным образом, когда клиент убит. Для удобства приведу простой пример:

Код: Выделить всё

import demo_pb2, demo_pb2_grpc
import grpc
import time
from concurrent import futures
import traceback

class DemoService(demo_pb2_grpc.DemoServiceServicer):
def DemoRPC(self, request_iterator, context):
try:
for req in request_iterator:
time.sleep(1)
print("receive request")
yield demo_pb2.DemoResponse(resp=req.arg + "_resp")
except grpc.RpcError as e:
print("grpc connection disconnect")
except Exception as e:
print("error: {}, trace: {}".format(e, traceback.format_exc()))
print("finished")

if __name__ == "__main__":
thread_pool = futures.ThreadPoolExecutor(max_workers=10)
service = DemoService()
server = grpc.server(thread_pool)
demo_pb2_grpc.add_DemoServiceServicer_to_server(
service, server
)
server.add_insecure_port("[::]:50001")
server.start()
server.wait_for_termination()
когда клиент завершает работу нормально, я получаю следующий вывод (клиент отправляет 10 запросов):

Код: Выделить всё

receive request
receive request
receive request
receive request
receive request
receive request
receive request
receive request
receive request
receive request
finished
но когда я уничтожил клиент во время его работы, я получил следующий результат:

Код: Выделить всё

receive request
receive request
receive request
receive request
как показано выше, ни завершение, ни сообщение об исключении не печатается. Я отлаживаю сервер и обнаруживаю, что это вызвано оператором yield после отключения, поэтому проблему можно решить, проверив, живо ли соединение, с помощью context.is_active(). Модифицированный пример:

Код: Выделить всё

import demo_pb2, demo_pb2_grpc
import grpc
import time
from concurrent import futures
import traceback

class DemoService(demo_pb2_grpc.DemoServiceServicer):
def DemoRPC(self, request_iterator, context):
try:
for req in request_iterator:
time.sleep(1)
print("receive request")
if context.is_active():
yield demo_pb2.DemoResponse(resp=req.arg + "_resp")
except grpc.RpcError as e:
print("grpc connection disconnect")
except Exception as e:
print("error: {}, trace: {}".format(e, traceback.format_exc()))
print("finished")

if __name__ == "__main__":
thread_pool = futures.ThreadPoolExecutor(max_workers=10)
service = DemoService()
server = grpc.server(thread_pool)
demo_pb2_grpc.add_DemoServiceServicer_to_server(
service, server
)
server.add_insecure_port("[::]:50001")
server.start()
server.wait_for_termination()
Однако риск все равно существует, поскольку следующий код:

Код: Выделить всё

if context.is_active():
yield demo_pb2.DemoResponse(resp=req.arg + "_resp")
не является атомарной операцией. Клиент может быть уничтожен после оператора if, поэтому эта ошибка все равно возникает.
Я обнаружил аналогичную проблему в этом вопросе stackoverflow и, похоже, исправлен в соответствии с этой проблемой GitHub. Но я все еще сталкиваюсь с этой проблемой. Я использую Python 3.10 с grpcio==1.62.0 в Ubuntu 20.04.

Подробнее здесь: https://stackoverflow.com/questions/790 ... using-stre
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»