Rxjava+Retrofit+Okhttp 可以说是目前Android app最流行的网络请求套装了。
最近正好比较轻松就研究了下 Okhttp3,顺便写一哈加深下印象。
同步请求
用法
OkHttpClient okHttpClient = new OkHttpClient();
Request request = new Request.Builder()
.url("")
.build();
try {
Response response = okHttpClient.newCall(request).execute();
} catch (IOException e) {
e.printStackTrace();
}
同步源码
显然,从“okHttpClient.newCall(request).execute()”开始往里找。
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
RealCall 对象 调用了 execute()方法。
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
一上来 如果第二次调用该方法,直接报错。
接下来是captureCallStackTrace();
private void captureCallStackTrace() {
Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}
retryAndFollowUpInterceptor 这个拦截器 set了一个callStackTrace。
看方法名 capture:捕获,call:当前这个类所实现的接口,stack:栈,Trace:追踪。
基本可以猜出来是个捕获信息的方法。
不重要,往后看。
client.dispatcher().executed(this);
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
往 runningSyncCalls里 add 当前 任务。
顺便看下 runningSyncCalls 可以发现 Dispatcher这个类中一共有三个类似的队列。
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
ok,又到了看名字猜用途的时候了。
- readyAsyncCalls:异步+准备 队列
- runningAsyncCalls:正在运行+异步 队列
- runningSyncCalls: 正在运行+同步 队列
接着刚刚的往下走。
Response result = getResponseWithInterceptorChain();
点进去
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
- 首先,一看内容这么长,就能感觉到这个方法很重要。仔细一看,果然很重要!
- 新建一个interceptors
- 往里add开发人员新建的interceptor
- 继续且持续的往里add 一系列interceptor
- 新建RealInterceptorChain类,将之前添加好的 interceptors 作为参数,执行proceed()方法。
先忽略interceptor往下看。
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
调用另外一个proceed()。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
一上来看到方法很长,感觉就不简单。
仔细一看呢,由于之前new RealInterceptorChain类的时候传的参数大多不是null 就是0。这个方法里面,大半都直接跳过了。。。
就剩这么点儿了
...
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
return response;
第一句有没有很眼熟
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
之前RealInterceptorChain类创建的时候
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
对比就能发现,chain 和 next 两个对象唯一的区别就是第五个属性的值 index + 1 和 0。
下一句
Interceptor interceptor = interceptors.get(index);
由于index =0,interceptor 是队列中的首个,如果之前有自定义的interceptor那取出来就是自定义的,否则就是“retryAndFollowUpInterceptor”。
继续往下
Response response = interceptor.intercept(next);
我们就来看看retryAndFollowUpInterceptor的intercept()方法。
@Override public Response intercept(Chain chain) throws IOException {
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp = followUpRequest(response);
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
额,看到这种的一般都是一脸懵逼。。。
言归正传,先不看别的,第19行中
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
眼熟啊,之前刚从那个内容很长的proceed()方法过来。
没错,调用proceed()方法后,又会调用interceptors队列中下一个interceptor的intercept()方法。如果你去看看下个interceptor的话,会发现又继续调下下个了。
先不去管各个interceptor的作用是什么,每一个interceptor的intercept()中都会return经过他处理后的response。然后又开始一个一个往前返回response,最后得到返回值。
Response result = getResponseWithInterceptorChain();
最后来看看finally
try {
...
} finally {
client.dispatcher().finished(this);
}
finished()方法很简单
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
调用另一个finished
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
第三个参数是false
所以promoteCalls()并没有执行。
calls.remove(call):runningSyncCalls中删除当前的call,如果不能就报error
异步请求
用法
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("")
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
异步源码
前面的部分一样,直接来看RealCall的enqueue()方法。
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
前面的部分跟同步的一样,只看最后一句
client.dispatcher().enqueue(new AsyncCall(responseCallback));
先看Dispatcher类的enqueue()方法
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
runningAsyncCalls:之前已经遇到过了,存放正在运行的异步任务的队列。
runningCallsForHost(call):这个没见过,点进去看下
/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
从注释也看得出来,遍历runningAsyncCalls,统计和当前call同一个host的call数量。
再回过头来看enque():
首先,maxRequests和maxRequestsPerHost这两个标志量在代码中写死了:
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
如果runningAsyncCalls队列里的数量小于64,且同一个主机的请求小于5
- runningAsyncCalls队列里添加当前任务
- executorService().execute(call)
否则
- readyAsyncCalls.add(call):添加到准备执行的异步队列里
再来看executorService().execute(call);
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
executorService()是个线程池,在线程池中执行这个call。
call对象是AsyncCall类。
final class AsyncCall extends NamedRunnable
又继承了NamedRunnable;
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
嗯,找到了run()
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
嗯,设置线程名。
execute();
这个很重要,执行了抽象方法execute()。
那么子类一定重写了,再看看子类中的execute()。
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
一眼看到老熟人
Response response = getResponseWithInterceptorChain();
没错,同步请求中那个循环往里调又循环往外return response的方法,取出了最终response。
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
- 如果isCanceled,回调onFailure
- 否则,回调onResponse‘
finally {
client.dispatcher().finished(this);
}
又是老熟人,但是跟同步请求中又有点不一样
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
最后一个参数是 true
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
跟同步不一样的是
if (promoteCalls) promoteCalls();
promoteCalls为true,执行了promoteCalls()。
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
循环取出readyAsyncCalls中的call加到runningAsyncCalls队列里面去,然后往下execute()。
仔细理一下思路,之前运行的时候一直取的都是runningAsyncCalls中的call。
但是Dispatcher enqueue的时候
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
如果runningAsyncCalls 和 runningCallsForHost满了,会把call加到readyAsyncCalls队列中。
这下清楚了,promoteCalls()的作用是:异步请求中如果runningAsyncCalls中的一个call已经运行完了,会把readyAsyncCalls一个一个加到runningAsyncCalls中,继续在线程池中执行。
总结
- 同步请求概况
- runningSyncCalls.add(call):同步队列中添加当前任务
- getResponseWithInterceptorChain:循环执行,获取interceptors 处理后的response
- client.dispatcher().finished(this);:同步队列中删除当前任务
- 异步请求概况
- 如果runningAsyncCalls.size()<64 且runningCallsForHost(call)<5
- runningAsyncCalls.add(call):正在运行的异步队列中添加当前任务
- executorService().execute(call):
- Response response = getResponseWithInterceptorChain():getResponseWithInterceptorChain:循环执行,获取interceptors 处理后的response
- 看情况回调 onFailure,onResponse
- 最后client.dispatcher().finished(this):
- 正在运行的异步队列中删除当前执行完的任务
- 执行promoteCalls():循环取出readyAsyncCalls中的call加到runningAsyncCalls队列里面去,然后往下execute()。
- 否则readyAsyncCalls.add(call):准备执行的异步队列中添加当前任务
- 如果runningAsyncCalls.size()<64 且runningCallsForHost(call)<5