Использование PipeStream для передачи потока ответов для ведения журнала без потери эффекта потоковой передачи для конечC#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Использование PipeStream для передачи потока ответов для ведения журнала без потери эффекта потоковой передачи для конеч

Сообщение Anonymous »

У меня есть конечная точка, которая передает List как IAsyncEnumerable в ответе, и поэтому при использовании в браузере ответ загружается с эффектом потоковой передачи по мере записи данных в поток ответа.
Теперь у меня также есть сквозное промежуточное программное обеспечение, которое предназначено для регистрации каждого потока тела запроса и ответа API в BLOB-объекте Azure.
Моя проблема заключается в том, что при включенном промежуточном программном обеспечении Я получаю вывод, но теряю эффект потоковой передачи конечной точки при использовании в пользовательском интерфейсе, но если он отключен, я вижу эффект потоковой передачи (данные отображаются частями, доступными в потоке) в пользовательском интерфейсе браузера когда запрашивается API. .
Ниже приведен код промежуточного программного обеспечения.

Код: Выделить всё

public class LogRequestResponseInBlobMiddleware
{
private const string REQUEST_RESPONSE_BLOB_FILE_PATH = "Blob/req-resp-logs";
public const string LOGGING_KEY = "Identifier_Key";

private readonly RequestDelegate _next;

private readonly IRequestResponseLogEntityBlobData _requestResponseLogEntityBlobData;

public LogLLMRequestResponseInBlobMiddleware(RequestDelegate next, ApiConfigurationBase apiConfigurationBase, IRequestResponseLogEntityBlobData requestResponseLogEntityBlob, IBoldContextPropertiesAccessor boldContextPropertiesAccessor, IBoldLogger boldLogger, IWebHostEnvironment hostingEnv)
{
_next = next;
_requestResponseLogEntityBlobData = requestResponseLogEntityBlob;
}

public async Task InvokeAsync(HttpContext context)
{
//Logging model structure
LLMRequestResponsesLog logRequestResponseBlobModel = new LLMRequestResponsesLog()
{
EndpointName =context?.Request.Path.Value,
LogIdentifierKey=LOGGING_KEY,
RequestStartAt = DateTime.Now,
Environment = _hostingEnv.EnvironmentName
};

//Request
var originalRequestStream = context.Request.Body;

var requestMemoryStream = new MemoryStream();
await context.Request.Body.CopyToAsync(requestMemoryStream);
requestMemoryStream.Seek(0, SeekOrigin.Begin);
context.Request.Body = requestMemoryStream;

//Response
var originalResponseStream = context.Response.Body;
var respMemoryStream = new MemoryStream();
context.Response.Body = respMemoryStream;

//call next middleware in line
await _next(context);

//copy streams back to earlier ones
context.Request.Body = originalRequestStream;
if (respMemoryStream.Length > 0 && originalResponseStream.CanWrite)
{
respMemoryStream.Seek(0, SeekOrigin.Begin);
await respMemoryStream.CopyToAsync(originalResponseStream);
}

if (context.Response.StatusCode != StatusCodes.Status401Unauthorized)
{

// var openAIRequestResponses = context.Items["someKey"] as LLMRequestResponseLogBlobModel;

if (requestMemoryStream.Position != 0)
{
requestMemoryStream.Position = 0;
}
if (respMemoryStream.Position != 0)
{
respMemoryStream.Position = 0;
}

StreamReader streamReader = new StreamReader(requestMemoryStream);
var llmApiRequest = await streamReader.ReadToEndAsync();

streamReader = new StreamReader(respMemoryStream);
var llmApiResponse = await streamReader.ReadToEndAsync();

//preparing the structure in which log has to be written
logRequestResponseBlobModel.APIRequest = llmApiRequest;
logRequestResponseBlobModel.APIResponse = llmApiResponse;
logRequestResponseBlobModel.ResponseStatusCode = context.Response.StatusCode;

try
{
string appendblobName = GetNewAppendBlobNameForCurrentHour();

//logging the structure to Azure Append Blob
await _requestResponseLogEntityBlobData.AddToAppendBlobAsync(appendblobName, logRequestResponseBlobModel.GetStream());
}
catch (Exception ex)
{
if (ex is RequestFailedException requestFailedException)
{
// current blob has reached the maximum allowed block size i.e.  it has been appended more than 50K times
if (requestFailedException.ErrorCode.Contains("BlockCountExceedsLimit"))
{
//log exception
}
else
{
//log exception
}

}
else
{
//log exception
}
}
finally
{
streamReader.Dispose();

// remove the item from http context
context.Items.Remove("someKey");
}
}

//gracefully dispose
requestMemoryStream.Dispose();
respMemoryStream.Dispose();

}
}

При анализе я обнаружил, что в промежуточном программном обеспечении потоки запросов и ответов буферизуются в памяти перед передачей их в контекст, и поэтому, возможно, я теряю эффект потоковой передачи. Кроме того, благодаря этому и этому подходу мне казалось, что я использую PipeStreams, то есть AnonymousPipeServerStream и AnonymousPipeClientStream, как в этом примере IPC, поскольку это позволит байтам, сгенерированным производителем, или ответу конечной точки передаваться для регистрации в blob в фоновом режиме, а ответ в пользовательском интерфейсе отображается как прогрессивный поток.
Итак, я создал объект AnonymousPipeServerStream для копирования потока тела ответа контекста API и попытался установить его в контекст .Response.Body, читая байты из AnonymousPipeClientStream.
Однако во время реализации я захожу в тупик, и мое промежуточное программное обеспечение приводит к сбою веб-API.
Ниже представлена ​​новая реализация

Код: Выделить всё

public class LogRequestResponseInBlobMiddleware
{
private const string REQUEST_RESPONSE_BLOB_FILE_PATH = "Blob/req-resp-logs";
public const string LOGGING_KEY = "Identifier_Key";

private readonly RequestDelegate _next;

private readonly IRequestResponseLogEntityBlobData _requestResponseLogEntityBlobData;

public LogLLMRequestResponseInBlobMiddleware(RequestDelegate next, ApiConfigurationBase apiConfigurationBase, IRequestResponseLogEntityBlobData requestResponseLogEntityBlob, IBoldContextPropertiesAccessor boldContextPropertiesAccessor, IBoldLogger boldLogger, IWebHostEnvironment hostingEnv)
{
_next = next;
_requestResponseLogEntityBlobData = requestResponseLogEntityBlob;
}

public async Task InvokeAsync(HttpContext context)
{

//add request start time to compute endpoint responseTime while saving all metrics - show it over grafana
if (context is not null)
{
context.Items.TryAdd("EndpointRequestStartAt", DateTime.UtcNow);
}

//Logging model structure
LLMRequestResponsesLog logRequestResponseBlobModel = new LLMRequestResponsesLog()
{
EndpointName =context?.Request.Path.Value,
LogIdentifierKey=LOGGING_KEY,
RequestStartAt = DateTime.Now,
Environment = _hostingEnv.EnvironmentName
};

//Request
var originalRequestStream = context.Request.Body;

var requestMemoryStream = new MemoryStream();
await context.Request.Body.CopyToAsync(requestMemoryStream);
requestMemoryStream.Seek(0, SeekOrigin.Begin);
context.Request.Body = requestMemoryStream;

//Response
using var pipeResponseServerStream = new AnonymousPipeServerStream(PipeDirection.Out);
using var pipeResponseClientStream = new AnonymousPipeClientStream(PipeDirection.In, pipeResponseServerStream.GetClientHandleAsString());

var originalResponseStream = context.Response.Body;
context.Response.Body = pipeResponseServerStream;

//call next middleware in line
await _next(context);

//copy streams back to earlier ones
context.Request.Body = originalRequestStream;

// getting struck in some deadlock in below line while copying the response published by pipeResponseServerStream
await pipeResponseClientStream.CopyToAsync(originalResponseStream).ConfigureAwait(false);

// same code as above middleware

//gracefully dispose
requestMemoryStream.Dispose();
//respMemoryStream.Dispose();

}
}
Было бы здорово, если бы кто-нибудь помог мне понять, почему я сталкиваюсь с тупиком при доступе к потоку клиента ответа. Я также не уверен, что использование AnonymousPipeServerStream и AnonymousPipeClirnyStream для такого типа внутрипроцессного взаимодействия является допустимым вариантом или нет.

Подробнее здесь: https://stackoverflow.com/questions/784 ... ng-streami
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»