Я использую Zeromq (Netmq, последняя версия) в настройке клиента (Winforms) / Service (WCF). Текущая реализация имеет несколько циклов потока для обработки сообщений и механизма сердцебиения, но она грязная и иногда теряет соединение, вероятно, из -за блокировки потока (не воспроизводимо в тестах). < /P>
Я заменил некоторые петли потоков на Netmqpoller, используя Netmqqueue.receivedEdude, вместо опроса, и добавил netmqmonctory. Это, по-видимому, улучшает стабильность, и очистка ресурсов лучше с использованием операторов и netmqconfig.cleanup (false), предотвращая заблокирование портов за пределы time_wait (2–4 мин). < /P>
Однако это еще не «тестируется» в производстве, поэтому может быть проблемы. < /P>
ниже.public Task EstablishConnection(string endpoint, CancellationToken token)
{
var connectionTask = new TaskCompletionSource();
_logger.Log(LogState.Info, "Initializing request socket.");
Task.Run(() =>
{
try
{
_isShuttingDown = false;
_shutdownSignal = new TaskCompletionSource();
Thread.CurrentThread.Name = this.GetType().Name;
using (_messageQueue = new NetMQQueue())
using (var socket = new RequestSocket(endpoint))
using (_eventPoller = new NetMQPoller { _messageQueue })
using (var eventMonitor = new NetMQMonitor(socket, $"inproc://monitor.client.{Guid.NewGuid()}", SocketEvents.Disconnected | SocketEvents.BindFailed | SocketEvents.AcceptFailed))
{
_messageQueue.ReceiveReady += (s, e) =>
{
ProcessClientRequests(socket);
};
eventMonitor.EventReceived += (s, e) =>
{
string errorMessage = $"RequestSocket encountered an issue: {e.SocketEvent.ToString()}";
_logger.LogException(errorMessage, new Exception(errorMessage));
_eventPoller.Stop();
};
connectionTask.TrySetResult(true);
_isRunning = true;
using (var cancelRegistration = token.Register(() => _eventPoller.Stop()))
{
_eventPoller.Run();
}
lock (_shutdownLock)
{
_isShuttingDown = true;
}
}
}
catch (Exception ex)
{
_logger?.LogException("Client request processing failed", ex, fatal: true);
}
finally
{
_isRunning = false;
_shutdownSignal.TrySetResult(true);
if (!connectionTask.Task.IsCompleted)
connectionTask.SetResult(false);
_logger.Log(LogState.Info, "Request socket shutting down.");
}
}, token);
NetMQConfig.Cleanup(false)
return connectionTask.Task;
}
public void ProcessClientRequests(RequestSocket socket)
{
ProcessQueuedMessages(socket);
LoopCompleted?.Invoke();
}
private void ProcessQueuedMessages(RequestSocket clientSocket)
{
try
{
while (_messageQueue.TryDequeue(out IRequestMessage message, TimeSpan.Zero))
{
try
{
_logger?.Log(LogState.Info, $"Processing queued request: {message.Request.GetType()}, Session ID: {message.SessionId}, Timeout: {message.Timeout}");
var response = clientSocket.SendRequest(message.Request, message.SessionId, message.Timeout);
message.Complete(response);
LastSentMessageTimestamp = DateTime.Now;
Thread.Sleep(10); // Prevents excessive CPU usage
}
catch (Exception ex)
{
_logger?.LogException("Error while sending request.", ex, fatal: true);
message.Fail(ex);
}
}
}
catch (Exception ex)
{
_logger?.LogException("Queued message processing failure", ex, fatal: true);
}
}
< /code>
Вот пример клиентского подписчика, который реализует netmqtimer для проверки подписчиков: < /p>
public Task EstablishConnection(string endpoint, Guid serverIdentifier, Guid sessionIdentifier, CancellationToken cancellationToken, TimeSpan connectionTimeout, ClientSessionInfo previousSession)
{
_endpoint = endpoint;
_activeServerId = serverIdentifier;
_shutdownSignal = new TaskCompletionSource();
var connectionTask = new TaskCompletionSource();
Task.Run(() =>
{
try
{
_isRunning = true;
using (_subscriberSocket = new SubscriberSocket(endpoint))
{
_subscriberSocket.Subscribe(TopicConstants.Heartbeat);
_subscriberSocket.Subscribe(sessionIdentifier.ToString());
var receivedMessage = _subscriberSocket.RecievePubMessage(connectionTimeout);
if (receivedMessage == null)
{
connectionTask.SetResult(false);
return;
}
LastReceivedMessageTimestamp = DateTime.Now;
connectionTask.SetResult(true);
Task.Run(() => HandleCallbacks(cancellationToken), cancellationToken);
RestorePreviousSession(previousSession);
using (_eventPoller = new NetMQPoller { _subscriberSocket })
using (var monitor = new NetMQMonitor(_subscriberSocket, $"inproc://monitor.client.{Guid.NewGuid()}", SocketEvents.Disconnected | SocketEvents.BindFailed | SocketEvents.AcceptFailed))
{
_subscriberSocket.ReceiveReady += (s, e) =>
{
var message = _subscriberSocket.RecievePubMessage(TimeSpan.Zero);
ProcessClientMessage(message);
};
monitor.EventReceived += (s, e) =>
{
string errorMessage = $"SubscriberSocket encountered an issue: {e.SocketEvent.ToString()}";
_logger.LogException(errorMessage, new Exception(errorMessage));
_eventPoller.Stop();
};
monitor.AttachToPoller(_eventPoller);
var timer = new NetMQTimer(100);
timer.Elapsed += (s, e) =>
{
try
{
if (_subscriberSocket != null)
ManageSubscriptionRequests(_subscriberSocket);
}
catch (Exception ex)
{
_logger.LogException("Subscription request handling failed", ex, fatal: false);
}
};
_eventPoller.Add(timer);
using (var registration = cancellationToken.Register(() =>
{
if (_eventPoller != null && !_eventPoller.IsDisposed)
{
_eventPoller.Stop();
}
}))
{
_eventPoller.Run();
}
}
}
}
catch (Exception ex)
{
_logger.LogException("SubscriberSocketService: Connection failure", ex);
}
finally
{
_isRunning = false;
_shutdownSignal.SetResult(true);
connectionTask.TrySetResult(false);
}
}, cancellationToken);
return connectionTask.Task;
}
< /code>
Последний пример показывает Publishersocket of Server с реализацией Heartbeat, возможно, это можно заменить на netmqmonitor? < /p>
public override void Execute(NetMQSocket socket)
{
_lastHeartbeatSent = DateTime.Now;
try
{
_logger?.Log(LogState.Info, "Initializing event poller.");
using (_eventPoller = new NetMQPoller())
{
_logger?.Log(LogState.Info, "Setting up message queue.");
using (_messageQueue = new NetMQQueue())
{
_messageQueue.ReceiveReady += (s, e) => HandleMessageQueue(socket);
_eventPoller.Add(_messageQueue);
var heartbeatTimer = new NetMQTimer(_config.Options.Publisher.HeartbeatInterval);
heartbeatTimer.Elapsed += (s, e) =>
{
if (DateTime.Now - _lastHeartbeatSent >= _config.Options.Publisher.HeartbeatInterval)
{
SendHeartbeat();
}
};
_eventPoller.Add(heartbeatTimer);
try
{
_logger?.Log(LogState.Info, "Starting event poller.");
_eventPoller.Run();
}
finally
{
_shutdownSignal.SetResult(true);
}
}
}
}
catch(Exception ex)
{
_logger?.LogException("Execution error in event loop", ex);
throw;
}
}
< /code>
Это правильный подход? Может ли NetMQMonitor надежно обнаружить утраченные связи или ручное сердцебиение все еще необходимо для стабильности? Обеспечение автоматического повторного соединения является критическим, если происходит разъединение. Я знаю, что запуск обоих сокетов под одним полайром (клиент/сервер), вероятно, лучше, но у меня сейчас нет времени для основных изменений кода.>
Подробнее здесь: https://stackoverflow.com/questions/794 ... n-properly