OKhttp3 源码分析(1)---同步异步请求流程

时间:2022-08-27 13:58:50

Rxjava+Retrofit+Okhttp 可以说是目前Android app最流行的网络请求套装了。
最近正好比较轻松就研究了下 Okhttp3,顺便写一哈加深下印象。

同步请求

用法

OkHttpClient okHttpClient = new OkHttpClient();

Request request = new Request.Builder()
.url("")
.build();

try {
Response response = okHttpClient.newCall(request).execute();
} catch (IOException e) {
e.printStackTrace();
}

同步源码

显然,从“okHttpClient.newCall(request).execute()”开始往里找。

@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}

RealCall 对象 调用了 execute()方法。

@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}

一上来 如果第二次调用该方法,直接报错。
接下来是captureCallStackTrace();

private void captureCallStackTrace() {
Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}

retryAndFollowUpInterceptor 这个拦截器 set了一个callStackTrace。
看方法名 capture:捕获,call:当前这个类所实现的接口,stack:栈,Trace:追踪。
基本可以猜出来是个捕获信息的方法。
不重要,往后看。

client.dispatcher().executed(this);
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);

}

往 runningSyncCalls里 add 当前 任务。
顺便看下 runningSyncCalls 可以发现 Dispatcher这个类中一共有三个类似的队列。

 /** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

ok,又到了看名字猜用途的时候了。

  • readyAsyncCalls:异步+准备 队列
  • runningAsyncCalls:正在运行+异步 队列
  • runningSyncCalls: 正在运行+同步 队列

接着刚刚的往下走。

Response result = getResponseWithInterceptorChain();

点进去

Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
  1. 首先,一看内容这么长,就能感觉到这个方法很重要。仔细一看,果然很重要!
  2. 新建一个interceptors
  3. 往里add开发人员新建的interceptor
  4. 继续且持续的往里add 一系列interceptor
  5. 新建RealInterceptorChain类,将之前添加好的 interceptors 作为参数,执行proceed()方法。

先忽略interceptor往下看。

@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}

调用另外一个proceed()。

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();

calls++;

// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}

// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}

return response;
}

一上来看到方法很长,感觉就不简单。
仔细一看呢,由于之前new RealInterceptorChain类的时候传的参数大多不是null 就是0。这个方法里面,大半都直接跳过了。。。
就剩这么点儿了

...
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
return response;

第一句有没有很眼熟

RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);

之前RealInterceptorChain类创建的时候

 Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);

对比就能发现,chain 和 next 两个对象唯一的区别就是第五个属性的值 index + 1 和 0。
下一句

Interceptor interceptor = interceptors.get(index);

由于index =0,interceptor 是队列中的首个,如果之前有自定义的interceptor那取出来就是自定义的,否则就是“retryAndFollowUpInterceptor”。
继续往下

 Response response = interceptor.intercept(next);

我们就来看看retryAndFollowUpInterceptor的intercept()方法。

@Override public Response intercept(Chain chain) throws IOException {
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();

streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);

int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}

Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}

// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}

Request followUp = followUpRequest(response);

if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}

closeQuietly(response.body());

if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}

if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}

request = followUp;
priorResponse = response;
}
}

额,看到这种的一般都是一脸懵逼。。。
言归正传,先不看别的,第19行中

response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);

眼熟啊,之前刚从那个内容很长的proceed()方法过来。
没错,调用proceed()方法后,又会调用interceptors队列中下一个interceptor的intercept()方法。如果你去看看下个interceptor的话,会发现又继续调下下个了。

先不去管各个interceptor的作用是什么,每一个interceptor的intercept()中都会return经过他处理后的response。然后又开始一个一个往前返回response,最后得到返回值。

Response result = getResponseWithInterceptorChain();

最后来看看finally

try {
...
} finally {
client.dispatcher().finished(this);
}

finished()方法很简单

void finished(RealCall call) {
finished(runningSyncCalls, call, false);

}

调用另一个finished

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}

if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}

第三个参数是false
所以promoteCalls()并没有执行。
calls.remove(call):runningSyncCalls中删除当前的call,如果不能就报error

异步请求

用法

OkHttpClient client = new OkHttpClient();

Request request = new Request.Builder()
.url("")
.build();

client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {

}

@Override
public void onResponse(Call call, Response response) throws IOException {

}
});

异步源码

前面的部分一样,直接来看RealCall的enqueue()方法。

@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

前面的部分跟同步的一样,只看最后一句

client.dispatcher().enqueue(new AsyncCall(responseCallback));

先看Dispatcher类的enqueue()方法

synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);

executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}

runningAsyncCalls:之前已经遇到过了,存放正在运行的异步任务的队列。
runningCallsForHost(call):这个没见过,点进去看下

/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}

从注释也看得出来,遍历runningAsyncCalls,统计和当前call同一个host的call数量。
再回过头来看enque():

首先,maxRequests和maxRequestsPerHost这两个标志量在代码中写死了:

private int maxRequests = 64;
private int maxRequestsPerHost = 5;

如果runningAsyncCalls队列里的数量小于64,且同一个主机的请求小于5

  • runningAsyncCalls队列里添加当前任务
  • executorService().execute(call)

否则

  • readyAsyncCalls.add(call):添加到准备执行的异步队列里

再来看executorService().execute(call);

public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

executorService()是个线程池,在线程池中执行这个call。
call对象是AsyncCall类。

final class AsyncCall extends NamedRunnable 

又继承了NamedRunnable;

public abstract class NamedRunnable implements Runnable {
protected final String name;

public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}

@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}

protected abstract void execute();
}

嗯,找到了run()

 String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);

嗯,设置线程名。

 execute();

这个很重要,执行了抽象方法execute()。
那么子类一定重写了,再看看子类中的execute()。

@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}

一眼看到老熟人

Response response = getResponseWithInterceptorChain();

没错,同步请求中那个循环往里调又循环往外return response的方法,取出了最终response。

if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
  • 如果isCanceled,回调onFailure
  • 否则,回调onResponse‘
finally {
client.dispatcher().finished(this);
}

又是老熟人,但是跟同步请求中又有点不一样

void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);

}

最后一个参数是 true

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}

if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}

跟同步不一样的是

if (promoteCalls) promoteCalls();

promoteCalls为true,执行了promoteCalls()。

private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();

if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}

if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

循环取出readyAsyncCalls中的call加到runningAsyncCalls队列里面去,然后往下execute()。

仔细理一下思路,之前运行的时候一直取的都是runningAsyncCalls中的call。
但是Dispatcher enqueue的时候

synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);

executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}

如果runningAsyncCalls 和 runningCallsForHost满了,会把call加到readyAsyncCalls队列中。
这下清楚了,promoteCalls()的作用是:异步请求中如果runningAsyncCalls中的一个call已经运行完了,会把readyAsyncCalls一个一个加到runningAsyncCalls中,继续在线程池中执行。


总结

  • 同步请求概况
    • runningSyncCalls.add(call):同步队列中添加当前任务
    • getResponseWithInterceptorChain:循环执行,获取interceptors 处理后的response
    • client.dispatcher().finished(this);:同步队列中删除当前任务
  • 异步请求概况
    • 如果runningAsyncCalls.size()<64 且runningCallsForHost(call)<5
      • runningAsyncCalls.add(call):正在运行的异步队列中添加当前任务
      • executorService().execute(call):
        • Response response = getResponseWithInterceptorChain():getResponseWithInterceptorChain:循环执行,获取interceptors 处理后的response
        • 看情况回调 onFailure,onResponse
        • 最后client.dispatcher().finished(this):
          • 正在运行的异步队列中删除当前执行完的任务
          • 执行promoteCalls():循环取出readyAsyncCalls中的call加到runningAsyncCalls队列里面去,然后往下execute()。
    • 否则readyAsyncCalls.add(call):准备执行的异步队列中添加当前任务