Master-Worker模式是常用的并发计算模式,它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务。Worker负责处理子任务。当各个Worker进程处理完成后,会将结果返回给Master,有Master做归纳和总结。其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。
下面我自己将会介绍一下代码怎么写,可能表达能力有点差。。
首先是Master。首先得有一个队列是存放客户端发过来的任务。由于不用考虑多线程并发,只要高性能即可,我们这里就用到ConcurrentLinkedQueue。然后我们得有一个存放worker的东西,由于也是不用考虑并发什么的,我们用个HashMap存放即可,HashMap<String,Thread>,是这样的,这里的String这是想着给每个worker一个名字,而Thread是因为这里的Worker都将是用线程来执行,所以Worker也需要实现Runnable接口。最后,Master还需要一个容器来存放所有的结果。考虑到是多个worker来处理任务,会出现多线程并发问题,所以我们这里使用的是ConcurrentHashMap来存放每个Worker执行任务的结果。
然后是Worker:上面也说到了,首先Worker需要实现Runnable接口。然后最重要的是Worker需要有上面Master存放任务队列和结果集引用。第一因为是Worker需要从队列中获取任务来执行,第二是因为Worker需要将执行完任务的结果存放到Master的结果集中。
最后是任务Task:任务随便写就好了,看你是做什么的,现在我们假设是取任务里面的price出来就好。
现在先看一下Task的代码:
public class Task { private int id; private String name; private int price; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } }Master的代码:
public class Master { //使用ConcurrentLinkedQueue承载全部任务(高性能,无阻塞队列) private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<Task>(); //使用HashMap承载所有Worker对象(worker都需要实现Runnable接口) private HashMap<String,Thread> workers = new HashMap<String,Thread>(); //使用ConcurrentHashMap承载每个worker执行任务的结果集 private ConcurrentHashMap<String,Object> results = new ConcurrentHashMap<String,Object>(); //构造方法 public Master(Worker worker,int workerCount){ //每个Worker都需要有workQueue和results的引用 worker.setTaskQueue(taskQueue); worker.setResults(results); for(int i=0;i<workerCount;i++){ //key表示worker的名字,value代表线程执行的worker对象 workers.put("work"+Integer.toString(i) , new Thread(worker)); } } //提交方法 public void submit(Task task){ this.taskQueue.add(task); } //需要有一个执行的方法(让队列中的所有woker工作) public void excute(){ for(Map.Entry<String,Thread> me : workers.entrySet()){ me.getValue().start(); //woker执行线程 } } //返回最终的业务处理结果(这里只是简单地将result加起来) public int getResult(){ int result = 0; for(Map.Entry<String, Object> me : results.entrySet()){ result += (int)me.getValue(); } return result; } //判断master的所有任务执行完成没有 public boolean isCompelete() { //判断规则(只要有一个worker在执行,都返回false) for(Map.Entry<String, Thread> me : workers.entrySet()){ if(me.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true; } }
Worker的代码:
public class Worker implements Runnable{ //因为Worker需要拿Master中的任务,所以需要有Master的taskQueue的引用 private ConcurrentLinkedQueue<Task> taskQueue; //因为Worker需要将执行完的结果给Master,所以需要Master的results引用 private ConcurrentHashMap<String,Object> results; @Override public void run() { while(true){ Task task = taskQueue.poll(); //将任务从队列中拿出来 if(task == null){ //如果队列里面没任务了就跳出循环 break; } Object result = handle(task); //业务处理 this.results.put(Integer.toString(task.getId()), result); //将业务处理的结果放入结果map里面 } } //将业务抽出来作为一个方法 public static Object handle(Task task){ Object result = null; try { Thread.sleep(500); //假设是处理业务的时长 result = task.getPrice(); //业务只是简单地将price拿出来 } catch (InterruptedException e) { e.printStackTrace(); } return result; } public ConcurrentLinkedQueue<Task> getTaskQueue() { return taskQueue; } public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) { this.taskQueue = taskQueue; } public ConcurrentHashMap<String, Object> getResults() { return results; } public void setResults(ConcurrentHashMap<String, Object> results) { this.results = results; } }客户端代码:
public class Main { public static void main(String[] args) { int maxProcessCount = Runtime.getRuntime().availableProcessors(); //当前机器的可用处理器数量(线程数) System.out.println(maxProcessCount); Master master = new Master(new Worker(), maxProcessCount); for(int i=0;i<100;i++){ Task task = new Task(); task.setId(i); task.setName("任务"+i); task.setPrice(i); master.submit(task); } master.excute(); //执行master long start = System.currentTimeMillis(); while(true){ if(master.isCompelete()){ //如果master中的worker都执行完了,就获取最后的结果 long end = System.currentTimeMillis(); long time = end - start; int result = master.getResult(); System.out.println("花费时间:"+time+" 最后结果:"+result); break; } } } }