一、Master-Worker设计模式
Master-Worker模式是常用的并行设计模式。它的核心思想是,系统有两个进程协议工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程将子任务处理完后,将结果返回给Master进程,由Master进行归纳和汇总,从而得到系统结果。
Master-Worker模式的好处是,它能将大任务分解成若干个小任务,并发执行,从而提高系统性能。而对于系统请求者Client来说,任务一旦提交,Master进程就会立刻分配任务并立即返回,并不会等系统处理完全部任务再返回,其处理过程是异步的。
二、Master-Worker设计模式代码实现
1、创建Task任务对象
package com.ietree.basicskill.mutilthread.designpattern.masterworker; /**
* Created by Root on 5/12/2017.
*/
public class Task { private int id; private String name; private int price; public int getId() {
return id;
} public void setId(int id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public int getPrice() {
return price;
} public void setPrice(int price) {
this.price = price;
}
}
2、实现Worker对象
package com.ietree.basicskill.mutilthread.designpattern.masterworker; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; /**
* Created by Root on 5/12/2017.
*/
public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workQueue;
private ConcurrentHashMap<String, Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue = workQueue;
} public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
} @Override
public void run() {
while (true) {
Task input = this.workQueue.poll();
if (input == null) {
break;
}
// 真正的去做业务处理
//Object output = handle(input);
// 改造
Object output = MyWorker.handle(input);
// 返回处理结果集
this.resultMap.put(Integer.toString(input.getId()), output);
}
} // private Object handle(Task input) {
// Object output = null;
// try {
// // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
// Thread.sleep(500);
// output = input.getPrice();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return output;
// } // 优化,考虑让继承类去自己实现具体的业务处理
public static Object handle(Task input) {
return null;
} }
3、为了使程序更灵活,将具体的业务执行逻辑抽离,在具体的Worker对象去实现,如这里的MyWorker对象
package com.ietree.basicskill.mutilthread.designpattern.masterworker; /**
* Created by Root on 5/13/2017.
*/
public class MyWorker extends Worker { public static Object handle(Task input) {
Object output = null;
try {
// 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
} }
4、Master类
package com.ietree.basicskill.mutilthread.designpattern.masterworker; import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; /**
* Created by Root on 5/12/2017.
*/
public class Master { // 1、使用一个ConcurrentLinkedQueue集合来装载所有需要执行的任务
private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); // 2、使用HashMap来装载所有的worker对象
private HashMap<String, Thread> workers = new HashMap<String, Thread>(); // 3、使用一个容器承装每一个worker并发执行任务的结果集
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); // 4、构造方法
public Master(Worker worker, int workerCount) {
// 每一个worker对象都需要有Master的引用,workQueue用于任务的领取,resultMap用于任务的提交
worker.setWorkerQueue(this.workQueue);
worker.setResultMap(this.resultMap); for (int i = 0; i < workerCount; i++) {
workers.put("子节点" + Integer.toString(i), new Thread(worker));
}
} // 5、提交方法
public void submit(Task task) {
this.workQueue.add(task);
} // 6、需要有一个执行的方法(启动应用程序,让所有的worker工作)
public void execute() {
for (Map.Entry<String, Thread> me : workers.entrySet()) {
me.getValue().start();
}
} // 7、判断线程是否执行完毕
public boolean isComplete() {
for (Map.Entry<String, Thread> me : workers.entrySet()) {
// 判断所有的线程状态是否属于已停止状态
if (me.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
} // 8、返回结果集数据
public int getResult() {
int ret = 0;
for (Map.Entry<String, Object> me : resultMap.entrySet()) {
// 汇总逻辑
ret += (Integer) me.getValue();
}
return ret;
} }
5、测试,具体调用实现
package com.ietree.basicskill.mutilthread.designpattern.masterworker; import java.util.Random; /**
* Created by Root on 5/13/2017.
*/
public class MasterWorkerTest { public static void main(String[] args) { // Master master = new Master(new Worker(), 10);
// 改造
// Master master = new Master(new MyWorker(), 10);
// 改造(获取当前机器可用线程数)
System.out.println("我的机器可用Processors数量:" + Runtime.getRuntime().availableProcessors());
Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors()); Random r = new Random();
for (int i = 1; i <= 10; i++) {
Task t = new Task();
t.setId(i);
t.setName("任务" + i);
t.setPrice(r.nextInt(1000));
master.submit(t);
}
master.execute(); long start = System.currentTimeMillis(); while (true) {
if (master.isComplete()) {
long end = System.currentTimeMillis() - start;
int ret = master.getResult();
System.out.println("最终的结果:" + ret + ",执行耗时:" + end);
break;
}
}
} }
程序输出:
我的机器可用Processors数量:20
最终的结果:4473,执行耗时:500
从上面的运行结果来看,程序最终执行时间几乎就等于一个线程单独运行的时间,在此注意的是,同时执行的线程数是根据你执行此程序的机器配置决定的。