Apache Mina Filter

时间:2021-05-27 16:58:22

Mina中的过滤器处于IoService与IoHandler之间,用于过滤每一个I/O事件。本文分析Mina中的过滤器是怎么串起来的?

前面提到了IoFilter,FilterChain等接口和类,在分析过滤器链怎么串起来之前,有必要看一下这些接口和类之间的关系。

Apache Mina Filter

如上图所示:

FilterChain是由一个个Entry串起来的,EntryImpl是Entry的实现;

从EntryImpl中可以获取到Filter与NextFilter,NextFilter相当于那根线(指针);

有两个特殊的Entry,里面的Filter分别是HeadFilter和TailFilter,我们添加的Filter都处于这两个Filter之间;

每做完一个Entry,NextFilter会回到FilterChain处理下一个Entry(处理顺序如下图所示)。

Apache Mina Filter

由上图可知:

(1)当消息到来时,触发过滤器链的fireXXX事件(这一步一般在processor里面触发);

(2)从过滤器链获取上头结点Entry,从头结点Entry中取出filter和nextFilter;

(3)Filter处理后,交由nextFilter处理,nextFilter并不是一个真正的Filter,它决定这个过滤器链的走向,在这里它是返回到过滤器链;

(4)过滤器链通过nextFilter指针得到下一个entry,重复执行(2)、(3),直到每个Entry都处理完。

这里抛出两个问题:

1、如何保证新加入的过滤器在HeadFilter与TailFilter之间?

2、nextFilter一定是从左向右的顺序吗?

3、TailFilter调用了IoHandler吗?

对于第一个问题,答案在EntryImpl的构造方法里面

// EntryImpl构造方法
private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry, String name, IoFilter filter) {
......
this.prevEntry = prevEntry;
this.nextEntry = nextEntry;
this.name = name;
this.filter = filter;
this.nextFilter = new NextFilter() {
......
};
} //FilterChain构造方法
public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new IllegalArgumentException("session");
} this.session = session;
head = new EntryImpl(null, null, "head", new HeadFilter());
tail = new EntryImpl(head, null, "tail", new TailFilter());
head.nextEntry = tail;
}

在FilterChain构造方法里面初始化了两个Entry:

Apache Mina Filter

我们一般调用IoFilterChain的如下方法添加过滤器

void addFirst(String name, IoFilter filter);
void addLast(String name, IoFilter filter);
void addBefore(String baseName, String name, IoFilter filter);
void addAfter(String baseName, String name, IoFilter filter);

这里以addLast为例,先看看addLast方法的实现

public synchronized void addFirst(String name, IoFilter filter) {
checkAddable(name);
register(head, name, filter);
} private void register(EntryImpl prevEntry, String name, IoFilter filter) {
EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter);
......
prevEntry.nextEntry.prevEntry = newEntry;
prevEntry.nextEntry = newEntry;
name2entry.put(name, newEntry);
......
}

从代码一目了然,这就是一个简单的链表操作,在这里第一个问题回答完毕。

对于第二个问题答案是否定的。从IoFilter接口可以看出,一共处理以下几种事件:

sessionCreated
sessionOpened
sessionClosed
sessionIdle
exceptionCaught
messageReceived
messageSent
filterClose
filterWrite
前面FilterChain的构造方法中,省略了NextFilter的实现,这里补上

 this.nextFilter = new NextFilter() {
public void sessionCreated(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionCreated(nextEntry, session);
} public void sessionOpened(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionOpened(nextEntry, session);
} public void sessionClosed(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionClosed(nextEntry, session);
} public void sessionIdle(IoSession session, IdleStatus status) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionIdle(nextEntry, session, status);
} public void exceptionCaught(IoSession session, Throwable cause) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextExceptionCaught(nextEntry, session, cause);
} public void messageReceived(IoSession session, Object message) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextMessageReceived(nextEntry, session, message);
} public void messageSent(IoSession session, WriteRequest writeRequest) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextMessageSent(nextEntry, session, writeRequest);
} public void filterWrite(IoSession session, WriteRequest writeRequest) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterWrite(nextEntry, session, writeRequest);
} public void filterClose(IoSession session) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterClose(nextEntry, session);
}
}

可以看出filterWrite与filterClose是反向的,其他都是正向的。

对于第三个问题,直接看TailFilter的代码即可:

public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
if (!(message instanceof IoBuffer)) {
s.increaseReadMessages(System.currentTimeMillis());
} else if (!((IoBuffer) message).hasRemaining()) {
s.increaseReadMessages(System.currentTimeMillis());
} try {
session.getHandler().messageReceived(s, message);
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerReadFuture(message);
}
}
}

确实调用了handler来处理真正的请求。

总结一下:FilterChain是由Entry组成一个链表,HeadFilter与TailFilter所在Entry分别是链表的首尾,HeadFilter与processor相连,TailFilter与handler相连,添加Filter的操作实际上是普通的链表插入操作。FilterChain可以触发多种事件,每种事件到来时Filter的顺序是正序还是倒序由nextFilter决定。