
Worker是“工人”的意思,worker thread pattern中,工人线程(worker thread)会一次抓一件工作来处理,当没有工作可做时,工人线程会停下来等待心得工作过来。
Worker Thread也叫做background thread,另外,也有人把视点放在管理工人线程的地方,称之为Thread Pool。
最近闲时在看《Java多线程设计模式》,很不错,语言浅显易懂,而且编排也好,很有启发性,现在挑其中一章来写写心得
worker thread是我们平时用的很多的一种多线程模式,只不过我们常常不把它当模式罢了。基本内容是:有一个流水线(channel),流水线一端有客户线程client,另一端有工人线程worker,客户不断把新的任务(request)放入流水线,工人在另一头获得任务,并执行,客户和工人的数量可多可少,就这么简单
这个所谓的pattern初看好像似曾相识,就是一个thread pool嘛, 按通常的做法,request可以实现Runnable接口,把要做的事情放在run方法中,由worker去执行,具体实现时还要注意同步的问题
不过,由此我们可以想到Swing的工作方式。Swing是事件驱动的,它有一个the event-dispatch queue,这里之所以用the,是因为这个队列是唯一的,就和上面说的流水线一样。Swing的各个组件相当于客户,不断把各种事件(键盘或者鼠标事件,等等)塞入event queue中,queue有个专门的线程负责把这些事件送给相应的listener,就实现了最基本的事件驱动模型。如果不采用这种模型,即事件由专门线程处理的话,界面的相应速度就很差了
如果有用过Java做游戏的话,应该都接触过javax.swing.SwingUtilities这个类,里面有个invokeAndWait方法,就是用来让其他线程操作Swing组件的。为什么不能直接操作呢,如上所述,event dispatch queue是唯一的,因此Swing组件在设计时就没有过多考虑多线程的问题,反正由event dispatch queue统一操作,这样可以提高速度(尽管Swing本来就很慢),但是当其他线程要操作Swing组件时,就可能有潜在的不稳定因素,所以才有了invokeAndWait方法,调用此方法的线程会wait直到所需操作已经完成
还没说完,再仔细想想,我们在程序里也常常对那些组件直接操作而非用什么invokeAndWait,这里又有一些细节值得注意。首先是在你调用组件的setVisible等方法之前,你是可以随便改组件的,调用完setVisible之后,只有少数方法,比如repaint,addListener等等。最后,根据jdk文档所言,这个方法是用来给应用程序线程改变GUI外观的。如果非要直接改,不一定会出错,多线程本来就是比较难说的,呵呵,我也没试过,改天可以尝试一下。
每 個執行緒(线程)處理一個請求,每次執行緒執行完請求後,再次嘗試取得下一個請求並執行,這是Worker Thread的基本概念,對於一些需要冗長計算或要在背景執行的請求,可以採用Worker Thread。
在 Thread-Per-Message 模式中,其實已經有點Worker Thread的概念,在Service物件接收到資料後,以匿名方式建立執行緒來處理資料,那個建立的執行緒就是Worker Thread,只不用過就丟了。
Worker Thread可以應用在不同的場合,例如在 Guarded Suspension 模式 的範例,是使用一個執行緒來處理請求佇列中的請求,如果請求不斷來到,且請求中可能有冗長的處理,則請求佇列中的請求可能會來不及消化。
您可以為請求佇列中的每個請求配給一個執行緒來處理,不過實際上,只要建立足夠多的執行緒即可,
Worker Thread模式在Request的管理上像是 Producer Consumer 模式,在Request的行为上像是 Command 模式。
Producer Consumer模式专注于Product的生产与消费,至于Product被消费时是作何处理,则不在它的讨论范围之中。
如果您的Product是一个Request,消费者取得Request之后,执行Request中指定的请求方法,也就是使用Command模式,并且您的Request缓冲区还管理了Consumer,就有Worker Thread模式的意思了。
在Sequence Diagram上,可以看出Worker Thread同时展现了Producer Consumer模式与Command模式:
public class Request {
private final String name; // 委托者
private final int number; // 请求编号
private static final Random random = new Random();
public Request(String name, int number) {
this.name = name;
this.number = number;
}
public void execute() {
System.out.println(Thread.currentThread().getName() + " executes " + this);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
public String toString() {
return "[ Request from " + name + " No." + number + " ]";
}
}
ClientThread:
public class ClientThread extends Thread {
private final Channel channel;
private static final Random random = new Random();
public ClientThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
public void run() {
try {
for (int i = 0; true; i++) { //可以修改为合适的
Request request = new Request(getName(), i);
channel.putRequest(request);
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
}
}
}
WorkerThread:
public class WorkerThread extends Thread {
private final Channel channel;
public WorkerThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
public void run() {
while (true) {
Request request = channel.takeRequest();
request.execute();
}
}
}
Channel: 可以LinkedList
public class Channel {
private static final int MAX_REQUEST = 100;
private final Request[] requestQueue;
private int tail; // 下一个putRequest的地方
private int head; // 下一个takeRequest的地方
private int count; // Request的数量 private final WorkerThread[] threadPool; public Channel(int threads) {
this.requestQueue = new Request[MAX_REQUEST];
this.head = 0;
this.tail = 0;
this.count = 0; threadPool = new WorkerThread[threads];
for (int i = 0; i < threadPool.length; i++) {
threadPool[i] = new WorkerThread("Worker-" + i, this);
}
}
public void startWorkers() {
for (int i = 0; i < threadPool.length; i++) {
threadPool[i].start();
}
}
public synchronized void putRequest(Request request) {
while (count >= requestQueue.length) {
try {
wait();
} catch (InterruptedException e) {
}
}
requestQueue[tail] = request;
tail = (tail + 1) % requestQueue.length;
count++;
notifyAll();
}
public synchronized Request takeRequest() {
while (count <= 0) {
try {
wait();
} catch (InterruptedException e) {
}
}
Request request = requestQueue[head];
head = (head + 1) % requestQueue.length;
count--;
notifyAll();
return request;
}
}
测试:
public class Main {
public static void main(String[] args) {
Channel channel = new Channel(5); // 工人线程的數量
channel.startWorkers();
new ClientThread("Alice", channel).start();
new ClientThread("Bobby", channel).start();
new ClientThread("Chris", channel).start();
}
}
参考了:http://www.riabook.cn/doc/designpattern/WorkerThread.htm
http://openhome.cc/Gossip/DesignPattern/WorkerThread.htm