package com.xs.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadPool {
private final BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>();
private final Thread thread;
public ThreadPool() {
thread = ThreadFactory.newThread(blockingQueue);
thread.start();
}
public void execute(Runnable runnable){
try {
blockingQueue.put(runnable);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws Exception {
ThreadPool pool = new ThreadPool();
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("Hello world!");
}
};
for (int i = 0; i < 10; i++) {
pool.execute(runnable);
Thread.sleep(1000);
}
}
}
ThreadFactory:
package com.xs.concurrent;
import java.util.concurrent.BlockingQueue;
public class ThreadFactory {
public static Thread newThread(final BlockingQueue<Runnable> tasks){
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (;;) {
try {
Runnable task = tasks.take(); // 阻塞方法,直到取到任务为止
task.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
});
return t;
}
private ThreadFactory(){}
}
Thread.start()不能重复调用,所以要重用Thread,就不能让Thread执行完一个任务后终止,因此就必须阻塞Thread.run()方法,让该方法不停地从任务队列中获取任务并执行。
下面看看JDK的源码。
ThreadPoolExecutor.execute()方法:
public void execute(Runnable command) {addIfUnderCorePoolSize():
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
这个方法启动了一个线程。
addThread():private Thread addThread(Runnable firstTask) {这个方法将任务交给了Worker。Worker是ThreadPoolExecutor的内部类。
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
Worker.run():
public void run() {getTask()方法属于ThreadPoolExecutor类:
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task); // 执行任务
task = null;
}
} finally {
workerDone(this);
}
}
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } }该方法不停地从工作队列中获取任务。