Мой сетевой код C#/. Не удается при попытке обрабатывать сообщения с использованием асинхронного процесса.
Объяснение:
У меня есть код .NET, который может отправить из очереди IBM MQ, которая работает нормально. Я использую ibmxmsdotnetntclient версию 9.4.1.
синхронно , я могу получить сообщение из очереди, обработать полезную нагрузку данных, а затем заполнить сообщение , который удаляет его из очереди сообщений, чтобы предотвратить обработку повторяющихся сообщений. < /p>
Код: Выделить всё
internal class Program
{
public static IReceiver Receiver;
static void Main(string[] args)
{
Receiver = new MyReceiver;
Receiver.ReceiveMessagesAsynchronous("MY_QUEUE_NAME");
Application.Run();
}
}
public class MyReceiver
{
private ISession _session;
private IMessageConsumer _consumerAsync;
public void ReceiveMessagesAsynchronous(string queue)
{
IConnection connection;
string endpoint = "";
try
{
connection = this.GetConnection(endpoint);
if (connection == null)
{
throw new Exception("Connection is null. Exiting.");
}
this._session = connection.CreateSession(true, AcknowledgeMode.AutoAcknowledge);
if (this._session == null)
{
throw new Exception("Session is null. Exiting.");
}
IDestination destination = this._session.CreateQueue(queue);
if (destination == null)
{
throw new Exception("Destination is null. Exiting.");
}
this._consumerAsync = this._session.CreateConsumer(destination);
if (this._consumerAsync == null)
{
throw new Exception("ConsumerAsync is null. Exiting.");
}
this._consumerAsync.MessageListener = new MessageListener(OnMessageCallback);
Console.WriteLine($"{ DateTime.Now.ToLongTimeString() } => Listening to { queue } for messages");
connection.Start();
}
catch (XMSException ex)
{
this._logger.LogError($"XMSException caught: { ex }");
Console.WriteLine("XMSException caught: {0}", ex);
if (ex.LinkedException != null)
{
this._logger.LogError($"Stack Trace: { ex.LinkedException.StackTrace }");
Console.WriteLine("Stack Trace:\n {0}", ex.LinkedException.StackTrace);
}
Console.WriteLine("Sample execution FAILED!");
}
finally
{
refreshConnectionTimer = new Timer(HandleTimerCallback, null, 10000, 30000);
}
}
private void OnMessageCallback(IMessage message)
{
try
{
ITextMessage textMessage = (ITextMessage)message;
// Do something funky with the payload from --> textMessage.Text;
this._session.Commit();
}
catch (Exception ex)
{
Console.WriteLine($"Exception caught: {ex}");
this._session.Rollback();
}
}
private IBM.XMS.IConnection GetConnection(string mqEndpoint)
{
XMSFactoryFactory factoryFactory;
IConnectionFactory cf;
factoryFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
cf = factoryFactory.CreateConnectionFactory();
//
// >
//
return cf.CreateConnection();
}
}
Код: Выделить всё
private void OnMessageCallback(IMessage message)
{
try
{
ITextMessage textMessage = (ITextMessage)message;
var text = textMessage.Text;
var output = HandleMessage(text).Result;
}
catch (Exception ex)
{
Console.WriteLine($"Exception caught: {ex}");
this._session.Rollback();
}
}
private async Task HandleMessage(string messageText)
{
if (string.IsNullOrEmpty(messageText))
{
return false;
}
try
{
this._session.Commit();
Console.WriteLine("Committed the session.");
return true;
}
catch (Exception ex)
{
session.Rollback();
Console.WriteLine("Rolled back the session.");
return false;
}
}
ibm.xms.alleegalStateException: 'CWSMQ0101E: Синхронный метод вызов
не разрешен, когда сеанс используется асинхронно: Commit. Спецификация
XMS не позволяет использовать сеанс для синхронных методов
при запуске асинхронной доставки сообщений. Создайте отдельный сеанс
, если вы хотите использовать как синхронные методы, так и
асинхронную доставку одновременно. '< /P>
< /blockquote>
Подробнее здесь: https://stackoverflow.com/questions/794 ... sync-await
Мобильная версия