并发编程的MasterWorker模式

时间:2022-01-10 16:27:18

Master-Worker模式是常用的并发计算模式,它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务。Worker负责处理子任务。当各个Worker进程处理完成后,会将结果返回给Master,有Master做归纳和总结。其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。

并发编程的MasterWorker模式

下面我自己将会介绍一下代码怎么写,可能表达能力有点差。。

首先是Master。首先得有一个队列是存放客户端发过来的任务。由于不用考虑多线程并发,只要高性能即可,我们这里就用到ConcurrentLinkedQueue。然后我们得有一个存放worker的东西,由于也是不用考虑并发什么的,我们用个HashMap存放即可,HashMap<String,Thread>,是这样的,这里的String这是想着给每个worker一个名字,而Thread是因为这里的Worker都将是用线程来执行,所以Worker也需要实现Runnable接口。最后,Master还需要一个容器来存放所有的结果。考虑到是多个worker来处理任务,会出现多线程并发问题,所以我们这里使用的是ConcurrentHashMap来存放每个Worker执行任务的结果。

然后是Worker:上面也说到了,首先Worker需要实现Runnable接口。然后最重要的是Worker需要有上面Master存放任务队列和结果集引用。第一因为是Worker需要从队列中获取任务来执行,第二是因为Worker需要将执行完任务的结果存放到Master的结果集中。

最后是任务Task:任务随便写就好了,看你是做什么的,现在我们假设是取任务里面的price出来就好。

现在先看一下Task的代码:
public class Task {
	private int id;
	private String name;
	private int price;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getPrice() {
		return price;
	}
	public void setPrice(int price) {
		this.price = price;
	}
}
Master的代码:
public class Master {
	//使用ConcurrentLinkedQueue承载全部任务(高性能,无阻塞队列)
	private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<Task>();
	//使用HashMap承载所有Worker对象(worker都需要实现Runnable接口)
	private HashMap<String,Thread> workers = new HashMap<String,Thread>();
	//使用ConcurrentHashMap承载每个worker执行任务的结果集
	private ConcurrentHashMap<String,Object> results = new ConcurrentHashMap<String,Object>();
	
	//构造方法
	public Master(Worker worker,int workerCount){
		//每个Worker都需要有workQueue和results的引用
		worker.setTaskQueue(taskQueue);
		worker.setResults(results);
		
		for(int i=0;i<workerCount;i++){
			//key表示worker的名字,value代表线程执行的worker对象
			workers.put("work"+Integer.toString(i) , new Thread(worker));
		}
	}
	
	//提交方法
	public void submit(Task task){
		this.taskQueue.add(task);
	}
	//需要有一个执行的方法(让队列中的所有woker工作)
	public void excute(){
		for(Map.Entry<String,Thread> me : workers.entrySet()){
			me.getValue().start();  //woker执行线程
		}
	}
	
	//返回最终的业务处理结果(这里只是简单地将result加起来)
	public int getResult(){
		int result = 0;
		for(Map.Entry<String, Object> me : results.entrySet()){
			result += (int)me.getValue();
		}
		return result;
	}
	//判断master的所有任务执行完成没有
	public boolean isCompelete() {
		//判断规则(只要有一个worker在执行,都返回false)
		for(Map.Entry<String, Thread> me : workers.entrySet()){
			if(me.getValue().getState()!=Thread.State.TERMINATED){
				return false;
			}
		}
		return true;
	}
}

Worker的代码:

public class Worker implements Runnable{
	
	//因为Worker需要拿Master中的任务,所以需要有Master的taskQueue的引用
	private ConcurrentLinkedQueue<Task> taskQueue;
	//因为Worker需要将执行完的结果给Master,所以需要Master的results引用
	private ConcurrentHashMap<String,Object> results;
	
	@Override
	public void run() {
		while(true){
			Task task = taskQueue.poll();  //将任务从队列中拿出来
			if(task == null){  //如果队列里面没任务了就跳出循环
				break;
			}
			Object result = handle(task);  //业务处理
			this.results.put(Integer.toString(task.getId()), result);  //将业务处理的结果放入结果map里面
		}
	}
	
	//将业务抽出来作为一个方法
	public static Object handle(Task task){
		Object result  = null;
		try {
			Thread.sleep(500); //假设是处理业务的时长
			result = task.getPrice();  //业务只是简单地将price拿出来
		} catch (InterruptedException e) {
			e.printStackTrace();
		} 
		return result;
	}

	public ConcurrentLinkedQueue<Task> getTaskQueue() {
		return taskQueue;
	}

	public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) {
		this.taskQueue = taskQueue;
	}

	public ConcurrentHashMap<String, Object> getResults() {
		return results;
	}

	public void setResults(ConcurrentHashMap<String, Object> results) {
		this.results = results;
	}
}
客户端代码:
public class Main {
	
	public static void main(String[] args) {
		int maxProcessCount = Runtime.getRuntime().availableProcessors(); //当前机器的可用处理器数量(线程数)
		System.out.println(maxProcessCount);
		Master master = new Master(new Worker(), maxProcessCount);
		for(int i=0;i<100;i++){
			Task task = new Task();
			task.setId(i);
			task.setName("任务"+i);
			task.setPrice(i);
			master.submit(task);
		}
		master.excute();  //执行master
		long start = System.currentTimeMillis();
		while(true){
			if(master.isCompelete()){
				//如果master中的worker都执行完了,就获取最后的结果
				long end = System.currentTimeMillis();
				long time = end - start;
				int result = master.getResult();
				System.out.println("花费时间:"+time+"  最后结果:"+result);
				break;
			}
			
		}
	}
}