@(分布式计算集群)
Master-Worker设计模式在生活中非常地常见,比如:督工和工人,督工给每个工人分配任务,也监督每个工人的工作情况。一般还有以下场景:如果有个工人身体不好,就重新将他的任务分配给其他人;现实中一般督工就一个,督工自己也有可能身体不好,然后呢?就没有然后了——也有可能由一个工人暂时顶替,或者再从其他小组挪一个督工,或者由一直陪同的小秘顶上。额外补充的场景是为了能够应对不正常事件,保证任务能够准时完成。同样的,程序设计中,也有这样的场景存在,尤其是任务量很大的时候。那我们是怎样完成这样的场景抽象,又是如何保证异常问题下的正常任务执行的呢?Hadoop中的Master-Worker设计模式又是怎么指导MapReduce工作流程的?
Master-Worker设计模式介绍
Master-Worker模式是常用的并行设计模式。核心思想是,系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理子任务。任务处理过程中,Master还负责监督任务进展和Worker的健康状态;Master将接收Client提交的任务,并将任务的进展汇总反馈给Client。各角色关系如下图
Master-Worker模式满足于可以将大任务划分为小任务的场景,是一种分而治之的设计理念。通过多线程或者多进程多机器的模式,可以将小任务处理分发给更多的CPU处理,降低单个CPU的计算量,通过并发/并行提高任务的完成速度,提高系统的性能。
具体细节如上图,Master对任务进行切分,并放入任务队列;然后,触发Worker处理任务。实际操作中,任务的分配有多种形式,如Master主动拉起Workder进程池或线程池,并将任务分配给Worker;或者由Worker主动领取任务,这样的Worker一般是常驻进程;还有一种解耦的方式,即Master指做任务的接收、切分和结果统计,指定Worker的数量和性能指标,但不参与Worker的实际管理,而是交由第三方调度监控和调度Worker。
用例说明
1. 简单demo(来自网友编写,做了些细节调整)
Master类定义
package com.linesum.linjx.test.masterWorker;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
/** * Master定义 * @author linjx * @Date 2016年9月22日 */
public class Master {
/** * 任务列表(这边作为Master成员变量,可以拆分) */
private Queue<Object> jobQueue = new ConcurrentLinkedQueue<Object>();
/** * Worker组 */
private Map<String, Thread> workers = new HashMap<>();
/** * 结果集(这边作为Master成员变量,可以拆分) */
private Queue<Object> results = new ConcurrentLinkedQueue<Object>();
/** * 原始任务数 */
private AtomicLong oriJobNum = new AtomicLong(0);
/** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @throws IllegalAccessException * @throws InstantiationException */
public Master(int workerNum, Class<? extends Worker> workerClass)
throws InstantiationException, IllegalAccessException {
for (int i = 0; i < workerNum; i ++) {
Worker worker = workerClass.newInstance();
worker.setJobQueue(jobQueue);
worker.setResults(results);
worker.setId(i);
workers.put(Integer.toString(i),
new Thread(worker, Integer.toString(i)));
}
}
/** * 任务是否已经完成 * @return */
public boolean isComplete() {
for (Map.Entry<String, Thread> worker : workers.entrySet()) {
if (Thread.State.TERMINATED != worker.getValue().getState()) {
return false;
}
}
return true;
}
/** * 由Client提交一个子任务 * @param job */
public void submit(Object job) {
oriJobNum.incrementAndGet(); //允许动态增加任务
jobQueue.add(job);
}
/** * 返回子任务结果集 * @return */
public Queue<Object> getResults() {
return results;
}
/** * 将运行结果放到结果集 * @param result */
public void putResult(Object result) {
results.add(result);
}
/** * 启动worker,进行任务处理 */
public void execute() {
for(Map.Entry<String, Thread> entry : workers.entrySet()) {
entry.getValue().start();
}
}
/** * 获取完成任务占比 * @return */
public float getFinishRatio() {
return 100.0f-100.0f * ((float)jobQueue.size()) / oriJobNum.get();
}
}
Worker类定义
package com.linesum.linjx.test.masterWorker;
import java.util.Queue;
/** * Worker * @author linjx * @Date 2016年9月22日 */
public abstract class Worker implements Runnable {
/** * id */
private int id;
/** * 任务队列 */
protected Queue<Object> jobs;
/** * 子任务结果集 */
protected Queue<Object> results;
public abstract Object handle(Object input);
@Override
public void run() {
System.out.println(String.format("Worker:[%d] start working...", this.id));
while(true){
Object input = jobs.poll(); //贪婪式获取任务
if(null == input) break;
//处理子任务,并传递给结果集
putResult(handle(input));
}
System.out.println(String.format("Worker:[%d] finish the jobs...", this.id));
}
public void setJobQueue(Queue<Object> jobQueue) {
this.jobs = jobQueue;
}
public void setResults(Queue<Object> results) {
this.results = results;
}
private void putResult(Object result) {
results.add(result);
}
public void setId(int id) {
this.id = id;
}
}
上述定义,与网友定义有些出入,做了些调整和丰富。这里,做个Worker用例,使用Master-Worker实现
PlusWorker定义
package com.linesum.linjx.test.job;
import com.linesum.linjx.test.masterWorker.Worker;
/** * 求立方和 * @author linjx * @Date 2016年9月22日 */
public class PlusWorker extends Worker {
@Override
public Object handle(Object input) {
long i = (Long) input;
return i * i * i;
}
}
Client实现
package com.linesum.linjx.test.job;
import java.util.Queue;
import com.linesum.linjx.test.masterWorker.Master;
/** * Client * @author linjx * @Date 2016年9月22日 */
public class Client {
private static int jobNum = 500000;
public static void main(String args[]) throws InterruptedException {
testMasterWorker();
System.out.println("-----------------");
testSington();
}
static void testMasterWorker() throws InterruptedException {
//固定使用4个Workder
Master master = null;
try {
master = new Master(4, PlusWorker.class);
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
System.exit(1);
}
for(long i = 1; i <= jobNum; i ++) //提交100个子任务
master.submit(i);
master.execute(); //开始计算
long re = 0;
while(true) {
if(master.isComplete()) {//等待计算结束
Queue<Object> results = master.getResults();
for (Object result : results) {
re = re + (long) result;
}
System.out.println(master.getFinishRatio() + "%");
break;
} else {
System.out.println(master.getFinishRatio() + "%");
Thread.sleep(1);
}
}
System.out.println(re);
}
static void testSington() {
long result = 0;
for (long i = 1; i <= jobNum; i ++) {
result = result + (i*i*i);
}
System.out.println(result);
}
}
这个是模拟网友实现的,各位有没有觉得有点怪异,最后的累加居然是由Client来做的,而不是Master来完成的,明显不合理啊。和MapReduce做比较,我们会发现,PlusWorker只是实现了基本的Map功能,我们并没有考虑聚合Reduce的实现。好的,我没有打算再拓展这个用例,就用了一个MapReduce的经典Demo来重新实现我的Master-Worker,具体如下:
2. 词频计数demo(可以更合理的将Map和Reduce拆分成两个worker)
词频计算是一个非常常见的用例,这里就不多做赘述。具体实现如下
新增Worker构造方法
/** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @Param jobQueue, 任务队列 * @Param resultQueue, 结果集队列 * @throws IllegalAccessException * @throws InstantiationException */
public Master(int workerNum, Class<? extends Worker> workerClass,
Queue<Object> jobQueue, Queue<Object> resultQueue)
throws InstantiationException, IllegalAccessException {
for (int i = 0; i < workerNum; i ++) {
Worker worker = workerClass.newInstance();
worker.setJobQueue(jobQueue);
worker.setResults(resultQueue);
worker.setId(i);
workers.put(Integer.toString(i),
new Thread(worker, Integer.toString(i)));
}
}
改造Master
/** * */
package com.linesum.linjx.test.masterWorker;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
/** * Master定义 * @author linjx * @Date 2016年9月22日 */
public class Master {
/** * 任务列表 */
private Queue<Object> jobQueue = new ConcurrentLinkedQueue<Object>();
/** * Worker组 */
private Map<String, Thread> workers = new HashMap<>();
/** * 结果集 */
private Queue<Object> results = new ConcurrentLinkedQueue<Object>();
/** * 原始任务数 */
private AtomicLong oriJobNum = new AtomicLong(0);
public Master() {}
/** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @throws IllegalAccessException * @throws InstantiationException */
public Master(int workerNum, Class<? extends Worker> workerClass)
throws InstantiationException, IllegalAccessException {
for (int i = 0; i < workerNum; i ++) {
Worker worker = workerClass.newInstance();
worker.setJobQueue(jobQueue);
worker.setResults(results);
worker.setId(i);
workers.put(Integer.toString(i),
new Thread(worker, Integer.toString(i)));
}
}
/** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @Param jobQueue, 任务队列 * @Param resultQueue, 结果集队列 * @throws IllegalAccessException * @throws InstantiationException */
public Master(int workerNum, Class<? extends Worker> workerClass,
Queue<Object> jobQueue, Queue<Object> resultQueue)
throws InstantiationException, IllegalAccessException {
for (int i = 0; i < workerNum; i ++) {
Worker worker = workerClass.newInstance();
worker.setJobQueue(jobQueue);
worker.setResults(resultQueue);
worker.setId(i);
workers.put(Integer.toString(i),
new Thread(worker, Integer.toString(i)));
}
}
/** * 任务是否已经完成 * @return */
public boolean isComplete() {
for (Map.Entry<String, Thread> worker : workers.entrySet()) {
if (Thread.State.TERMINATED != worker.getValue().getState()) {
return false;
}
}
return true;
}
/** * 任务是否已经完成 * @return */
public boolean isComplete(Map<String, Thread> workers) {
for (Map.Entry<String, Thread> worker : workers.entrySet()) {
if (Thread.State.TERMINATED != worker.getValue().getState()) {
return false;
}
}
return true;
}
/** * 由Client提交一个子任务 * @param job */
public void submit(Object job) {
oriJobNum.incrementAndGet(); //允许动态增加任务
jobQueue.add(job);
}
public Queue<Object> getJobs() {
return jobQueue;
}
/** * 返回子任务结果集 * @return */
public Queue<Object> getResults() {
return results;
}
/** * 将运行结果放到结果集 * @param result */
public void putResult(Object result) {
results.add(result);
}
/** * 启动worker,进行任务处理 */
public void execute() {
for(Map.Entry<String, Thread> entry : this.workers.entrySet()) {
entry.getValue().start();
}
}
/** * 启动特定的worker,进行任务处理 * @param workers */
public void execute(Map<String, Thread> workers) {
for(Map.Entry<String, Thread> entry : workers.entrySet()) {
entry.getValue().start();
}
}
/** * 获取完成任务占比 * @return */
public float getFinishRatio() {
return 100.0f-100.0f * ((float)jobQueue.size()) / oriJobNum.get();
}
}
定义WordCountMapper
/** * */
package com.linesum.linjx.test.job;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import com.linesum.linjx.test.masterWorker.Worker;
/** * * @author linjx * @Date 2016年9月22日 */
public class WordCountMapper extends Worker {
@Override
public Object handle(Object input) {
Map<String, Integer> wordCount = new HashMap<>();
if (null != input && StringUtils.isNotBlank(String.valueOf(input))) {
String[] words = StringUtils.split(String.valueOf(input), " "); //分词
for (String w : words) {
if (StringUtils.isBlank(w)) {
continue;
}
wordCount.put(w, wordCount.getOrDefault(w, 0) + 1);
}
}
return wordCount;
}
}
定义WordCountReducer
/** * */
package com.linesum.linjx.test.job;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import org.apache.commons.collections4.MapUtils;
import com.linesum.linjx.test.masterWorker.Worker;
/** * WordCount Reducer * @author linjx * @Date 2016年9月27日 */
public class WordCountReducer extends Worker {
@Override
public void run() {
System.out.println(String.format("Worker:start working..."));
putResult(handle(jobs));
System.out.println(String.format("Worker:finish the jobs..."));
}
@Override
public Object handle(Object input) {
Map<String, Integer> resultMap = new HashMap<>();
for (Object re : (Queue<Object>) input) {
if (MapUtils.isEmpty((Map<?, ?>) re)) {
continue;
}
@SuppressWarnings("unchecked")
Map<String, Integer> reMap = (Map<String, Integer>) re;
for (Entry<String, Integer> entry : reMap.entrySet()) {
resultMap.put(entry.getKey(), resultMap.getOrDefault(entry.getKey(), 0) + entry.getValue());
}
}
return resultMap;
}
}
定义Job并调度
/** * */
package com.linesum.linjx.test.job;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
/** * * @author linjx * @Date 2016年9月22日 */
public class WordCountJob {
public static void main(String args[]) throws InstantiationException, IllegalAccessException, InterruptedException {
Queue<Object> result = new LinkedBlockingQueue<>();
WordCountMaster master = new WordCountMaster();
master.setMapWorkers(5, WordCountMapper.class, master.getJobs(), master.getResults());
master.setReduceWorkers(1, WordCountReducer.class, master.getResults(), result);
master.submit("hello world1");
master.submit("hello world2");
master.submit("hello world2");
master.submit("hello world3");
master.submit("hello world3");
master.submit("hello world3");
master.submit("hello world4");
master.submit("hello world4");
master.submit("hello world4");
master.submit("hello world4");
master.submit("hello world5");
master.submit("hello world5");
master.submit("hello world5");
master.submit("hello world5");
master.submit("hello world5");
master.map();
while(true) {
//这里是等待结束后才开始执行reduce,改变reduce的统计方式,可以提前执行reduce
if (master.isComplete(master.mapWorkers)) {
System.out.println(master.getFinishRatio() + "%");
master.reduce();
while(true) {
if (master.isComplete(master.reduceWorkers)) {
System.out.println(result);
break;
} else {
Thread.sleep(5);
}
}
break;
} else {
System.out.println(master.getFinishRatio() + "%");
Thread.sleep(5);
}
}
}
}
执行结果如下:
0.0%
Worker:[3] start working...
Worker:[4] start working...
Worker:[2] start working...
Worker:[0] start working...
Worker:[1] start working...
33.333336%
Worker:[0] finish the jobs...
Worker:[3] finish the jobs...
Worker:[4] finish the jobs...
Worker:[2] finish the jobs...
Worker:[1] finish the jobs...
100.0%
Worker:start working...
Worker:finish the jobs...
[{world4=4, world5=5, world2=2, world3=3, hello=15, world1=1}]
这个构造方法是为了满足指定任务集和结果集,即满足更加灵活的任务和结果指定,意味着jobQueue和resultQueue是可以拆分的,比如可以使用File、HDFS、Cache等实现机制。
WorkCountMaster
思考
这里的实现还是不够彻底,Map和Reduce集成的类不应该是同一个Worker;Map和Reduce的输入输出没有合理的定义;计算内容存储没有和Master解耦等。这些就留给各位再做优化,这里做个引子而已。
总结
一个完成的分布式Master-Worker实现是Demo用例没有体现的。高可用的分布式Master-Worker要考虑任务的分配,方式数据倾斜带来的单点等待;考虑单点故障重试和任务重新分配策略;考虑网络传输损耗,避免过多的数据传输和交换;考虑Worker Failure处理机制;考虑Master Failure重试机制;并且还应该考虑更多的可拓展性,比如分区算法、快速排序算法等等问题。这些问题我们将在后续的章节展开讨论。