接上篇的MQ配置。利用C#实现MQ消息的收发。源码
1.需要引入的dll是amqmdnet.dll
2.app.config配置
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings >
<add key="HostName" value ="192.168.1.40"/>
<add key="Channel" value ="CLIENT.QM_ORANGE"/>
<add key ="Port" value ="1418"/>
<add key ="QueueManager" value="QM_APPLE"/>
<add key="Queue" value="Q1"/>
</appSettings>
<connectionStrings>
<add name ="connectionString" connectionString ="Data Source=(local);Initial Catalog=TestDb; Integrated Security=SSPI" />
</connectionStrings>
</configuration>
3.MQ操作类
using System;
using System.Data;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using IBM.WMQ;
using System.Xml;
using System.Configuration;
using System.Windows.Forms; namespace WindowsFormsApplicationMQ
{ class Management
{
string queueName;
MQQueueManager qMgr;
MQMessage mqMsg;
MQQueue queue;
MQPutMessageOptions putOptions; #region 连接队列管理器 public Management()
{ }
string linkStatus;
public string LinkToQueueManager()
{
string QueueName = ConfigurationSettings.AppSettings["Queue"];
queueName = QueueName; Environment.SetEnvironmentVariable("MQCCSID", "");
if (MQEnvironment.properties.Count <= )
{
MQEnvironment.properties.Add(MQC.CCSID_PROPERTY, );
}
MQEnvironment.Port = Convert.ToInt32(ConfigurationSettings.AppSettings["Port"]);
MQEnvironment.Channel = ConfigurationSettings.AppSettings["Channel"];
MQEnvironment.Hostname = ConfigurationSettings.AppSettings["HostName"];
string qmName = ConfigurationSettings.AppSettings["QueueManager"];
try
{
if (qMgr == null || !qMgr.IsConnected)
{
qMgr = new MQQueueManager(qmName);
} linkStatus = "连接队列管理器:" + "成功!";
}
catch (MQException e)
{ linkStatus = "连接队列管理器错误: 结束码:" + e.CompletionCode + " 错误原因代码:" + e.ReasonCode;
}
catch (Exception e)
{ linkStatus = "连接队列管理器错误: 结束码:" + e;
}
return linkStatus;
}
#endregion #region 发送消息 public void SendMsg(string message)
{
int openOptions=MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;
try
{
queue = qMgr.AccessQueue(queueName, openOptions); //尝试打开队列
}
catch(MQException e)
{
MessageBox.Show("打开队列失败:"+e.Message);
}
mqMsg = new MQMessage();
mqMsg.WriteString(message);
putOptions = new MQPutMessageOptions();
try
{
queue.Put(mqMsg, putOptions); //将消息放入消息队列
}
catch (MQException mqe)
{
MessageBox.Show("发送异常终止:"+mqe .Message );
}
finally
{
try
{
qMgr.Disconnect(); }
catch (MQException e)
{ }
}
} #endregion #region 接收消息 public DataSet receiveMsg()
{
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;
try
{
queue = qMgr.AccessQueue(queueName, openOptions); //尝试打开队列
}
catch (MQException e)
{
MessageBox.Show("打开队列失败:" + e.Message);
}
//从队列管理器中获得消息
MQGetMessageOptions mqGetMsgOpts;
mqMsg = new MQMessage();
mqGetMsgOpts = new MQGetMessageOptions();
mqGetMsgOpts.WaitInterval = ;
mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;
try
{
int queryDep = queue.CurrentDepth;
if (queryDep > )
{
queue.Get(mqMsg, mqGetMsgOpts); //获得消息
var ds = new DataSet();
var table = new DataTable("T_School");
table.Columns.Add("ID", typeof(string));
table.Columns.Add("SchoolName", typeof(string));
table.Columns.Add("BuildDate", typeof(string));
table.Columns.Add("Address", typeof(string));
ds.Tables.Add(table);
string message = mqMsg.ReadString(mqMsg.MessageLength);
mqMsg.Format = MQC.MQFMT_XMIT_Q_HEADER;
var reader = new StringReader(message);
ds.ReadXml(reader, XmlReadMode.Fragment);
return ds;
} else
{
return null;
}
}
catch(MQException ex) {
MessageBox.Show("访问队列停止" + ex.InnerException);
return null;
}
finally
{
try
{
qMgr.Disconnect(); }
catch (MQException e)
{ }
}
}
#endregion
}
}