Anonymous
C# серверное событие прерывистое исключение IO происходит
Сообщение
Anonymous » 31 янв 2025, 04:03
У меня есть приложение WPF, которое мне требует SSE. Я изучал веб -питания, но это, похоже, довольно сложно организовать, используя CPanel и WHM из -за ограничений порта и т. Д. < /p>
SSE работает и может работать в течение нескольких часов. У меня есть система повторного подключения, которая стреляет, если соединение потеряно. > Моя проблема заключается в том, что я могу оставить систему, работающую в течение нескольких часов, и когда я проверяю журнал, у меня действительно есть куча исключения io - тайм -ауты сервера (затем подключите).
На сервере узлов нет журналов. Есть проблемы. < /p>
Это прерывисто. Он может работать в течение 10 часов без проблем, а затем я получаю отступление каждые несколько минут, просто бегая на один час. < /P>
Код: Выделить всё
public static async Task StartSSEListenerAsync(CancellationToken cancellationToken)
{
string fullUrl = $"{URLSettings.getServerName(APITargets.PHAPPS)}/{APICalls.EventStream.Subscribe}?computerName={Uri.EscapeDataString(computerName)}";
string authToken = URLSettings.tennantAPIKey;
int retryCount = 0;
const int maxRetries = 5;
const int retryDelay = 5000; // 5 seconds
if (!client.DefaultRequestHeaders.Contains("X-Token"))
{
client.DefaultRequestHeaders.Add("X-Token", authToken);
}
//client.Timeout = Timeout.InfiniteTimeSpan;
const int maxSilentTime = 120000; // 2 minutes
while (!cancellationToken.IsCancellationRequested)
{
try
{
StreamLogger.Info("Attempting to connect to SSE...");
using (var response = await client.GetAsync(fullUrl, HttpCompletionOption.ResponseHeadersRead, cancellationToken))
{
response.EnsureSuccessStatusCode();
StreamLogger.Info("SSE Connected.");
Console.WriteLine("SSE Connected.");
retryCount = 0; // Reset retry count on successful connection
_lastEventTime = DateTime.UtcNow;
using (var stream = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(stream))
{
while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
var line = await reader.ReadLineAsync();
if (!string.IsNullOrEmpty(line) && line.StartsWith("data: "))
{
_lastEventTime = DateTime.UtcNow;
string json = line.Substring(6); // Remove "data: "
var eventData = JsonConvert.DeserializeObject(json);
StreamLogger.Info($"Event Data received: {eventData.EventType}");
Console.WriteLine($"Event Data received: {eventData.EventType}");
try
{
switch (eventData.EventType)
{
case ListenerEvents.QudaDashboardUpdate:
EventReceived_QudaDashboardUpdate?.Invoke(null, eventData.Data);
break;
case ListenerEvents.QudaSettingsModelUpdate:
EventReceived_QudaSettingsModelUpdate?.Invoke(null, eventData.Data);
break;
case ListenerEvents.APIRequestReconnect:
EventReceived_APIRequestReconnect?.Invoke(null, eventData.Data);
break;
case ListenerEvents.HaloPayWarningUpdate:
EventReceived_HaloPayWarningUpdate?.Invoke(null, eventData.Data);
break;
default:
AppLogger.Warning($"Unexpected stream event received: {eventData.EventType}");
break;
}
}
catch (Exception ex)
{
AppLogger.Error($"Error processing event: {ex.Message}\n{ex.StackTrace}");
}
}
else
{
Console.WriteLine(line);
}
}
}
}
StreamLogger.Warning("SSE Stream ended. Attempting to reconnect...");
}
catch (IOException ioEx)
{
retryCount++;
string innerExceptionMessage = ioEx.InnerException?.Message ?? "No inner exception";
StreamLogger.Error($"IOException occurred: {ioEx.Message}\nInner Exception: {innerExceptionMessage}\nStack Trace: {ioEx.StackTrace}");
if (retryCount > maxRetries)
{
StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
break;
}
StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
await Task.Delay(retryDelay, cancellationToken);
}
catch (HttpRequestException httpEx)
{
retryCount++;
StreamLogger.Error($"HttpRequestException occurred: {httpEx.Message}\n{httpEx.StackTrace}");
if (retryCount > maxRetries)
{
StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
break;
}
StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
await Task.Delay(retryDelay, cancellationToken);
}
catch (TaskCanceledException taskCanceledEx) when (!cancellationToken.IsCancellationRequested)
{
retryCount++;
StreamLogger.Warning($"TaskCanceledException occurred (likely due to timeout): {taskCanceledEx.Message}");
if (retryCount > maxRetries)
{
StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
break;
}
StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
await Task.Delay(retryDelay, cancellationToken);
}
catch (Exception ex)
{
retryCount++;
StreamLogger.Error($"Unexpected exception occurred: {ex.Message}\n{ex.StackTrace}");
if (retryCount > maxRetries)
{
StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener.");
break;
}
StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})");
await Task.Delay(retryDelay, cancellationToken);
}
}
}
< /code>
nodejs: < /p>
router.get('/Subscribe', verifyJWT, (req, res) => {
// Log all events on req and res
//logEvents(req, 'Request');
//logEvents(res, 'Response');
const chemistID = req.user._id;
const computerName = req.query.computerName;
if (!computerName) {
return res.status(400).send("Missing computer name");
}
if (!globals.clients[chemistID]) {
globals.clients[chemistID] = {};
}
if (globals.clients[chemistID][computerName]) {
console.log(`Replacing existing connection for ChemistID: ${chemistID}, Computer: ${computerName}`);
globals.clients[chemistID][computerName].res.end();
}
globals.clients[chemistID][computerName] = { res };
console.log(`Device Connected: ChemistID ${chemistID}, Computer ${computerName}`);
req.on('aborted', () => {
//console.log(`Connection aborted: ChemistID ${chemistID}, Computer ${computerName}`);
});
req.on('close', () => {
try {
//console.log(`Connection closed: ChemistID ${chemistID}, Computer ${computerName}`);
delete globals.clients[chemistID][computerName];
if (Object.keys(globals.clients[chemistID]).length === 0) {
console.log(`No more clients for ChemistID ${chemistID}. Cleaning up.`);
delete globals.clients[chemistID];
}
} catch (err) {
console.error(`Error during connection close cleanup: ${err.message}`);
}
});
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
});
// Attach an event logger to the request object
function logEvents(emitter, emitterName) {
const originalEmit = emitter.emit;
emitter.emit = function (event, ...args) {
console.log(`[${emitterName}] Event: ${event}`);
return originalEmit.call(emitter, event, ...args);
};
}
// Send periodic heartbeats to all clients
setInterval(() => {
Object.keys(globals.clients).forEach(chemistID => {
Object.keys(globals.clients[chemistID]).forEach(clientName => {
const client = globals.clients[chemistID][clientName];
try {
client.res.write(`: heartbeat\n\n`); // Send a comment line as a heartbeat
//console.log("heartbeat sent")
} catch (error) {
console.error(`Failed to send heartbeat to ChemistID: ${chemistID}, Computer: ${clientName}. Removing client.`);
// Remove the stale client
delete globals.clients[chemistID][clientName];
// If no more clients for this chemistID, clean up the chemist entry
if (Object.keys(globals.clients[chemistID]).length === 0) {
delete globals.clients[chemistID];
}
}
});
});
}, 15000); // Every 45 seconds
< /code>
Я бы оценил любую помощь с этим, потому что я не могу найти журнал или что -либо, что указывает на меня в направлении того, почему это происходит. < /p>
Я проверил свои основные журналы C# и просто получил эту информацию: < /p>
[Error] [31/01/2025 03:09:51] [STM] IOException occurred: The read operation failed, see inner exception.
Inner Exception: The operation has timed out.
Stack Trace: at System.Net.Http.HttpClientHandler.WebExceptionWrapperStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.Net.Http.DelegatingStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.StreamReader.ReadBuffer()
at System.IO.StreamReader.get_EndOfStream()
at PH.Common.SharedUI.Services.StreamService.d__16.MoveNext() in C:\Users...
Я не вижу никаких проблем или регистрации в nodejs и, честно>
Подробнее здесь:
https://stackoverflow.com/questions/794 ... n-occuring
1738285425
Anonymous
У меня есть приложение WPF, которое мне требует SSE. Я изучал веб -питания, но это, похоже, довольно сложно организовать, используя CPanel и WHM из -за ограничений порта и т. Д. < /p> SSE работает и может работать в течение нескольких часов. У меня есть система повторного подключения, которая стреляет, если соединение потеряно. > Моя проблема заключается в том, что я могу оставить систему, работающую в течение нескольких часов, и когда я проверяю журнал, у меня действительно есть куча исключения io - тайм -ауты сервера (затем подключите). На сервере узлов нет журналов. Есть проблемы. < /p> Это прерывисто. Он может работать в течение 10 часов без проблем, а затем я получаю отступление каждые несколько минут, просто бегая на один час. < /P> [code]public static async Task StartSSEListenerAsync(CancellationToken cancellationToken) { string fullUrl = $"{URLSettings.getServerName(APITargets.PHAPPS)}/{APICalls.EventStream.Subscribe}?computerName={Uri.EscapeDataString(computerName)}"; string authToken = URLSettings.tennantAPIKey; int retryCount = 0; const int maxRetries = 5; const int retryDelay = 5000; // 5 seconds if (!client.DefaultRequestHeaders.Contains("X-Token")) { client.DefaultRequestHeaders.Add("X-Token", authToken); } //client.Timeout = Timeout.InfiniteTimeSpan; const int maxSilentTime = 120000; // 2 minutes while (!cancellationToken.IsCancellationRequested) { try { StreamLogger.Info("Attempting to connect to SSE..."); using (var response = await client.GetAsync(fullUrl, HttpCompletionOption.ResponseHeadersRead, cancellationToken)) { response.EnsureSuccessStatusCode(); StreamLogger.Info("SSE Connected."); Console.WriteLine("SSE Connected."); retryCount = 0; // Reset retry count on successful connection _lastEventTime = DateTime.UtcNow; using (var stream = await response.Content.ReadAsStreamAsync()) using (var reader = new StreamReader(stream)) { while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested) { var line = await reader.ReadLineAsync(); if (!string.IsNullOrEmpty(line) && line.StartsWith("data: ")) { _lastEventTime = DateTime.UtcNow; string json = line.Substring(6); // Remove "data: " var eventData = JsonConvert.DeserializeObject(json); StreamLogger.Info($"Event Data received: {eventData.EventType}"); Console.WriteLine($"Event Data received: {eventData.EventType}"); try { switch (eventData.EventType) { case ListenerEvents.QudaDashboardUpdate: EventReceived_QudaDashboardUpdate?.Invoke(null, eventData.Data); break; case ListenerEvents.QudaSettingsModelUpdate: EventReceived_QudaSettingsModelUpdate?.Invoke(null, eventData.Data); break; case ListenerEvents.APIRequestReconnect: EventReceived_APIRequestReconnect?.Invoke(null, eventData.Data); break; case ListenerEvents.HaloPayWarningUpdate: EventReceived_HaloPayWarningUpdate?.Invoke(null, eventData.Data); break; default: AppLogger.Warning($"Unexpected stream event received: {eventData.EventType}"); break; } } catch (Exception ex) { AppLogger.Error($"Error processing event: {ex.Message}\n{ex.StackTrace}"); } } else { Console.WriteLine(line); } } } } StreamLogger.Warning("SSE Stream ended. Attempting to reconnect..."); } catch (IOException ioEx) { retryCount++; string innerExceptionMessage = ioEx.InnerException?.Message ?? "No inner exception"; StreamLogger.Error($"IOException occurred: {ioEx.Message}\nInner Exception: {innerExceptionMessage}\nStack Trace: {ioEx.StackTrace}"); if (retryCount > maxRetries) { StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener."); break; } StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})"); await Task.Delay(retryDelay, cancellationToken); } catch (HttpRequestException httpEx) { retryCount++; StreamLogger.Error($"HttpRequestException occurred: {httpEx.Message}\n{httpEx.StackTrace}"); if (retryCount > maxRetries) { StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener."); break; } StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})"); await Task.Delay(retryDelay, cancellationToken); } catch (TaskCanceledException taskCanceledEx) when (!cancellationToken.IsCancellationRequested) { retryCount++; StreamLogger.Warning($"TaskCanceledException occurred (likely due to timeout): {taskCanceledEx.Message}"); if (retryCount > maxRetries) { StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener."); break; } StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})"); await Task.Delay(retryDelay, cancellationToken); } catch (Exception ex) { retryCount++; StreamLogger.Error($"Unexpected exception occurred: {ex.Message}\n{ex.StackTrace}"); if (retryCount > maxRetries) { StreamLogger.Error("Max retry attempts reached. Stopping SSE Listener."); break; } StreamLogger.Info($"Retrying in {retryDelay / 1000} seconds... (Attempt {retryCount}/{maxRetries})"); await Task.Delay(retryDelay, cancellationToken); } } } < /code> nodejs: < /p> router.get('/Subscribe', verifyJWT, (req, res) => { // Log all events on req and res //logEvents(req, 'Request'); //logEvents(res, 'Response'); const chemistID = req.user._id; const computerName = req.query.computerName; if (!computerName) { return res.status(400).send("Missing computer name"); } if (!globals.clients[chemistID]) { globals.clients[chemistID] = {}; } if (globals.clients[chemistID][computerName]) { console.log(`Replacing existing connection for ChemistID: ${chemistID}, Computer: ${computerName}`); globals.clients[chemistID][computerName].res.end(); } globals.clients[chemistID][computerName] = { res }; console.log(`Device Connected: ChemistID ${chemistID}, Computer ${computerName}`); req.on('aborted', () => { //console.log(`Connection aborted: ChemistID ${chemistID}, Computer ${computerName}`); }); req.on('close', () => { try { //console.log(`Connection closed: ChemistID ${chemistID}, Computer ${computerName}`); delete globals.clients[chemistID][computerName]; if (Object.keys(globals.clients[chemistID]).length === 0) { console.log(`No more clients for ChemistID ${chemistID}. Cleaning up.`); delete globals.clients[chemistID]; } } catch (err) { console.error(`Error during connection close cleanup: ${err.message}`); } }); res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.flushHeaders(); }); // Attach an event logger to the request object function logEvents(emitter, emitterName) { const originalEmit = emitter.emit; emitter.emit = function (event, ...args) { console.log(`[${emitterName}] Event: ${event}`); return originalEmit.call(emitter, event, ...args); }; } // Send periodic heartbeats to all clients setInterval(() => { Object.keys(globals.clients).forEach(chemistID => { Object.keys(globals.clients[chemistID]).forEach(clientName => { const client = globals.clients[chemistID][clientName]; try { client.res.write(`: heartbeat\n\n`); // Send a comment line as a heartbeat //console.log("heartbeat sent") } catch (error) { console.error(`Failed to send heartbeat to ChemistID: ${chemistID}, Computer: ${clientName}. Removing client.`); // Remove the stale client delete globals.clients[chemistID][clientName]; // If no more clients for this chemistID, clean up the chemist entry if (Object.keys(globals.clients[chemistID]).length === 0) { delete globals.clients[chemistID]; } } }); }); }, 15000); // Every 45 seconds < /code> Я бы оценил любую помощь с этим, потому что я не могу найти журнал или что -либо, что указывает на меня в направлении того, почему это происходит. < /p> Я проверил свои основные журналы C# и просто получил эту информацию: < /p> [Error] [31/01/2025 03:09:51] [STM] IOException occurred: The read operation failed, see inner exception. Inner Exception: The operation has timed out. Stack Trace: at System.Net.Http.HttpClientHandler.WebExceptionWrapperStream.Read(Byte[] buffer, Int32 offset, Int32 count) at System.Net.Http.DelegatingStream.Read(Byte[] buffer, Int32 offset, Int32 count) at System.IO.StreamReader.ReadBuffer() at System.IO.StreamReader.get_EndOfStream() at PH.Common.SharedUI.Services.StreamService.d__16.MoveNext() in C:\Users... [/code] Я не вижу никаких проблем или регистрации в nodejs и, честно> Подробнее здесь: [url]https://stackoverflow.com/questions/79401582/c-sharp-server-side-event-intermittent-io-exception-occuring[/url]