Клиент GrpcStreamingCall буферизует пакеты и сбрасывает их только по истечении таймаута, не получая событий в режиме реаAndroid

Форум для тех, кто программирует под Android
Ответить
Anonymous
 Клиент GrpcStreamingCall буферизует пакеты и сбрасывает их только по истечении таймаута, не получая событий в режиме реа

Сообщение Anonymous »

Я использую com.squareup.wire.GrpcStreamingCall вместе с OkHttp в мобильном приложении Kotlin для потоковой передачи событий gRPC из серверной службы. Проблема, с которой я столкнулся, заключается в том, что клиент не получает потоковые пакеты в режиме реального времени.
Вместо этого все пакеты буферизуются внутри и доставляются вместе только тогда, когда сервер закрывает соединение или истекает время сеанса (например, через 5 минут). Это противоречит цели потокового соединения.
С серверной стороны проблем нет, API работают нормально в Postman.
Ожидаемое поведение
Пакеты должны доставляться клиенту немедленно, когда сервер отправляет их (в режиме реального времени).
Поток GrpcStreamingCall.response должен отправлять каждое сообщение по мере его поступления на провод.
Фактическое поведение
Пакеты отправляются из потока только через несколько минут — когда сеанс закрывается или истекает время ожидания.
Все буферизованные пакеты поступают вместе одновременно, как если бы они были задержаны или кэшированы.
Grpc client builder

Код: Выделить всё

val okHttpClient = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(0, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.addInterceptor(customHeadersInterceptor)
.addInterceptor(httpToFileInterceptor)
.addInterceptor(loggingInterceptor)
.build()

GrpcClient.Builder()
.client(okHttpClient)
.baseUrl(baseUrl)
.minMessageToCompress(Long.MAX_VALUE)
.build()
Реализация событий потока

Код: Выделить всё

override suspend fun startStreamingEvents(streamEventsRequest: StreamEventsRequest): Flow = callbackFlow {

val streaming = call.authenticatedStream {  grpcStreamingClient.StreamEvents() }

// Send the request
streaming.request.send(streamEventsRequest)

//Close the request stream!
streaming.request.close()

val job = launch {
try {
for (packet in streaming.response) {
send(packet)
}
} catch (e: Exception) {
Timber.e(e, "Error while receiving packets")
close(e)
}
}

awaitClose {
Timber.d("Stream closed")
job.cancel()
}
}
Поддерживающий код

Код: Выделить всё

suspend fun  authenticatedStream(call: () -> GrpcStreamingCall): StreamingCallResult {
val (request, response) = call().apply {
requestMetadata = headerProvider.getAuthHeaders()
}.executeIn(CoroutineScope(dispatchers.io))
return StreamingCallResult(request, response)
}

data class StreamingCallResult(
val request: SendChannel,
val response: ReceiveChannel
)

Подробнее здесь: https://stackoverflow.com/questions/798 ... out-not-re
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Android»