概念:
master的作用是用于接收和分配任务
worker是具体干活的,一个worker是一个线程。
1、master:
在master 类中定义了三个变量
1)ConcurrentLinkedQueue,用于存储任务
2)HashMap<String, Thread> workerMap 用于管理worker
3) ConcurrentHashMap<String, Object> resultMap 用于汇总各worker的执行结果
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.zx.entity.Task;
public class Master {
private ConcurrentLinkedQueue<Task> workerQueues = new ConcurrentLinkedQueue<Task>();
private HashMap<String, Thread> workerMap = new HashMap();
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
public Master(Worker worker, int count) {
worker.setWorkerQueue(this.workerQueues);
worker.setResultMap(this.resultMap);
for (int i = 0; i < count; i++) {
workerMap.put("线程" + i, new Thread(worker));
}
}
public void submit(Task task) {
// TODO Auto-generated method stub
this.workerQueues.add(task);
}
public int getResultMap() {
int sum = 0;
for (Entry<String, Object> result : resultMap.entrySet()) {
Object obj = result.getValue();
sum += Integer.parseInt(obj.toString());
}
return sum;
}
public boolean iscomplete() {
// TODO Auto-generated method stub
for (Map.Entry<String, Thread> worker : workerMap.entrySet())
if (worker.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
return true;
}
public void execute() {
// TODO Auto-generated method stub
for(Map.Entry<String, Thread> worker : workerMap.entrySet() ){
worker.getValue().start();
}
}
}
2、worker:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.zx.entity.Task;
public class Worker implements Runnable{
private ConcurrentLinkedQueue<Task> workerQueue;
private ConcurrentHashMap<String, Object> resulttMap;
@Override
public void run() {
// TODO Auto-generated method stub
while(true)
{
Task task=this.workerQueue.poll();
if(task==null)
break;
Object handle=handle(task);
this.resulttMap.put(task.getId(), handle);
}
}
private Object handle(Task task) {
// TODO Auto-generated method stub
return 1;
}
public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueues) {
// TODO Auto-generated method stub
this.workerQueue=workerQueues;
}
public void setResultMap(ConcurrentHashMap<String, Object> resulttMap) {
// TODO Auto-generated method stub
this.resulttMap=resulttMap;
}
}