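Before provider startup: the Filter extension.
The filter concept shows up in many frameworks, and the implementations are much alike: chained invocation, each link hooked to the next, somewhat like a stack in that the first filter entered is the last one exited.
Dubbo ships with a good number of built-in filters, enough for routine use; writing your own is covered later. Since we are walking through provider startup, we start with the source and get the startup flow straight.
Dubbo filters are initialized in the buildInvokerChain method of the ProtocolFilterWrapper wrapper class. As elsewhere, SPI is used to load the filters that apply. Each filter carries an annotation like @Activate(group = , order = -110000): group says whether the filter applies to the provider or the consumer, and order sets the position in the chain, with smaller values filtering first. Back to buildInvokerChain: it loops over the filters and wraps each one in an invoker, forming a singly linked list that is invoked layer by layer. Call order: EchoFilter->ClassLoaderFilter->GenericFilter->ContextFilter->TraceFilter->TimeoutFilter->MonitorFilter->ExceptionFilter.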
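Overview:
EchoFilter: echo testing checks whether a service is available. An echo test follows the normal request flow, so it verifies that the whole call path works, and it can be used for monitoring.
ClassLoaderFilter: switches the classloader used by the thread context, for SPI.
GenericFilter: the generic-interface implementation is mainly for cases where the server side has no API interface or model classes. Every POJO in the parameters and return value is represented as a Map. It is typically used for framework integration.
ContextFilter: handles the context information of the request.
TraceFilter: telnet-related calls in the QOS service.
TimeoutFilter: the timeout filter.
MonitorFilter: Dubbo monitor integration.
ExceptionFilter: exception catching.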
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        // Walk the filters backwards so that the first filter ends up outermost.
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }
                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }
                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result result = filter.invoke(next, invocation);
                    if (result instanceof AsyncRpcResult) {
                        AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                        asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                        return asyncResult;
                    } else {
                        return filter.onResponse(result, invoker, invocation);
                    }
                }
                @Override
                public void destroy() {
                    invoker.destroy();
                }
                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}
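The reverse loop in buildInvokerChain is easier to see in isolation. Below is a minimal sketch, assuming toy Invoker and Filter interfaces that take and return a String; they are stand-ins invented for illustration, not Dubbo's real interfaces, and the filter names are only labels. It shows that wrapping from the last filter to the first makes the first filter run first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-ins for Dubbo's Invoker and Filter; these are NOT the real interfaces.
class FilterChainSketch {

    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Records the order in which the filters and the target are entered.
    static final List<String> trace = new ArrayList<>();

    // A filter that only records its name and passes the request on.
    static Filter named(String name) {
        return (next, request) -> {
            trace.add(name);
            return next.invoke(request);
        };
    }

    // Wrap from the last filter to the first so the first filter ends up outermost.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    static List<String> run() {
        trace.clear();
        Invoker target = request -> {
            trace.add("target");
            return "handled:" + request;
        };
        Invoker chain = buildChain(target,
                Arrays.asList(named("echo"), named("context"), named("exception")));
        chain.invoke("hello");
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [echo, context, exception, target]
    }
}
```

Each wrapper holds a reference only to the next one, which is why the result behaves like a singly linked list.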
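EchoFilter: used for echo testing. Send any string and the same string comes back, which tests whether a given server is up. Whatever is sent is what is returned.
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        return new RpcResult(inv.getArguments()[0]);
    }
    return invoker.invoke(inv);
}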
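ClassLoaderFilter: switches the class loader.
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    try {
        return invoker.invoke(invocation);
    } finally {
        // Always restore the original class loader.
        Thread.currentThread().setContextClassLoader(ocl);
    }
}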
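The swap-and-restore pattern can be tried with the JDK alone. This is a minimal sketch: withContextClassLoader is a hypothetical helper name, and an empty URLClassLoader stands in for the service interface's class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

class ClassLoaderSwapSketch {

    // Run `body` with `temporary` as the thread context class loader, then restore the old one.
    static <T> T withContextClassLoader(ClassLoader temporary, Supplier<T> body) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(temporary);
        try {
            return body.get();
        } finally {
            current.setContextClassLoader(original); // restored even if body throws
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // An empty loader that delegates to the current one; it only serves as a distinct instance.
        ClassLoader temporary = new URLClassLoader(new URL[0], before);

        boolean seenInside = withContextClassLoader(temporary,
                () -> Thread.currentThread().getContextClassLoader() == temporary);

        System.out.println("temporary loader visible inside: " + seenInside);
        System.out.println("original loader restored: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

Restoring in finally matters because provider threads are pooled: a loader left behind would leak into the next request handled by the same thread.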
GenericFilter: invokes a method directly by passing its name, for example:
Object result = genericService.$invoke("sayHello", new String[] {"java.lang.String"}, new Object[] {"world"});
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    if (inv.getMethodName().equals(Constants.$INVOKE)
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        String name = ((String) inv.getArguments()[0]).trim();
        String[] types = (String[]) inv.getArguments()[1];
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            Class<?>[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            String generic = inv.getAttachment(Constants.GENERIC_KEY);
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
            }
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
                                        "] only support message type " +
                                        byte[].class +
                                        " and your message type is " +
                                        args[i].getClass());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        Constants.GENERIC_SERIALIZATION_BEAN +
                                        "] only support message type " +
                                        JavaBeanDescriptor.class.getName() +
                                        " and your message type is " +
                                        args[i].getClass().getName());
                    }
                }
            }
            Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
            if (result.hasException()
                    && !(result.getException() instanceof GenericException)) {
                return new RpcResult(new GenericException(result.getException()));
            }
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                            .serialize(null, os).writeObject(result.getValue());
                    return new RpcResult(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException("Serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
            } else {
                return new RpcResult(PojoUtils.generalize(result.getValue()));
            }
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    return invoker.invoke(inv);
}
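The core of a generic call is "method name plus parameter type names plus arguments", resolved by reflection. Here is a minimal sketch of that idea with plain JDK reflection. GreetingService and the invoke helper are hypothetical names made up for the demo, and the sketch skips everything the real filter does around serialization and POJO-to-Map conversion. It also assumes non-primitive parameter types, since Class.forName does not resolve names like "int".

```java
import java.lang.reflect.Method;

class GenericInvokeSketch {

    // Hypothetical service used only for this demo.
    public static class GreetingService {
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    // Resolve parameter types from their class names, find the method, and invoke it.
    static Object invoke(Object target, String methodName, String[] parameterTypes, Object[] args) {
        try {
            Class<?>[] types = new Class<?>[parameterTypes.length];
            for (int i = 0; i < parameterTypes.length; i++) {
                types[i] = Class.forName(parameterTypes[i]);
            }
            Method method = target.getClass().getMethod(methodName, types);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            // Covers unknown classes, unknown methods, and failures inside the target method.
            throw new IllegalStateException("generic invocation failed", e);
        }
    }

    public static void main(String[] args) {
        Object result = invoke(new GreetingService(), "sayHello",
                new String[] {"java.lang.String"}, new Object[] {"world"});
        System.out.println(result); // prints Hello world
    }
}
```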
ContextFilter: sets up the context for handling the request.
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    Map<String, String> attachments = invocation.getAttachments();
    if (attachments != null) {
        attachments = new HashMap<>(attachments);
        attachments.remove(Constants.PATH_KEY);
        attachments.remove(Constants.INTERFACE_KEY);
        attachments.remove(Constants.GROUP_KEY);
        attachments.remove(Constants.VERSION_KEY);
        attachments.remove(Constants.DUBBO_VERSION_KEY);
        attachments.remove(Constants.TOKEN_KEY);
        attachments.remove(Constants.TIMEOUT_KEY);
        // Remove async property to avoid being passed to the following invoke chain.
        attachments.remove(Constants.ASYNC_KEY);
        attachments.remove(Constants.TAG_KEY);
        attachments.remove(Constants.FORCE_USE_TAG);
    }
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            // .setAttachments(attachments) // merged from dubbox
            .setLocalAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());
    // merged from dubbox
    // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
    if (attachments != null) {
        if (RpcContext.getContext().getAttachments() != null) {
            RpcContext.getContext().getAttachments().putAll(attachments);
        } else {
            RpcContext.getContext().setAttachments(attachments);
        }
    }
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        return invoker.invoke(invocation);
    } finally {
        // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
        RpcContext.removeContext();
        RpcContext.removeServerContext();
    }
}
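The ContextFilter pattern has three steps: copy the attachments, strip the framework-internal keys, then clear the per-thread context afterwards. It can be sketched with a ThreadLocal. Everything named here is an assumption for illustration: CONTEXT is a toy stand-in for RpcContext, and the RESERVED key names are made up rather than taken from Constants.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class ContextSketch {

    // Hypothetical reserved keys; the real filter removes Constants.PATH_KEY, TOKEN_KEY, and so on.
    static final List<String> RESERVED = Arrays.asList("path", "token", "timeout");

    // Per-thread context, a toy stand-in for RpcContext.
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    static <T> T invokeWithContext(Map<String, String> attachments, Supplier<T> next) {
        Map<String, String> copy = new HashMap<>(attachments); // never mutate the caller's map
        copy.keySet().removeAll(RESERVED);
        CONTEXT.get().putAll(copy);
        try {
            return next.get();
        } finally {
            CONTEXT.remove(); // the next call on this thread starts from a fresh context
        }
    }

    public static void main(String[] args) {
        Map<String, String> attachments = new HashMap<>();
        attachments.put("token", "secret");
        attachments.put("traceId", "abc");

        Map<String, String> seen = invokeWithContext(attachments,
                () -> new HashMap<String, String>(CONTEXT.get()));

        System.out.println("seen during call: " + seen);          // prints seen during call: {traceId=abc}
        System.out.println("left after call: " + CONTEXT.get());  // prints left after call: {}
    }
}
```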
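TraceFilter: telnet-related calls in the QOS service.
1. trace XxxService: trace 1 invocation of any method of the service
2. trace XxxService 10: trace 10 invocations of any method of the service
3. trace XxxService xxxMethod: trace 1 invocation of the given method
4. trace XxxService xxxMethod 10: trace 10 invocations of the given method
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    long start = System.currentTimeMillis();
    Result result = invoker.invoke(invocation);
    long end = System.currentTimeMillis();
    if (tracers.size() > 0) {
        // Look for channels tracing this method first, then fall back to the whole service.
        String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
        Set<Channel> channels = tracers.get(key);
        if (channels == null || channels.isEmpty()) {
            key = invoker.getInterface().getName();
            channels = tracers.get(key);
        }
        if (CollectionUtils.isNotEmpty(channels)) {
            for (Channel channel : new ArrayList<>(channels)) {
                if (channel.isConnected()) {
                    try {
                        int max = 1;
                        Integer m = (Integer) channel.getAttribute(TRACE_MAX);
                        if (m != null) {
                            max = m;
                        }
                        int count = 0;
                        AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
                        if (c == null) {
                            c = new AtomicInteger();
                            channel.setAttribute(TRACE_COUNT, c);
                        }
                        count = c.getAndIncrement();
                        if (count < max) {
                            String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
                            channel.send("\r\n" + RpcContext.getContext().getRemoteAddress() + " -> "
                                    + invoker.getInterface().getName()
                                    + "." + invocation.getMethodName()
                                    + "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue())
                                    + "\r\nelapsed: " + (end - start) + " ms."
                                    + "\r\n\r\n" + prompt);
                        }
                        if (count >= max - 1) {
                            channels.remove(channel);
                        }
                    } catch (Throwable e) {
                        channels.remove(channel);
                        logger.warn(e.getMessage(), e);
                    }
                } else {
                    channels.remove(channel);
                }
            }
        }
    }
    return result;
}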
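The counting logic in the middle of TraceFilter ("report at most max calls, then stop listening") is easy to lose among the channel handling. Here is a minimal sketch of just that part. Listener is a hypothetical stand-in for a telnet channel, and its received list replaces the actual network send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class TraceCountSketch {

    // Toy listener standing in for a telnet channel that asked to trace `max` calls.
    static class Listener {
        final int max;
        final AtomicInteger count = new AtomicInteger();
        final List<String> received = new ArrayList<>();

        Listener(int max) {
            this.max = max;
        }
    }

    // Report one call to every listener and drop listeners that reached their quota.
    static void report(List<Listener> listeners, String message) {
        for (Listener listener : new ArrayList<>(listeners)) { // iterate a copy so removal is safe
            int seen = listener.count.getAndIncrement();
            if (seen < listener.max) {
                listener.received.add(message);
            }
            if (seen >= listener.max - 1) {
                listeners.remove(listener); // quota reached with this call
            }
        }
    }

    public static void main(String[] args) {
        Listener listener = new Listener(2);
        List<Listener> listeners = new ArrayList<>();
        listeners.add(listener);

        for (int i = 1; i <= 5; i++) {
            report(listeners, "call " + i);
        }
        System.out.println(listener.received);                          // prints [call 1, call 2]
        System.out.println("still listening: " + !listeners.isEmpty()); // prints still listening: false
    }
}
```

Because getAndIncrement returns the value before the increment, the check `seen >= max - 1` removes the listener on the same call that delivers its last message.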
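TimeoutFilter: the timeout filter. It logs a warning once a call has timed out.
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // Record the start time in the attachments so the elapsed time can be computed later.
    if (invocation.getAttachments() != null) {
        long start = System.currentTimeMillis();
        invocation.getAttachments().put(TIMEOUT_FILTER_START_TIME, String.valueOf(start));
    } else {
        if (invocation instanceof RpcInvocation) {
            RpcInvocation invc = (RpcInvocation) invocation;
            long start = System.currentTimeMillis();
            invc.setAttachment(TIMEOUT_FILTER_START_TIME, String.valueOf(start));
        }
    }
    return invoker.invoke(invocation);
}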
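The invoke method above only stamps the start time. The comparison that produces the warning happens after the call returns and is not shown in this excerpt. A minimal sketch of the two halves together follows, under stated assumptions: the key name, the checkTimeout helper, and the message text are hypothetical, and timestamps are passed in explicitly so the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

class TimeoutSketch {

    static final String START_KEY = "timeout_filter_start_time"; // hypothetical key name

    // Before the call: stamp the start time into the attachments as a string.
    static void markStart(Map<String, String> attachments, long nowMillis) {
        attachments.put(START_KEY, String.valueOf(nowMillis));
    }

    // After the call: return a warning message if the call ran longer than timeoutMillis, else null.
    static String checkTimeout(Map<String, String> attachments, long nowMillis, long timeoutMillis) {
        String start = attachments.get(START_KEY);
        if (start == null) {
            return null; // no start stamp, nothing to measure
        }
        long elapsed = nowMillis - Long.parseLong(start);
        if (elapsed > timeoutMillis) {
            return "invoke timed out: elapsed " + elapsed + " ms, timeout " + timeoutMillis + " ms";
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> attachments = new HashMap<>();
        markStart(attachments, 1_000L);
        System.out.println(checkTimeout(attachments, 1_050L, 100L)); // prints null
        System.out.println(checkTimeout(attachments, 1_500L, 100L)); // prints the warning message
    }
}
```

Note that a check like this only reports the overrun after the fact; it does not interrupt the call.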
MonitorFilter: sends data to DubboMonitor.
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
        RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called
        String remoteHost = context.getRemoteHost();
        long start = System.currentTimeMillis(); // record start timestamp
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
        try {
            Result result = invoker.invoke(invocation); // proceed invocation chain
            collect(invoker, invocation, result, remoteHost, start, false);
            return result;
        } catch (RpcException e) {
            collect(invoker, invocation, null, remoteHost, start, true);
            throw e;
        } finally {
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    } else {
        return invoker.invoke(invocation);
    }
}
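The "count up, invoke, count down in finally" bookkeeping can be sketched with a map of AtomicInteger counters. This is an assumption about how a helper like getConcurrent could be shaped, not the actual Dubbo implementation; the key format and the counted helper are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ConcurrencyCounterSketch {

    // One counter per "service.method" key.
    static final ConcurrentMap<String, AtomicInteger> ACTIVE = new ConcurrentHashMap<>();

    static AtomicInteger counter(String key) {
        return ACTIVE.computeIfAbsent(key, k -> new AtomicInteger());
    }

    static <T> T counted(String key, Supplier<T> call) {
        counter(key).incrementAndGet(); // count up before the call
        try {
            return call.get();
        } finally {
            counter(key).decrementAndGet(); // count down on success and on failure
        }
    }

    public static void main(String[] args) {
        String key = "DemoService.sayHello"; // hypothetical key
        int during = counted(key, () -> counter(key).get());
        System.out.println("active during call: " + during);            // prints active during call: 1
        System.out.println("active after call: " + counter(key).get()); // prints active after call: 0

        try {
            counted(key, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println("active after failure: " + counter(key).get()); // prints active after failure: 0
        }
    }
}
```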
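ExceptionFilter: exception catching.
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    try {
        return invoker.invoke(invocation);
    } catch (RuntimeException e) {
        // Log the unexpected exception with caller and method details, then rethrow it.
        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
        throw e;
    }
}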
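Filters are an important part of Dubbo. They are very useful for intercepting and validating requests, for processing input parameters, and for handling return values in one place.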