消息处理代码

时间:2022-01-02 05:32:50

 

消息处理封装的代码:

public class Transceiver
{
public IPAddress MQServerIP { get; set; }
public UInt16 MQServerPort { get; set; }
public String MQUserName { get; set; }
public String MQPassword { get; set; }
public String MQTopic { get; set; }
public Boolean IsOnline { get { return Online; } }
/// <summary>
/// 消息持久时间(秒)
/// </summary>
public Int32 TTL { get; set; }

private IConnectionFactory factory;
private IConnection connection;
private ISession session;
private IMessageProducer producer;
private IMessageConsumer consumer;
private Boolean Online;

public delegate void MessageReceiveHandle(MQMessage massage);
public event MessageReceiveHandle MessageReceived;
public delegate void ConnectionEventHandle(Boolean Online);
public event ConnectionEventHandle ConnectionEvent;
public delegate void ConnectionErrorHandle(Exception error);
public event ConnectionErrorHandle ConnectionError;

public Transceiver(IPAddress ServerIP, UInt16 ServerPort, String UserName, String Password, String Topic, Int32 ttl)
{
MQServerIP = ServerIP;
MQServerPort = ServerPort;
MQUserName = UserName;
MQPassword = Password;
MQTopic = Topic;
TTL = ttl;

producer = null;
factory = null;
connection = null;
session = null;
Online = false;
}

~Transceiver()
{
Close();
}

public Boolean Open()
{
if (IsOnline) { return IsOnline; }
try
{
String URI = String.Format("tcp://{0}:{1}", MQServerIP, MQServerPort);
factory = new ConnectionFactory(URI);
if (MQUserName != "")
{
connection = factory.CreateConnection(MQUserName, MQPassword);
}
else
{
connection = factory.CreateConnection();
}
connection.ConnectionInterruptedListener += connection_ConnectionInterruptedListener;
connection.ConnectionResumedListener += connection_ConnectionResumedListener;
connection.ExceptionListener += connection_ExceptionListener;
connection.Start();
session = connection.CreateSession();
//CreateProducer
producer = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(MQTopic));
//CreateConsumer
consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(MQTopic));
//Regist event
consumer.Listener += consumer_Listener;
Online = true;
}
catch
{
Online = false;
}
return Online;
}

void connection_ExceptionListener(Exception exception)
{
ConnectionError(exception);
Close();
Thread thread = new Thread(new ThreadStart(ResumConncet));
thread.IsBackground = true;
thread.Start();
}

private void ResumConncet()
{
while (!IsOnline)
{
String URI = String.Format("tcp://{0}:{1}", MQServerIP, MQServerPort);
factory = new ConnectionFactory(URI);
try
{
if (MQUserName != "")
{
connection = factory.CreateConnection(MQUserName, MQPassword);
}
else
{
connection = factory.CreateConnection();
}
connection.ConnectionInterruptedListener += connection_ConnectionInterruptedListener;
connection.ConnectionResumedListener += connection_ConnectionResumedListener;
connection.ExceptionListener += connection_ExceptionListener;
connection.Start();
Online = true;
ConnectionEvent(Online);
session = connection.CreateSession();
//CreateProducer
producer = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(MQTopic));
//CreateConsumer
consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(MQTopic));
//Regist event
consumer.Listener += consumer_Listener;
}
catch { }
}
}

void connection_ConnectionResumedListener()
{
Online = true;
ConnectionEvent(Online);
}

void connection_ConnectionInterruptedListener()
{
Online = false;
ConnectionEvent(Online);
}

void consumer_Listener(IMessage message)
{
IObjectMessage objmsg = (IObjectMessage)message;
MQMessage msg = (MQMessage)objmsg.Body;
MessageReceived(msg);
}

public void Close()
{
if (producer != null)
{
try
{
producer.Close();
producer.Dispose();
}
catch { }
}
if (consumer != null)
{
try
{
consumer.Close();
consumer.Dispose();
}
catch { }
}
if (session != null)
{
try
{
session.Close();
session.Dispose();
}
catch { }
}
if (connection != null)
{
try
{
connection.Stop();
connection.Close();
connection.Dispose();
}
catch { }
}
Online = false;
}

public void SendMessage(MQMessage massage)
{
IObjectMessage msg = producer.CreateObjectMessage(massage);
producer.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, Apache.NMS.MsgPriority.Normal, TimeSpan.FromSeconds(TTL));
}
}