ESB定义了消息的收发和收发池,对于各种通讯方式定义了收发API,在收到信息后由eventBus来发布消息
ISender:
public abstract interface ISender { public abstract void send(Object paramObject) throws Exception; public abstract Object sendRequest(Object paramObject, double paramDouble) throws Exception; }
IListener:
public abstract interface IListener { public abstract void connect() throws Exception; public abstract void disconnect() throws Exception; public abstract boolean isconnect() throws Exception; public abstract void destroy() throws Exception; }
SenderPool: 可以用GenericKeyedObjectPool<String, ISender>代替(线程安全的)
public abstract class SenderPool extends BasePooledObjectFactory<ISender> { private String id; private ObjectPool<ISender> pool; id,pool getter, setter... public PooledObject<ISender> wrap(ISender sender) { return new DefaultPooledObject<ISender>(sender); } public ISender borrowClient() throws Exception { return (ISender)getPool().borrowObject(); } public void returnClient(ISender sender) throws Exception { getPool().returnObject(sender); } public void destroy() throws Exception { getPool().clear(); } }
ListenerPool: 可以用GenericKeyedObjectPool<String, IListener>代替pool, method: listAllObjects() 可以用来遍历各个PooledObject,省去了ListenerCf
public abstract class ListenerPool { private String id; private ListenerCf config; private Set<IListener> pool = new CopyOnWriteArraySet<IListener>(); private EventBus eventBus; constructor, getter, setter... public void connect() throws Exception() { for (IListener listener : getPool()) listener.connect(); } public void disconnect() throws Exception { for (IListener listener : getPool()) listener.disconnect(); } public boolean isconnect() throws Exception { for (IListener listener : getPool()) if (!listener.isconnect()) return false; return true; } public void destroy() throws Exception { for (IListener listener : getPool()) listener.destroy(); } }
ESBFactory: map各个SenderPool, ListenerPool
public abstract class ESBFactory { private Map<String, SenderPool> senderMap = new ConcurrentHashMap<String, SenderPool>(); private Map<String, ListenerPool> listenerMap = new ConcurrentHashMap<String, ListenerPool>(); public abstract void initialize(); public void destroy() {对各个pool destroy并清空map} public void disconnect() {对各个pool disconnect} }
ESBComponent: @Startup, @Singleton
@Startup @Singleton public class ESBComponent { public static Set<ESBFactory> esbFactory = new CopyOnWriteArraySet(); public static Map<String, SenderPool> senderMap = new ConcurrentHashMap(); public static Map<String, ListenerPool> listenerMap = new ConcurrentHashMap(); @PostConstruct public void initialize() { 将esbFactory里的所有senderMap和ListenerMap中的所有Pool都整合到ESBComponent的senderMap和listenerMap中 } @Lock(LockType.READ) public void disconnect() { 。。。 } @Lock(LockType.READ) public boolean disconnect(String listener) { ... } public boolean isConnected(String listener) { ... } @PreDestroy public void destroy() throws Exception { ... } static { esbFactory.add(new TibcoFactory()); esbFactory.add(new FtpFactory()); esbFactory.add(new FileFactory()); esbFactory.add(new JmsFactory()); esbFactory.add(new IbmMqFactory()); } }
=================================================================================================
JmsSender: 用javax.jms来实现ISender,send是用来向destination发送textMessage,sendRequest是发送完接受返回数据的。(可以将两个重载设计)
public class JmsSender implements ISender { private String id; private Connection connection; private Session session; private int timeOut; private String destination; constructor with fields, setter, getter... public void send(String destination, Object object) throws Exception { InitialContext initialContext = null; try { initialContext = new InitialContext(); Destination des = (Destination)initialContext.lookup(destination); MessageProducer producer = this.session.createProducer(des); Message message = null; if ((object instanceof String)) { message = this.session.createTextMessage((String)object); } if (message != null) producer.send(message); } catch(JMSException e) { ... } finally { if (initialContext != null) initialContext.close(); } } } public Object sendRequest(String destination, Object object, double timeOut) throws Exception { InitialContext initialContext = null; try { initialContext = new InitialContext(); Queue queue = (Queue) initialContext.lookup(destination); MessageProducer producer = this.session.createProducer(queue); TemporaryQueue replyQueue = this.session.createTemporaryQueue(); MessageConsumer replyConsumer = this.seesion.createConsumer(replyQueue); Message message = null; if ((object instanceof String)) { message = this.session.createTextMessage((String)object); } if (message != null) { message.setJMSReplyTo(replyQueue); producer.send(message); Message replyMessage = replyConsumer.receive((long)timeOut); return replyMessage; } replyConsumer.close(); replyQueue.delete(); } catch (JMSException e) { ... } finally { if (initialContext != null) { initialContext.close(); } } return null; }
JmsSenderPool: extends SenderPool, 因此要实现create method.
public class JmsSenderPool extends SenderPool { private JmsSenderCf config; constructor with fields, setter, getter... public ISender create() throw Exception { InitialContext initialContext = null; try { String destination = this.config.getDestination(); initialContext = new initialContext(); ConnectionFactory cfact = (ConnectionFactory)initialContext.lookup(this.config.getConnectionFactory()); Connection connection = cfact.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); JmsSender sender = new JmsSender(getId(), connection, session, destination, this.config.getTimeOut()); return sender; } finally { if (initialContext != null) initialContext.close(); } } }
JmsListener: 实现IListener, MessageListener. (可以将IListener去掉,Consumer的创建放到JmsListenerPool中)
onMessage Method:
public void onMessage(Message request) { try { JmsMessage message = new JmsMessage(); message.setMessage(request); Destination replyQueue = request.getJMSReplyTo(); if (replyQueue == null) { String replyTo = request.getStringProperty("JMSReplyTo"); if ((replyTo != null) && (replyTo.trim().length() > 0)) { replyQueue = (Destination) this.initialContext.lookup(replyTo); } if (replyQueue == null) replyQueue = this.defaultReplyQueue; } request.setJMSReplyTo(replyQueue); message.setConnection(this.connection); } this.eventBus.post(message); } catch ... }
最关键的是this.eventBus.post(message);即用eventBus发布消息
JmsListenerPool:
JmsFactory: 读取配置文件,并初始化ListenerPool和SenderPool