Master-Worker设计模式

时间:2022-10-08 03:29:37

@(分布式计算集群)

Master-Worker设计模式在生活中非常地常见,比如:督工和工人,督工给每个工人分配任务,也监督每个工人的工作情况。一般还有以下场景:如果有个工人身体不好,就重新将他的任务分配给其他人;现实中一般督工就一个,督工自己也有可能身体不好,然后呢?就没有然后了——也有可能由一个工人暂时顶替,或者再从其他小组挪一个督工,或者由一直陪同的小秘顶上。额外补充的场景是为了能够应对不正常事件,保证任务能够准时完成。同样的,程序设计中,也有这样的场景存在,尤其是任务量很大的时候。那我们是怎样完成这样的场景抽象,又是如何保证异常问题下的正常任务执行的呢?Hadoop中的Master-Worker设计模式又是怎么指导MapReduce工作流程的?

Master-Worker设计模式介绍

Master-Worker模式是常用的并行设计模式。核心思想是,系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理子任务。任务处理过程中,Master还负责监督任务进展和Worker的健康状态;Master将接收Client提交的任务,并将任务的进展汇总反馈给Client。各角色关系如下图
Master-Worker设计模式

Master-Worker模式满足于可以将大任务划分为小任务的场景,是一种分而治之的设计理念。通过多线程或者多进程多机器的模式,可以将小任务处理分发给更多的CPU处理,降低单个CPU的计算量,通过并发/并行提高任务的完成速度,提高系统的性能。
Master-Worker设计模式
具体细节如上图,Master对任务进行切分,并放入任务队列;然后,触发Worker处理任务。实际操作中,任务的分配有多种形式,如Master主动拉起Workder进程池或线程池,并将任务分配给Worker;或者由Worker主动领取任务,这样的Worker一般是常驻进程;还有一种解耦的方式,即Master指做任务的接收、切分和结果统计,指定Worker的数量和性能指标,但不参与Worker的实际管理,而是交由第三方调度监控和调度Worker。

用例说明

1. 简单demo(来自网友编写,做了些细节调整)

Master类定义

package com.linesum.linjx.test.masterWorker;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/** * Master定义 * @author linjx * @Date 2016年9月22日 */
public class Master {

    /** * 任务列表(这边作为Master成员变量,可以拆分) */
    private Queue<Object> jobQueue = new ConcurrentLinkedQueue<Object>();

    /** * Worker组 */
    private Map<String, Thread> workers = new HashMap<>(); 

    /** * 结果集(这边作为Master成员变量,可以拆分) */
    private Queue<Object> results = new ConcurrentLinkedQueue<Object>();

    /** * 原始任务数 */
    private AtomicLong oriJobNum = new AtomicLong(0);

    /** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @throws IllegalAccessException * @throws InstantiationException */
    public Master(int workerNum, Class<? extends Worker> workerClass) 
            throws InstantiationException, IllegalAccessException {
        for (int i = 0; i < workerNum; i ++) {
            Worker worker = workerClass.newInstance();
            worker.setJobQueue(jobQueue);
            worker.setResults(results);
            worker.setId(i);
            workers.put(Integer.toString(i), 
                    new Thread(worker, Integer.toString(i)));
        }
    }

    /** * 任务是否已经完成 * @return */
    public boolean isComplete() {
        for (Map.Entry<String, Thread> worker : workers.entrySet()) {
            if (Thread.State.TERMINATED != worker.getValue().getState()) {
                return false;
            }
        }
        return true;
    }

    /** * 由Client提交一个子任务 * @param job */
    public void submit(Object job) {  
        oriJobNum.incrementAndGet(); //允许动态增加任务
        jobQueue.add(job);  
    }  

    /** * 返回子任务结果集 * @return */
    public Queue<Object> getResults() {  
        return results;  
    }  

    /** * 将运行结果放到结果集 * @param result */
    public void putResult(Object result) {
        results.add(result);
    }

    /** * 启动worker,进行任务处理 */
    public void execute() {  
        for(Map.Entry<String, Thread> entry : workers.entrySet()) {  
            entry.getValue().start();  
        }  
    }  

    /** * 获取完成任务占比 * @return */
    public float getFinishRatio() {
        return 100.0f-100.0f * ((float)jobQueue.size()) / oriJobNum.get();
    }

}

Worker类定义

package com.linesum.linjx.test.masterWorker;

import java.util.Queue;

/** * Worker * @author linjx * @Date 2016年9月22日 */
public abstract class Worker implements Runnable {

    /** * id */
    private int id;

    /** * 任务队列 */
    protected Queue<Object> jobs;

    /** * 子任务结果集 */
    protected Queue<Object> results;

    public abstract Object handle(Object input);

    @Override
    public void run() {
        System.out.println(String.format("Worker:[%d] start working...", this.id));
        while(true){
            Object input = jobs.poll(); //贪婪式获取任务

            if(null == input) break;
            //处理子任务,并传递给结果集
            putResult(handle(input));
        }
        System.out.println(String.format("Worker:[%d] finish the jobs...", this.id));
    }

    public void setJobQueue(Queue<Object> jobQueue) {
        this.jobs = jobQueue;
    }

    public void setResults(Queue<Object> results) {
        this.results = results;
    }

    private void putResult(Object result) {
        results.add(result);
    }

    public void setId(int id) {
        this.id = id;
    }

}

上述定义,与网友定义有些出入,做了些调整和丰富。这里,做个Worker用例,使用Master-Worker实现 Ni=0n3i 的计算。OK,分而治之的理念,即每个worker获取任务 ni ,然后计算 n3i ,并将结果放入结果集。

PlusWorker定义

package com.linesum.linjx.test.job;

import com.linesum.linjx.test.masterWorker.Worker;

/** * 求立方和 * @author linjx * @Date 2016年9月22日 */
public class PlusWorker extends Worker {

    @Override  
    public Object handle(Object input) {  
        long i = (Long) input;  
        return i * i * i;  
    }  
}  

Client实现

package com.linesum.linjx.test.job;

import java.util.Queue;

import com.linesum.linjx.test.masterWorker.Master;

/** * Client * @author linjx * @Date 2016年9月22日 */
public class Client { 

    private static int jobNum = 500000;

    public static void main(String args[]) throws InterruptedException {
        testMasterWorker();
        System.out.println("-----------------");
        testSington();
    }

    static void testMasterWorker() throws InterruptedException {  
        //固定使用4个Workder 
        Master master = null;
        try {
            master = new Master(4, PlusWorker.class);
        } catch (InstantiationException | IllegalAccessException e) {
            e.printStackTrace();
            System.exit(1);
        }  

        for(long i = 1; i <= jobNum; i ++) //提交100个子任务 
            master.submit(i);  
        master.execute(); //开始计算 

        long re = 0;
        while(true) {  
            if(master.isComplete()) {//等待计算结束
                Queue<Object> results = master.getResults();  
                for (Object result : results) {
                     re = re + (long) result;
                }
                System.out.println(master.getFinishRatio() + "%");
                break;
            } else {
                System.out.println(master.getFinishRatio() + "%");
                Thread.sleep(1);
            }
        }

        System.out.println(re);  
    }  

    static void testSington() {
        long result = 0;
        for (long i = 1; i <= jobNum; i ++) {
            result = result + (i*i*i);
        }
        System.out.println(result);
    }
}  

这个是模拟网友实现的,各位有没有觉得有点怪异,最后的累加居然是由Client来做的,而不是Master来完成的,明显不合理啊。和MapReduce做比较,我们会发现,PlusWorker只是实现了基本的Map功能,我们并没有考虑聚合Reduce的实现。好的,我没有打算再拓展这个用例,就用了一个MapReduce的经典Demo来重新实现我的Master-Worker,具体如下:

2. 词频计数demo(可以更合理的将Map和Reduce拆分成两个worker)

词频计算是一个非常常见的用例,这里就不多做赘述。具体实现如下

新增Worker构造方法

/** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @Param jobQueue, 任务队列 * @Param resultQueue, 结果集队列 * @throws IllegalAccessException * @throws InstantiationException */
public Master(int workerNum, Class<? extends Worker> workerClass, 
        Queue<Object> jobQueue, Queue<Object> resultQueue) 
        throws InstantiationException, IllegalAccessException {
    for (int i = 0; i < workerNum; i ++) {
        Worker worker = workerClass.newInstance();
        worker.setJobQueue(jobQueue);
        worker.setResults(resultQueue);
        worker.setId(i);
        workers.put(Integer.toString(i), 
                new Thread(worker, Integer.toString(i)));
    }
}

改造Master

/** * */
package com.linesum.linjx.test.masterWorker;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/** * Master定义 * @author linjx * @Date 2016年9月22日 */
public class Master {

    /** * 任务列表 */
    private Queue<Object> jobQueue = new ConcurrentLinkedQueue<Object>();

    /** * Worker组 */
    private Map<String, Thread> workers = new HashMap<>(); 

    /** * 结果集 */
    private Queue<Object> results = new ConcurrentLinkedQueue<Object>();

    /** * 原始任务数 */
    private AtomicLong oriJobNum = new AtomicLong(0);

    public Master() {}

    /** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @throws IllegalAccessException * @throws InstantiationException */
    public Master(int workerNum, Class<? extends Worker> workerClass) 
            throws InstantiationException, IllegalAccessException {
        for (int i = 0; i < workerNum; i ++) {
            Worker worker = workerClass.newInstance();
            worker.setJobQueue(jobQueue);
            worker.setResults(results);
            worker.setId(i);
            workers.put(Integer.toString(i), 
                    new Thread(worker, Integer.toString(i)));
        }
    }

    /** * @param workerNum,worker数量上限 * @param workerClass, worker实现类 * @Param jobQueue, 任务队列 * @Param resultQueue, 结果集队列 * @throws IllegalAccessException * @throws InstantiationException */
    public Master(int workerNum, Class<? extends Worker> workerClass, 
            Queue<Object> jobQueue, Queue<Object> resultQueue) 
            throws InstantiationException, IllegalAccessException {
        for (int i = 0; i < workerNum; i ++) {
            Worker worker = workerClass.newInstance();
            worker.setJobQueue(jobQueue);
            worker.setResults(resultQueue);
            worker.setId(i);
            workers.put(Integer.toString(i), 
                    new Thread(worker, Integer.toString(i)));
        }
    }

    /** * 任务是否已经完成 * @return */
    public boolean isComplete() {
        for (Map.Entry<String, Thread> worker : workers.entrySet()) {
            if (Thread.State.TERMINATED != worker.getValue().getState()) {
                return false;
            }
        }
        return true;
    }

    /** * 任务是否已经完成 * @return */
    public boolean isComplete(Map<String, Thread> workers) {
        for (Map.Entry<String, Thread> worker : workers.entrySet()) {
            if (Thread.State.TERMINATED != worker.getValue().getState()) {
                return false;
            }
        }
        return true;
    }

    /** * 由Client提交一个子任务 * @param job */
    public void submit(Object job) {  
        oriJobNum.incrementAndGet(); //允许动态增加任务
        jobQueue.add(job);  
    }  

    public Queue<Object> getJobs() {
        return jobQueue;
    }

    /** * 返回子任务结果集 * @return */
    public Queue<Object> getResults() {  
        return results;  
    }  

    /** * 将运行结果放到结果集 * @param result */
    public void putResult(Object result) {
        results.add(result);
    }

    /** * 启动worker,进行任务处理 */
    public void execute() {  
        for(Map.Entry<String, Thread> entry : this.workers.entrySet()) {  
            entry.getValue().start();  
        }  
    }  

    /** * 启动特定的worker,进行任务处理 * @param workers */
    public void execute(Map<String, Thread> workers) {
        for(Map.Entry<String, Thread> entry : workers.entrySet()) {  
            entry.getValue().start();  
        }
    }

    /** * 获取完成任务占比 * @return */
    public float getFinishRatio() {
        return 100.0f-100.0f * ((float)jobQueue.size()) / oriJobNum.get();
    }

}

定义WordCountMapper

/** * */
package com.linesum.linjx.test.job;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;

import com.linesum.linjx.test.masterWorker.Worker;

/** * * @author linjx * @Date 2016年9月22日 */
public class WordCountMapper extends Worker {

    @Override  
    public Object handle(Object input) {
        Map<String, Integer> wordCount = new HashMap<>();
        if (null != input && StringUtils.isNotBlank(String.valueOf(input))) {
            String[] words = StringUtils.split(String.valueOf(input), " "); //分词
            for (String w : words) {
                if (StringUtils.isBlank(w)) {
                    continue;
                }
                wordCount.put(w, wordCount.getOrDefault(w, 0) + 1);
            }
        }
        return wordCount;
    }

}

定义WordCountReducer

/** * */
package com.linesum.linjx.test.job;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;

import org.apache.commons.collections4.MapUtils;

import com.linesum.linjx.test.masterWorker.Worker;

/** * WordCount Reducer * @author linjx * @Date 2016年9月27日 */
public class WordCountReducer extends Worker {

    @Override
    public void run() {
        System.out.println(String.format("Worker:start working..."));
        putResult(handle(jobs));
        System.out.println(String.format("Worker:finish the jobs..."));
    }

    @Override
    public Object handle(Object input) {
        Map<String, Integer> resultMap = new HashMap<>();
        for (Object re : (Queue<Object>) input) {
            if (MapUtils.isEmpty((Map<?, ?>) re)) {
                continue;
            }
            @SuppressWarnings("unchecked")
            Map<String, Integer> reMap = (Map<String, Integer>) re;
            for (Entry<String, Integer> entry : reMap.entrySet()) {
                resultMap.put(entry.getKey(), resultMap.getOrDefault(entry.getKey(), 0) + entry.getValue());
            }
        }
        return resultMap;
    }

}

定义Job并调度

/** * */
package com.linesum.linjx.test.job;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/** * * @author linjx * @Date 2016年9月22日 */
public class WordCountJob {

    public static void main(String args[]) throws InstantiationException, IllegalAccessException, InterruptedException {

        Queue<Object> result = new LinkedBlockingQueue<>();

        WordCountMaster master =  new WordCountMaster();
        master.setMapWorkers(5, WordCountMapper.class, master.getJobs(), master.getResults());
        master.setReduceWorkers(1, WordCountReducer.class, master.getResults(), result);

        master.submit("hello world1");
        master.submit("hello world2");
        master.submit("hello world2");
        master.submit("hello world3");
        master.submit("hello world3");
        master.submit("hello world3");
        master.submit("hello world4");
        master.submit("hello world4");
        master.submit("hello world4");
        master.submit("hello world4");
        master.submit("hello world5");
        master.submit("hello world5");
        master.submit("hello world5");
        master.submit("hello world5");
        master.submit("hello world5");

        master.map();
        while(true) {
            //这里是等待结束后才开始执行reduce,改变reduce的统计方式,可以提前执行reduce
            if (master.isComplete(master.mapWorkers)) {
                System.out.println(master.getFinishRatio() + "%");
                master.reduce();
                while(true) {
                    if (master.isComplete(master.reduceWorkers)) {
                        System.out.println(result);
                        break;
                    } else {
                        Thread.sleep(5);
                    }
                }
                break;
            } else {
                System.out.println(master.getFinishRatio() + "%");
                Thread.sleep(5);
            }
        }

    }

}

执行结果如下:

0.0%
Worker:[3] start working...
Worker:[4] start working...
Worker:[2] start working...
Worker:[0] start working...
Worker:[1] start working...
33.333336%
Worker:[0] finish the jobs...
Worker:[3] finish the jobs...
Worker:[4] finish the jobs...
Worker:[2] finish the jobs...
Worker:[1] finish the jobs...
100.0%
Worker:start working...
Worker:finish the jobs...
[{world4=4, world5=5, world2=2, world3=3, hello=15, world1=1}]

这个构造方法是为了满足指定任务集和结果集,即满足更加灵活的任务和结果指定,意味着jobQueue和resultQueue是可以拆分的,比如可以使用File、HDFS、Cache等实现机制。

WorkCountMaster

思考

这里的实现还是不够彻底,Map和Reduce集成的类不应该是同一个Worker;Map和Reduce的输入输出没有合理的定义;计算内容存储没有和Master解耦等。这些就留给各位再做优化,这里做个引子而已。

总结

一个完成的分布式Master-Worker实现是Demo用例没有体现的。高可用的分布式Master-Worker要考虑任务的分配,方式数据倾斜带来的单点等待;考虑单点故障重试和任务重新分配策略;考虑网络传输损耗,避免过多的数据传输和交换;考虑Worker Failure处理机制;考虑Master Failure重试机制;并且还应该考虑更多的可拓展性,比如分区算法、快速排序算法等等问题。这些问题我们将在后续的章节展开讨论。