消息队列(MSMQ)实现多服务器应用程序之间消息实时交互
我所介绍的例子是利用微软的消息队列(msmq)实现多个服务器之间消息实时传递。
应用程序:基于dotnet平台采用WinForm+Webservice开发的应用程序。
每个地区都有自己的数据库和Webservice服务器。
Webservice服务器有很多台,这样多个服务器上用户互相交流就成了问题。思前想后采用了msmq,设计思想如下:
例子:A服务器用户user1发送消息给B服务器上的user2
1、首先是数据库结构是一样的使用sql2005同步用户信息表。
2、在每台Webservice服务器上安装msmq。
3、当user1发送消息给user2时,判断user2所在服务器,如果user2和发送者不在同一个服务器就使用msmq传递消息给B服务器。
4、每个Webservice服务器都有一个消息接收服务程序,用来侦测本服务器消息队列里的消息,反序列化消息内容,写入数据库。
5、user2直接读取数据库就可以了。
6、实际上就是通过msmq在多个数据库服务器之间消息传递。
下面是消息接收服务程序的源代码:
Code
using System;
using System.Collections;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.ServiceProcess;
using System.Threading;
using System.Messaging;
using wzh.Db;//数据库操作类
///<summary>
///名称:消息服务程序
///用途:实时侦测远程机器发送到本机的消息队列的消息,写入本机数据库。
///作者:wzh
///编写日期:2008-9-28
///修改日期:wzh于2008-10-10修改了部分BUG
///</summary>
namespace wzh_MsgService
{
public class MsgService : System.ServiceProcess.ServiceBase
{
/// <summary>
/// 必需的设计器变量。
/// </summary>
private System.ComponentModel.Container components = null;
private bool servicePaused;
private string mPath = null;// = @".\Private$\MsgQueue
public MsgService()
{
// 该调用是 Windows.Forms 组件设计器所必需的。
InitializeComponent();
// TODO: 在 InitComponent 调用后添加任何初始化
}
// 进程的主入口点
static void Main()
{
System.ServiceProcess.ServiceBase[] ServicesToRun;
// 同一进程中可以运行多个用户服务。若要将
//另一个服务添加到此进程,请更改下行
// 以创建另一个服务对象。例如,
//
// ServicesToRun = New System.ServiceProcess.ServiceBase[] {new Service1(), new MySecondUserService()};
//
ServicesToRun = new System.ServiceProcess.ServiceBase[] { new MsgService() };
System.ServiceProcess.ServiceBase.Run(ServicesToRun);
}
/// <summary>
/// 调试用在项目属性里,将输出类型(Output Type)改成Console Application,将Startup Object改成Namespace.ServiceName
/// </summary>
/// <param name="args"></param>
// public static void Main(string[] args)
// {
// MsgService s = new MsgService();
// s.OnStart(args);
// Console.ReadLine();
// s.OnStop();
// }
/// <summary>
/// 设计器支持所需的方法 - 不要使用代码编辑器
/// 修改此方法的内容。
/// </summary>
private void InitializeComponent()
{
components = new System.ComponentModel.Container();
this.ServiceName = "QZCPNET_MsgService";
this.CanPauseAndContinue = true;
this.CanStop = true;
}
/// <summary>
/// 清理所有正在使用的资源。
/// </summary>
protected override void Dispose( bool disposing )
{
if( disposing )
{
if (components != null)
{
components.Dispose();
}
}
base.Dispose( disposing );
}
/// <summary>
/// 设置具体的操作,以便服务可以执行它的工作。
/// </summary>
protected override void OnStart(string[] args)
{
// TODO: 在此处添加代码以启动服务。
servicePaused=false;
mPath = GetXmlData("//msgmsmqpath");
Thread newThread=new Thread(new ThreadStart(RecevieMessage));
newThread.IsBackground=true;
newThread.Start();
}
/// <summary>
/// 接收本地服务器上的MSMQ里的消息
/// </summary>
private void RecevieMessage()
{
if(!servicePaused)
{
if(! MessageQueue.Exists(mPath))
{
MessageQueue.Create(mPath);
MessageQueue mqTemp = new MessageQueue(mPath);
mqTemp.SetPermissions("Everyone", MessageQueueAccessRights.FullControl);
}
MessageQueue MyQueue = new MessageQueue(mPath);
MyQueue .Formatter =new XmlMessageFormatter (new Type []{typeof (DataSet)});
MyQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MyReceiveCompleted);
MyQueue.BeginReceive();
}
}
private void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
{
MessageQueue mq = (MessageQueue)source;
try
{
Message ms = mq.EndReceive(asyncResult.AsyncResult);
DataSet ds = (DataSet)ms.Body;
ProcessMsg(ds);
}
catch(Exception ex)
{
if(ex.Source != "System.Messaging")
WriteSysLog(ex.ToString());
}
finally
{
GC.Collect();
mq.BeginReceive();
}
return;
}
/// <summary>
/// 消息处理
/// </summary>
/// <param name="ds"></param>
private void ProcessMsg(DataSet ds)
{
//插入新消息
try
{
clsDBExec clsDb=new clsDBExec(GetXmlData("//conn") ,"消息管理-SaveMsg");
string strSql="";
DataRow dr=ds.Tables[0].Rows[0];
strSql="INSERT INTO SM_MsgessSend ( MsgSId, MsgTitle, MsgSender,MsgSenderJgCode, MsgSenderJg, MsgAderrs,MsgSAderrs ,MsgContent, "+
" MsgAccessName, MsgAccessDes, MsgHz)"+
" VALUES ("+dr["MsgSId"].ToString()+",\'"+ dr["MsgTitle"].ToString()+"\',\'"+ dr["MsgSender"].ToString() +"\',\'"+ dr["MsgSenderJgCode"].ToString() + "\',\'" + dr["MsgSenderJg"].ToString() +"\',\'"+ dr["MsgAderrs"].ToString() +"\',\'"+
dr["MsgSAderrs"].ToString() +"\',\'"+dr["MsgContent"].ToString().Replace("\'","\'\'") +"\',\'"+ dr["MsgAccessName"].ToString().Replace("\'","\'\'") +"\',\'"+ dr["MsgAccessDes"].ToString() +"\',\'"+dr["MsgHz"].ToString() + "\');";
strSql+="SELECT @@IDENTITY ";
string strV=clsDb.getOneVal(strSql);
//发入发件箱
strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder,MsgIsRead ) values(\'"+ dr["MsgSender"].ToString() +"\',"+ strV +",\'发件箱\',2)";
clsDb.execNonQuery(strSql);
//插入到用户
string strAderrs=dr["MsgSAderrs"].ToString();
string [] arrStr=strAderrs.Split(\',\');
strSql="";
foreach(string strUser in arrStr)
{
if (strUser.ToUpper()=="ALL" )
{
strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT UserName,"+ strV +",\'收件箱\' FROM SM_UserId";
break;
}
if (strUser.ToUpper()=="ALLONLINE" )
{
strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT DISTINCT UserId,"+ strV +",\'收件箱\' FROM SM_OnlineUser";
break;
}
if ((strUser.ToUpper()!="ALL" ) && strUser.ToUpper()!="ALLONLINE")
{
strSql+="INSERT INTO SM_MsgessUser(UserId, MsgId ,MsgFolder) SELECT UserName,"+ strV +",\'收件箱\' FROM SM_UserId where UserName=\'"+ strUser +"\';";
}
}
clsDb.execNonQuery(strSql);
}
catch(Exception ex)
{
WriteSysLog(ex.Message);
}
}
/// <summary>
/// 停止此服务。
/// </summary>
protected override void OnStop()
{
// TODO: 在此处添加代码以执行停止服务所需的关闭操作。
servicePaused=true;
}
/// <summary>
/// 获取XML文件内容
/// </summary>
/// <param name="nodepath"></param>
/// <returns></returns>
private string GetXmlData(string nodepath)
{
System.Xml.XmlDocument dom=new System.Xml.XmlDocument();
dom.Load(@"e:\net\service\Serverconfig.xml");
return dom.SelectSingleNode(nodepath).InnerText;
}
/// <summary>
/// 写日志
/// </summary>
/// <param name="strMsg"></param>
private void WriteSysLog(string strMsg)
{
EventLog objLog=new EventLog("Application") ;
objLog.Source ="信息网络管理系统--消息服务";
objLog.WriteEntry(strMsg,EventLogEntryType.Error );
}
}
}
using System;
using System.Collections;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.ServiceProcess;
using System.Threading;
using System.Messaging;
using wzh.Db;//数据库操作类
///<summary>
///名称:消息服务程序
///用途:实时侦测远程机器发送到本机的消息队列的消息,写入本机数据库。
///作者:wzh
///编写日期:2008-9-28
///修改日期:wzh于2008-10-10修改了部分BUG
///</summary>
namespace wzh_MsgService
{
public class MsgService : System.ServiceProcess.ServiceBase
{
/// <summary>
/// 必需的设计器变量。
/// </summary>
private System.ComponentModel.Container components = null;
private bool servicePaused;
private string mPath = null;// = @".\Private$\MsgQueue
public MsgService()
{
// 该调用是 Windows.Forms 组件设计器所必需的。
InitializeComponent();
// TODO: 在 InitComponent 调用后添加任何初始化
}
// 进程的主入口点
static void Main()
{
System.ServiceProcess.ServiceBase[] ServicesToRun;
// 同一进程中可以运行多个用户服务。若要将
//另一个服务添加到此进程,请更改下行
// 以创建另一个服务对象。例如,
//
// ServicesToRun = New System.ServiceProcess.ServiceBase[] {new Service1(), new MySecondUserService()};
//
ServicesToRun = new System.ServiceProcess.ServiceBase[] { new MsgService() };
System.ServiceProcess.ServiceBase.Run(ServicesToRun);
}
/// <summary>
/// 调试用在项目属性里,将输出类型(Output Type)改成Console Application,将Startup Object改成Namespace.ServiceName
/// </summary>
/// <param name="args"></param>
// public static void Main(string[] args)
// {
// MsgService s = new MsgService();
// s.OnStart(args);
// Console.ReadLine();
// s.OnStop();
// }
/// <summary>
/// 设计器支持所需的方法 - 不要使用代码编辑器
/// 修改此方法的内容。
/// </summary>
private void InitializeComponent()
{
components = new System.ComponentModel.Container();
this.ServiceName = "QZCPNET_MsgService";
this.CanPauseAndContinue = true;
this.CanStop = true;
}
/// <summary>
/// 清理所有正在使用的资源。
/// </summary>
protected override void Dispose( bool disposing )
{
if( disposing )
{
if (components != null)
{
components.Dispose();
}
}
base.Dispose( disposing );
}
/// <summary>
/// 设置具体的操作,以便服务可以执行它的工作。
/// </summary>
protected override void OnStart(string[] args)
{
// TODO: 在此处添加代码以启动服务。
servicePaused=false;
mPath = GetXmlData("//msgmsmqpath");
Thread newThread=new Thread(new ThreadStart(RecevieMessage));
newThread.IsBackground=true;
newThread.Start();
}
/// <summary>
/// 接收本地服务器上的MSMQ里的消息
/// </summary>
private void RecevieMessage()
{
if(!servicePaused)
{
if(! MessageQueue.Exists(mPath))
{
MessageQueue.Create(mPath);
MessageQueue mqTemp = new MessageQueue(mPath);
mqTemp.SetPermissions("Everyone", MessageQueueAccessRights.FullControl);
}
MessageQueue MyQueue = new MessageQueue(mPath);
MyQueue .Formatter =new XmlMessageFormatter (new Type []{typeof (DataSet)});
MyQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MyReceiveCompleted);
MyQueue.BeginReceive();
}
}
private void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
{
MessageQueue mq = (MessageQueue)source;
try
{
Message ms = mq.EndReceive(asyncResult.AsyncResult);
DataSet ds = (DataSet)ms.Body;
ProcessMsg(ds);
}
catch(Exception ex)
{
if(ex.Source != "System.Messaging")
WriteSysLog(ex.ToString());
}
finally
{
GC.Collect();
mq.BeginReceive();
}
return;
}
/// <summary>
/// 消息处理
/// </summary>
/// <param name="ds"></param>
private void ProcessMsg(DataSet ds)
{
//插入新消息
try
{
clsDBExec clsDb=new clsDBExec(GetXmlData("//conn") ,"消息管理-SaveMsg");
string strSql="";
DataRow dr=ds.Tables[0].Rows[0];
strSql="INSERT INTO SM_MsgessSend ( MsgSId, MsgTitle, MsgSender,MsgSenderJgCode, MsgSenderJg, MsgAderrs,MsgSAderrs ,MsgContent, "+
" MsgAccessName, MsgAccessDes, MsgHz)"+
" VALUES ("+dr["MsgSId"].ToString()+",\'"+ dr["MsgTitle"].ToString()+"\',\'"+ dr["MsgSender"].ToString() +"\',\'"+ dr["MsgSenderJgCode"].ToString() + "\',\'" + dr["MsgSenderJg"].ToString() +"\',\'"+ dr["MsgAderrs"].ToString() +"\',\'"+
dr["MsgSAderrs"].ToString() +"\',\'"+dr["MsgContent"].ToString().Replace("\'","\'\'") +"\',\'"+ dr["MsgAccessName"].ToString().Replace("\'","\'\'") +"\',\'"+ dr["MsgAccessDes"].ToString() +"\',\'"+dr["MsgHz"].ToString() + "\');";
strSql+="SELECT @@IDENTITY ";
string strV=clsDb.getOneVal(strSql);
//发入发件箱
strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder,MsgIsRead ) values(\'"+ dr["MsgSender"].ToString() +"\',"+ strV +",\'发件箱\',2)";
clsDb.execNonQuery(strSql);
//插入到用户
string strAderrs=dr["MsgSAderrs"].ToString();
string [] arrStr=strAderrs.Split(\',\');
strSql="";
foreach(string strUser in arrStr)
{
if (strUser.ToUpper()=="ALL" )
{
strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT UserName,"+ strV +",\'收件箱\' FROM SM_UserId";
break;
}
if (strUser.ToUpper()=="ALLONLINE" )
{
strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT DISTINCT UserId,"+ strV +",\'收件箱\' FROM SM_OnlineUser";
break;
}
if ((strUser.ToUpper()!="ALL" ) && strUser.ToUpper()!="ALLONLINE")
{
strSql+="INSERT INTO SM_MsgessUser(UserId, MsgId ,MsgFolder) SELECT UserName,"+ strV +",\'收件箱\' FROM SM_UserId where UserName=\'"+ strUser +"\';";
}
}
clsDb.execNonQuery(strSql);
}
catch(Exception ex)
{
WriteSysLog(ex.Message);
}
}
/// <summary>
/// 停止此服务。
/// </summary>
protected override void OnStop()
{
// TODO: 在此处添加代码以执行停止服务所需的关闭操作。
servicePaused=true;
}
/// <summary>
/// 获取XML文件内容
/// </summary>
/// <param name="nodepath"></param>
/// <returns></returns>
private string GetXmlData(string nodepath)
{
System.Xml.XmlDocument dom=new System.Xml.XmlDocument();
dom.Load(@"e:\net\service\Serverconfig.xml");
return dom.SelectSingleNode(nodepath).InnerText;
}
/// <summary>
/// 写日志
/// </summary>
/// <param name="strMsg"></param>
private void WriteSysLog(string strMsg)
{
EventLog objLog=new EventLog("Application") ;
objLog.Source ="信息网络管理系统--消息服务";
objLog.WriteEntry(strMsg,EventLogEntryType.Error );
}
}
}