Thread Reuse: The Basic Principle of Thread Pools

Time: 2021-05-16 18:36:06
For simplicity, the thread pool here contains only one thread:
package com.xs.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {

    // Tasks wait here until the single worker thread picks them up.
    private final BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>();

    private final Thread thread;

    public ThreadPool() {
        thread = ThreadFactory.newThread(blockingQueue);
        thread.start();
    }

    public void execute(Runnable runnable) {
        try {
            blockingQueue.put(runnable);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPool pool = new ThreadPool();
        Runnable runnable = new Runnable() {

            @Override
            public void run() {
                System.out.println("Hello world!");
            }
        };
        for (int i = 0; i < 10; i++) {
            pool.execute(runnable);
            Thread.sleep(1000);
        }
        // Note: the worker is a non-daemon thread that loops forever,
        // so the JVM keeps running after main returns.
    }
}

ThreadFactory:

package com.xs.concurrent;

import java.util.concurrent.BlockingQueue;

public class ThreadFactory {

    public static Thread newThread(final BlockingQueue<Runnable> tasks) {
        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {
                for (;;) {
                    try {
                        Runnable task = tasks.take(); // blocks until a task is available
                        task.run();
                    } catch (InterruptedException e) {
                        // Interrupted while waiting: restore the flag and let the worker exit.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        return t;
    }

    private ThreadFactory() {}
}

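Thread.start() cannot be called more than once, so to reuse a Thread it must not be allowed to terminate after running a single task. Its run() method therefore has to loop: it blocks on the task queue, takes the next task, executes it, and repeats.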
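The restriction on Thread.start() can be checked directly. The following is a minimal sketch; the class name RestartDemo and the method secondStartFails are made up for this illustration.

```java
public class RestartDemo {

    // Returns true if calling start() a second time on the same Thread is rejected.
    static boolean secondStartFails() {
        Thread t = new Thread(() -> { /* a task that finishes immediately */ });
        t.start();
        try {
            t.join(); // wait until the thread has terminated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            t.start(); // a Thread may be started at most once
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start rejected: " + secondStartFails());
    }
}
```

Because a terminated Thread cannot be restarted, the only way to reuse one is to keep its run() method alive, which is what the loop in ThreadFactory.newThread() does.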

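Now let's look at the JDK source code. (The snippets below match the JDK 6 implementation of ThreadPoolExecutor; JDK 7 and later restructured the class, but the underlying idea is the same.)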

The ThreadPoolExecutor.execute() method:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
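The order of the branches in execute() (start a core thread, then queue, then start an extra thread, then reject) can be observed from the outside through the public ThreadPoolExecutor API. The sketch below is only an illustration: the class name ExecuteOrderDemo and the sizes (1 core thread, at most 2 threads, a queue of capacity 1) are assumptions chosen to hit each branch once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {

    // Submits four blocking tasks and records what the pool did with each one.
    static List<String> observe() {
        List<String> log = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        // Each task blocks until released, so no thread becomes free during the demo.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task " + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : observe()) {
            System.out.println(line);
        }
    }
}
```

With these sizes the first task starts the core thread, the second is queued, the third starts a second thread because the queue is full, and the fourth is rejected by the default handler.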
addIfUnderCorePoolSize():
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

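This method creates and starts a new thread, provided the pool is still running and has fewer than corePoolSize threads.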

addThread():
private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);
    if (t != null) {
        w.thread = t;
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}
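This method hands the task to a Worker, and it is the Worker, not the task itself, that is passed to the thread factory. Worker is an inner class of ThreadPoolExecutor.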
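That the thread is built around a wrapper rather than around the submitted task can be seen with a recording thread factory. This is a small sketch using the public java.util.concurrent API; the class name WorkerWrapperDemo and the method factoryReceivesWrapper are hypothetical names for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerWrapperDemo {

    // Returns true if the Runnable given to the ThreadFactory is not the submitted task.
    static boolean factoryReceivesWrapper() {
        AtomicReference<Runnable> seen = new AtomicReference<>();
        // Remember whatever Runnable the pool asks us to wrap in a Thread.
        ThreadFactory recording = r -> {
            seen.set(r);
            return new Thread(r);
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), recording);
        Runnable task = () -> { };
        pool.execute(task); // the first task makes the pool create its worker thread
        pool.shutdown();
        return seen.get() != null && seen.get() != task;
    }

    public static void main(String[] args) {
        System.out.println("factory received a wrapper: " + factoryReceivesWrapper());
    }
}
```

Since the thread runs the wrapper's run() method, the wrapper is free to execute the first task and then go on to fetch further tasks on the same thread.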

Worker.run():

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task); // execute the task
            task = null;
        }
    } finally {
        workerDone(this);
    }
}
The getTask() method belongs to the ThreadPoolExecutor class:
Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}
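This method loops until it obtains a task from the work queue. It returns null only when the worker is allowed to exit, which in turn ends the while loop in Worker.run().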
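The combination of Worker.run() and a timed poll() can be condensed into a few lines. The following is a simplified sketch, not the JDK code: the class KeepAliveWorkerDemo, the method startWorker, and the 100 ms keep-alive value are assumptions for illustration, and run state, locking, and pool-size bookkeeping are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class KeepAliveWorkerDemo {

    // Hypothetical simplified worker: run firstTask, then keep polling the queue,
    // and exit once no task arrives within keepAliveMillis.
    static Thread startWorker(Runnable firstTask, BlockingQueue<Runnable> queue,
                              long keepAliveMillis) {
        Thread t = new Thread(() -> {
            Runnable task = firstTask;
            try {
                while (task != null
                        || (task = queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS)) != null) {
                    task.run();
                    task = null;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treat interruption as a request to stop
            }
        });
        t.start();
        return t;
    }

    // Runs one first task and two queued tasks on a single worker thread.
    static List<String> runDemo() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> log.add("queued-1"));
        queue.add(() -> log.add("queued-2"));
        Thread worker = startWorker(() -> log.add("first"), queue, 100);
        try {
            worker.join(); // returns once the worker has been idle for the keep-alive time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The worker runs its first task, drains the two queued tasks on the same thread, and terminates after the queue has stayed empty for the keep-alive period. Replacing poll() with take() would instead keep the thread alive indefinitely, as in the single-thread pool at the start of this article.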