14.多线程设计模式 - Master-Worker模式

时间:2022-07-16 16:22:11

多线程设计模式 - Master-Worker模式

 并发设计模式属于设计优化的一部分,它对于一些常用的多线程结构的总结和抽象。与串行相比并行程序结构通常较为复杂,因此合理的使用并行模式在多线程并发中更具有意义。


1. Master-Worker模式

  • - Master-Worker模式是常用的并行模式。它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳和总结。
  • - 其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。

示例:下
说明:看类注释即可明了 不明白可以去屎了

  1 //Task.java  
  2   public class Task {
  3 
  4       private int id;
  5       private String name;
  6       private int price;
  7       public int getId() {
  8           return id;
  9       }
 10       public void setId(int id) {
 11           this.id = id;
 12       }
 13       public String getName() {
 14           return name;
 15       }
 16       public void setName(String name) {
 17           this.name = name;
 18       }
 19       public int getPrice() {
 20           return price;
 21       }
 22       public void setPrice(int price) {
 23           this.price = price;
 24       }
 25   }
 26   //Worker.java   
 27   public class Worker implements Runnable{
 28 
 29       private ConcurrentLinkedQueue<Task> workQueue;
 30       private ConcurrentHashMap<String,Object> resultMap;
 31 
 32       @Override
 33       public void run() {
 34           while(true){
 35               Task input = this.workQueue.poll();
 36               if(input==null)break;
 37               //真正的去做业务处理
 38               Object output = MyWorker.handle(input);
 39               this.resultMap.put(Integer.toString(input.getId()), output);
 40           }
 41       }
 42 
 43       public static Object handle(Task input){
 44           return null;
 45       }
 46 
 47       public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
 48           this.workQueue=workQueue;
 49       }
 50 
 51       public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
 52           this.resultMap=resultMap;
 53 
 54       }
 55   }
 56 
 57   //MyWorker.java继承Worker.java,重写handle方法
 58   public class MyWorker extends Worker{
 59 
 60       public static Object handle(Task input) {
 61           Object output = null;
 62           try {
 63               //处理task的耗时,可能是数据的加工,也可能是操作数据库
 64               Thread.sleep(500);
 65               output = input.getPrice();
 66           } catch (InterruptedException e) {
 67               e.printStackTrace();
 68           }
 69           return output;
 70       }
 71 
 72   }
 73   //Master.java *
 74   public class Master {
 75 
 76       //1 应该有一个承装任务的集合
 77       private ConcurrentLinkedQueue<Task> workQueue = new  ConcurrentLinkedQueue<Task>();
 78       //2 使用普通的HashMap去承装所有的worker对象
 79       private HashMap<String,Thread> workers = new HashMap<String,Thread>();
 80       //3 使用一个容器承装每一个worker并发执行的结果集
 81       private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>();
 82       //4 构造方法
 83       public Master(Worker worker,int workerCount){
 84           //每一个worker对象都需要有master的应用workQueue用于任务的领取,resultMap用于任务的提交
 85           worker.setWorkerQueue(this.workQueue);
 86           worker.setResultMap(this.resultMap);
 87 
 88           for(int i=0;i<workerCount;i++){
 89               //key表示每一个worker的名字,value表示线程执行对象
 90               workers.put("子节点"+Integer.toString(i), new Thread(worker));
 91           }
 92       }
 93       //5 提交方法
 94       public void submit(Task task){
 95           this.workQueue.add(task);
 96       }
 97       //6 需要有一个执行的方法(启动应用程序让所有的worker工作)
 98       public void execute(){
 99           for(Map.Entry<String,Thread> me:workers.entrySet()){
100               me.getValue().start();
101           }
102       }
103       //8 判断线程是否执行完毕
104       public boolean isComplete() {
105           for(Map.Entry<String, Thread> me:workers.entrySet()){
106               if(me.getValue().getState()!=Thread.State.TERMINATED){
107                   return false;
108               }
109           }
110           return true;
111       }
112       //9 返回结果集数据
113       public int getResult() {
114           int ret = 0;
115           for(Map.Entry<String,Object> me:resultMap.entrySet()){
116               //汇总逻辑
117               ret+=(Integer)me.getValue();
118           }
119           return ret;
120       }
121   }
122   //主函数
123   public class Main {
124 
125       public static void main(String[] args) {
126           System.out.println("我的机器可用processor的数量:"+Runtime.getRuntime().availableProcessors());
127           Master master = new Master(new MyWorker(),10);
128 
129           Random r = new Random();
130           for(int i=0;i<100;i++){
131               Task task = new Task();
132               task.setId(i);
133               task.setName("任务"+i);
134               task.setPrice(r.nextInt(1000));
135               master.submit(task);
136           }
137           master.execute();
138 
139           long start = System.currentTimeMillis();
140           while(true){
141               if(master.isComplete()){
142                   long end = System.currentTimeMillis() -start;
143                   int ret = master.getResult();
144                   System.out.println("最终结果:"+ret+",执行耗时:"+end+"毫秒");
145                   break;
146               }
147           }
148       }
149 
150   }