使用 Windows 消息队列(MSMQ)MessageQueue

时间:2024-03-03 17:33:03

Config中appSettings配置:

<!-- 使用本地消息队列时: value=".\PRIVATE$\MgrApiRequest" -->
<add key="RequestQueueName" value="FormatName:Direct=TCP:192.168.100.102\PRIVATE$\MgrApiRequest" />

 

消息队列连接,将order类资料加入消息队列中:

// Sends the order to the queue named by the "RequestQueueName" app setting.
// The order is serialized to a JSON string and that string is the message body.
// MessageQueue and Message are both IDisposable; the using blocks release their
// handles even when Send throws.
using (MessageQueue messageOrderQueue = new MessageQueue(ConfigurationManager.AppSettings["RequestQueueName"]))
using (Message message = new Message())
{
    // The body is always a string, so string is the only target type the formatters need.
    messageOrderQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
    message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

    message.Body = JsonConvert.SerializeObject(order);
    messageOrderQueue.Send(message);
}

 

监控消息队列,获取消息队列中的数据进行处理:

// Cancellation source for the receive/processing pipeline: RequestReceived and
// HandleRequestThread both check IsCancellationRequested, and its Token is passed
// to RequestQueue.Take so a blocked Take can be cancelled.
private CancellationTokenSource cts = new CancellationTokenSource();

/// <summary>
/// Opens the queue named by the "RequestQueueName" app setting, creating it first
/// when it is addressed by a path name and does not exist yet, then starts the
/// asynchronous receive loop and the long-running task that processes requests.
/// </summary>
internal void Init()
{
    string queueName = ConfigurationManager.AppSettings["RequestQueueName"];

    // MessageQueue.Exists rejects format-name syntax with an InvalidOperationException,
    // and format names are how remote private queues are addressed
    // (e.g. "FormatName:Direct=TCP:host\PRIVATE$\name"), so the exists/create step
    // only runs for path names such as ".\PRIVATE$\name".
    bool isFormatName = queueName != null
        && queueName.StartsWith("FormatName:", StringComparison.OrdinalIgnoreCase);

    if (!isFormatName && !MessageQueue.Exists(queueName))
    {
        MessageQueue.Create(queueName);
    }

    messageOrderQueue = new MessageQueue(queueName);

    // NOTE(review): FullControl for "Everyone" and "ANONYMOUS LOGON" lets any caller
    // read, purge or delete the queue. Kept as in the original; consider narrowing
    // the rights to what senders actually need. Whether SetPermissions succeeds for
    // a queue opened through a format name should be confirmed as well.
    messageOrderQueue.SetPermissions("Everyone", MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow);
    messageOrderQueue.SetPermissions("ANONYMOUS LOGON", MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow);

    // Message bodies are JSON strings wrapped by the XML formatter.
    messageOrderQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

    // Only retrieve the message properties that are requested here.
    messageOrderQueue.MessageReadPropertyFilter = new MessagePropertyFilter()
    {
        Id = true,
        CorrelationId = true,
        Body = true,
        Label = true
    };

    // Start the asynchronous receive; RequestReceived re-arms it after every message.
    messageOrderQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(RequestReceived);
    messageOrderQueue.BeginReceive();

    // Start the processing loop via Task.Factory.StartNew with the LongRunning option.
    Task.Factory.StartNew(() => HandleRequestThread(), TaskCreationOptions.LongRunning);
}

 

/// <summary>
/// ReceiveCompleted handler: completes the pending asynchronous receive, adds the
/// message body (a JSON string) to the pending-work collection RequestQueue, and
/// starts the next asynchronous receive.
/// </summary>
/// <param name="source">The MessageQueue that raised the event.</param>
/// <param name="args">Event data carrying the IAsyncResult of the completed receive.</param>
private void RequestReceived(object source, ReceiveCompletedEventArgs args)
{
    // After cancellation the message is not consumed and the receive is not re-armed.
    if (cts.IsCancellationRequested)
    {
        return;
    }

    MessageQueue q = (MessageQueue)source;
    try
    {
        // EndReceive sits inside the try so a receive failure is logged instead of
        // escaping the callback and stopping the receive loop.
        using (var msMessage = q.EndReceive(args.AsyncResult))
        {
            string json = msMessage.Body as string;
            if (json != null)
            {
                RequestQueue.Add(json);
            }
            else
            {
                // A non-string body would otherwise be enqueued as null and fail later
                // during deserialization.
                Logger.LogError("RequestReceived: message body is not a string; message skipped");
            }
        }
    }
    catch (Exception ex)
    {
        Logger.LogError(ex.ToString());
    }
    finally
    {
        // Re-arm the receive whether or not this message could be processed.
        try
        {
            q.BeginReceive();
        }
        catch (Exception ex)
        {
            // The queue can no longer be read; log it rather than throw from the callback.
            Logger.LogError(ex.ToString());
        }
    }
}

 

/// <summary>
/// Processing loop: until cts is cancelled, takes the next JSON string from
/// RequestQueue, logs it and deserializes it into an MT4Order for handling.
/// </summary>
/// <remarks>
/// NOTE(review): the method is declared async void, but the part of the body shown
/// here contains no await. The modifier is kept because the elided processing code
/// may await; confirm, and prefer a Task-returning method if it does.
/// </remarks>
private async void HandleRequestThread()
{
    while (!cts.IsCancellationRequested)
    {
        try
        {
            // Blocks until an item is available or the token is cancelled.
            var json = RequestQueue.Take(cts.Token);
            Logger.LogInfo($"msgContent:{json}");

            MT4Order tradeOrder = JsonConvert.DeserializeObject<MT4Order>(json);
......

        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // Only a cancellation of our own token means shutdown. Any other
            // OperationCanceledException (for example a timeout raised inside the
            // processing code) falls through to the general handler below, so its
            // details are logged instead of being reported as a shutdown.
            Logger.LogError("HandleRequestThread is shutting down");
        }
        catch (Exception ex)
        {
            Logger.LogError(ex.ToString());
        }
    }
}