分布式计算forkJoin模拟

时间:2022-02-12 03:31:37

分布式计算ForkJoin原理

ForkJoin是把一个总任务分成多个线程执行,然后把获得的结果进行汇总,已达到快速执行,提高效率。以下Master\worker的形式模拟。

Master:用于接收客户端发来的任务,并把任务分配给指定的worker,并把worker计算的结果进行汇总, 是一个任务的总控制器

Worker:是正真计算的对象,把计算结果返回给Master的一个容器中。

Task:任务,需要计算的任务。

大概结构如下

代码如下

Master

package com.sf.simba.myforkjoin;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.CountDownLatch;/** * 模拟forkjoin 分布式计算 * Master为控制器,用于接收客户端发送过来的任务和分配任务到各个worker, * 每个worker计算后把结果返回给Master * @author 734621 * */public class Master { //1、一个装任务的容器private ConcurrentLinkedQueue<Task> taskes = new ConcurrentLinkedQueue<Task>(); //2、一个装Worker的容器,其中worker是一个线程private HashMap<String,Thread> workers = new HashMap<String,Thread>(); //3、一个装运算结果的容器private ConcurrentHashMap<String,Object> result = new ConcurrentHashMap<String,Object>(); // 统计任务完成情况private final CountDownLatch workerComplete; //4、初始化workerpublic Master(Worker worker,int taskCount,CountDownLatch countDownLatch){//每一个worker都必须要有master的引用,用于分配任务worker.setTaskes(taskes);//把结果给work,用于计算完后返回给masterworker.setResult(result);this.workerComplete = countDownLatch;worker.setCountDownLatch(workerComplete);for(int i=0;i<taskCount;i++){/** * key表示每一个worker的名字,value表示线程执行对象, * 这里创建一个对象即可,不需要创建十个对象,只需创建十个线程即可 */workers.put("子节点"+i, new Thread(worker));}} //5、提交任务public void submit(Task task){taskes.add(task);}   //6、执行任务public void execute(){for(Map.Entry<String, Thread> mc :workers.entrySet()){System.out.println(mc.getKey()+"开始执行.....");mc.getValue().start();}}public int getResult(){int ret = 0;for(Map.Entry<String, Object> rs :result.entrySet()){ret += (Integer)rs.getValue();}return ret;}}

Worker

package com.sf.simba.myforkjoin;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

/**
* 执行Master发送过来的任务,并将产生的结果发送给Master
* @author 734621
*
*/
public class Worker implements Runnable{
private ConcurrentLinkedQueue<Task> taskes;
private ConcurrentHashMap<String,Object> result;
private CountDownLatch workerComplete;

public void setTaskes(ConcurrentLinkedQueue<Task> taskes) {
this.taskes = taskes;
}
public void setResult(ConcurrentHashMap<String, Object> result) {
this.result = result;
}

@Override
public void run() {
Integer output = null;
while(true){
Task input = taskes.poll();//拿出一个值并移除
if(input == null) break;
output =(Integer)handler(input);
result.put(Integer.toString(input.getId()), output);
workerComplete.countDown();
}
}

public Object handler(Task input){
Object output = null;
try {
System.out.println(input.getName()+":执行任务中.....");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
output = input.getPrice();
return output;
}
public void setCountDownLatch(CountDownLatch workerComplete) {
this.workerComplete = workerComplete;

}

}
Task

package com.sf.simba.myforkjoin;

public class Task {
private int id;
private String name;
private int price;

public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}

}



Main 测试类

package com.sf.simba.myforkjoin;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Main {

public static void main(String[] args) {
int allTaskCount = 100;//总共100个任务
//用于统计任务完成情况
CountDownLatch countDownLatch = new CountDownLatch(allTaskCount);
Master master = new Master(new Worker(),10,countDownLatch);
Random random = new Random();
for(int i=1;i<=allTaskCount;i++){
Task task = new Task();
task.setId(i);
task.setName("任务"+i);
task.setPrice(random.nextInt(1000));
master.submit(task);
}

master.execute();
try {
countDownLatch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("总结果为:"+master.getResult());
//while(true){
//if(master.isComplete()){
//
//break;
//}
//}


}

}


结果如下:


子节点0开始执行.....
子节点2开始执行.....
任务1:执行任务中.....
子节点1开始执行.....
任务2:执行任务中.....
子节点4开始执行.....
任务3:执行任务中.....
子节点3开始执行.....
任务4:执行任务中.....
子节点6开始执行.....
任务5:执行任务中.....
子节点5开始执行.....
任务6:执行任务中.....
子节点8开始执行.....
任务7:执行任务中.....
子节点7开始执行.....
任务8:执行任务中.....
子节点9开始执行.....
任务9:执行任务中.....
任务10:执行任务中.....
任务11:执行任务中.....
任务12:执行任务中.....
任务18:执行任务中.....
任务17:执行任务中.....
任务16:执行任务中.....
任务15:执行任务中.....
任务14:执行任务中.....
任务13:执行任务中.....
任务19:执行任务中.....
任务20:执行任务中.....
任务21:执行任务中.....
任务22:执行任务中.....
任务25:执行任务中.....
任务26:执行任务中.....
任务27:执行任务中.....
任务28:执行任务中.....
任务29:执行任务中.....
任务23:执行任务中.....
任务24:执行任务中.....
任务30:执行任务中.....
任务31:执行任务中.....
任务32:执行任务中.....
任务37:执行任务中.....
任务36:执行任务中.....
任务35:执行任务中.....
任务34:执行任务中.....
任务33:执行任务中.....
任务38:执行任务中.....
任务39:执行任务中.....
任务40:执行任务中.....
任务41:执行任务中.....
任务42:执行任务中.....
任务46:执行任务中.....
任务45:执行任务中.....
任务44:执行任务中.....
任务43:执行任务中.....
任务47:执行任务中.....
任务48:执行任务中.....
任务49:执行任务中.....
任务50:执行任务中.....
任务51:执行任务中.....
任务53:执行任务中.....
任务56:执行任务中.....
任务55:执行任务中.....
任务52:执行任务中.....
任务54:执行任务中.....
任务57:执行任务中.....
任务58:执行任务中.....
任务59:执行任务中.....
任务60:执行任务中.....
任务61:执行任务中.....
任务62:执行任务中.....
任务66:执行任务中.....
任务65:执行任务中.....
任务64:执行任务中.....
任务63:执行任务中.....
任务67:执行任务中.....
任务68:执行任务中.....
任务69:执行任务中.....
任务70:执行任务中.....
任务71:执行任务中.....
任务72:执行任务中.....
任务75:执行任务中.....
任务74:执行任务中.....
任务73:执行任务中.....
任务76:执行任务中.....
任务78:执行任务中.....
任务79:执行任务中.....
任务77:执行任务中.....
任务80:执行任务中.....
任务81:执行任务中.....
任务82:执行任务中.....
任务86:执行任务中.....
任务85:执行任务中.....
任务84:执行任务中.....
任务83:执行任务中.....
任务87:执行任务中.....
任务88:执行任务中.....
任务89:执行任务中.....
任务90:执行任务中.....
任务91:执行任务中.....
任务92:执行任务中.....
任务93:执行任务中.....
任务94:执行任务中.....
任务95:执行任务中.....
任务96:执行任务中.....
任务97:执行任务中.....
任务98:执行任务中.....
任务99:执行任务中.....
任务100:执行任务中.....
总结果为:50140