Tiny并行计算框架之复杂演示样例

时间:2021-03-27 07:15:21

问题来源  很感谢@doctorwho的问题:

假如职业介绍所来了一批生产汽车的工作,如果生产一辆汽车任务是这种:搭好底盘、拧4个轮胎、安装发动机、安装4个座椅、再装4个车门、最后安装顶棚。

之间有的任务是能够并行计算的(比方拧4个轮胎,安装发动机和安装座椅),有的任务有前置任务(比方先装好座椅。才干装车门和顶棚)。让两组包工头组织两种类型的工作:将工人分成两种类型,就可以并行计算的放在同一组内,由职业介绍所来控制A组包工头做完的任务交给B组包工头。中间环节的半成品保存到Warehouse中。是这样使用TINY框架来生产汽车么?


接下来。我就用Tiny并行计算框架来展示一下这个演示样例,在编写演示样例的时候,发现了一个BUG。这也充分体现了开源的精神与价值。再次感谢@doctorwho。


问题分析


doctorwho的问题还是比較复杂的。可是实际上道理是一样的。因此我把问题简化成以下的过程


第一步:构建底盘


第二步:并行进行安装引擎。座位和轮胎



第三步:并行进行安装门及车顶


因为我和doctorwho都不是造车行家,因此就不用纠结这么造是不是合理了,如果这么做就是合理的。

代码实现


按我前面说的过程。工人是必需要有的,因此我们首先构建工人:


第一步的底盘构建工人

1

2

3

4

5

6

7

8

9

10

11

12

13
public class StepFirstWorker extends AbstractWorker {

    public StepFirstWorker() throws RemoteException {

        super("first");

    }



    @Override

    protected Warehouse doWork(Work work) throws RemoteException {

        System.out.println(String.format("%s 构建底盘完毕.", work.getInputWarehouse().get("carType")));

        Warehouse outputWarehouse = work.getInputWarehouse();

        outputWarehouse.put("baseInfo", "something about baseInfo");

        return outputWarehouse;

    }

}

因为第二步工人有好几个类型。因此再搞个第二步抽象工人:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19
public abstract class StepThirdWorker extends AbstractWorker {

    public StepThirdWorker() throws RemoteException {

        super("third");

    }





    protected boolean acceptMyWork(Work work) {

        String workClass = work.getInputWarehouse().get("class");

        if (workClass != null) {

            return true;

        }

        return false;

    }

    protected Warehouse doMyWork(Work work) throws RemoteException {

        System.out.println(String.format("Base:%s ", work.getInputWarehouse().get("baseInfo")));

        System.out.println(String.format("%s is Ok", work.getInputWarehouse().get("class")));

        return work.getInputWarehouse();

    }

}

接下来构建第二步的引擎工人:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20
public class StepSecondEngineWorker extends StepSecondWorker {





public static final String ENGINE = "engine";





public StepSecondEngineWorker() throws RemoteException {

super();

}





public boolean acceptWork(Work work) {

return acceptMyWork(work);

}





protected Warehouse doWork(Work work) throws RemoteException {

return super.doMyWork(work);

}

}

第二步的座位工人:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21
public class StepSecondSeatWorker extends StepSecondWorker {





    public static final String SEAT = "seat";





    public StepSecondSeatWorker() throws RemoteException {

        super();

    }

    public boolean acceptWork(Work work) {

       return acceptMyWork(work);

    }

    protected Warehouse doWork(Work work) throws RemoteException {

        return super.doMyWork(work);

    }

}



<div>





</div>

第二步的轮胎工人:



?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18
public class StepSecondTyreWorker extends StepSecondWorker {

public static final String TYRE = "tyre";





public StepSecondTyreWorker() throws RemoteException {

super();

}





public boolean acceptWork(Work work) {

return acceptMyWork(work);

}





protected Warehouse doWork(Work work) throws RemoteException {

return super.doMyWork(work);

}

}

同理,第三步也是大同小异的。

第三步的抽象工人类:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19
public abstract class StepThirdWorker extends AbstractWorker {

    public StepThirdWorker() throws RemoteException {

        super("third");

    }





    protected boolean acceptMyWork(Work work) {

        String workClass = work.getInputWarehouse().get("class");

        if (workClass != null) {

            return true;

        }

        return false;

    }

    protected Warehouse doMyWork(Work work) throws RemoteException {

        System.out.println(String.format("Base:%s ", work.getInputWarehouse().get("baseInfo")));

        System.out.println(String.format("%s is Ok", work.getInputWarehouse().get("class")));

        return work.getInputWarehouse();

    }

}

第三步的车门工人:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17
public class StepThirdDoorWorker extends StepThirdWorker {





    public static final String DOOR = "door";





    public StepThirdDoorWorker() throws RemoteException {

        super();

    }

    public boolean acceptWork(Work work) {

        return acceptMyWork(work);

    }

    @Override

    protected Warehouse doWork(Work work) throws RemoteException {

        return super.doMyWork(work);

    }

}

第三步的车顶工人:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16
public class StepThirdRoofWorker extends StepThirdWorker {





    public static final String ROOF = "roof";





    public StepThirdRoofWorker() throws RemoteException {

        super();

    }

    public boolean acceptWork(Work work) {

        return acceptMyWork(work);

    }

    protected Warehouse doWork(Work work) throws RemoteException {

        return super.doMyWork(work);

    }

}

以上就把工人都构建好了,我们前面也说过。假设要进行任务分解,是必需要构建任务分解合并器的,这里简单起见。仅仅实现任务分解了。

第二部的任务分解:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22
public class SecondWorkSplitter implements WorkSplitter {

    public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {

        List<Warehouse> list = new ArrayList<Warehouse>();

        list.add(getWareHouse(work.getInputWarehouse(), "engine"));

        list.add(getWareHouse(work.getInputWarehouse(), "seat"));

        list.add(getWareHouse(work.getInputWarehouse(), "seat"));

        list.add(getWareHouse(work.getInputWarehouse(), "seat"));

        list.add(getWareHouse(work.getInputWarehouse(), "seat"));

        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));

        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));

        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));

        list.add(getWareHouse(work.getInputWarehouse(), "tyre"));

        return list;

    }



    private Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {

        Warehouse warehouse = new WarehouseDefault();

        warehouse.put("class", stepClass);

        warehouse.putSubWarehouse(inputWarehouse);

        return warehouse;

    }

}

从上面能够看到,构建了一个引擎的仓库。4个座位仓库,4个轮胎仓库。呵呵,既然能并行。为啥不让他做得更快些?


接下来是第三步的任务分解器:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18
public class ThirdWorkSplitter implements WorkSplitter {

    public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {

        List<Warehouse> list = new ArrayList<Warehouse>();

        list.add(getWareHouse(work.getInputWarehouse(), "door"));

        list.add(getWareHouse(work.getInputWarehouse(), "door"));

        list.add(getWareHouse(work.getInputWarehouse(), "door"));

        list.add(getWareHouse(work.getInputWarehouse(), "door"));

        list.add(getWareHouse(work.getInputWarehouse(), "roof"));

        return list;

    }



    private Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {

        Warehouse warehouse = new WarehouseDefault();

        warehouse.put("class", stepClass);

        warehouse.putSubWarehouse(inputWarehouse);

        return warehouse;

    }

}

从上面能够看到,第三部构建了4个门仓库一个车顶仓库,相同的,能够让4个工人同一时候装门。

上面就把全部的准备工作都做好了,接下来就是測试方法了:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49
public class Test {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        JobCenter jobCenter = new JobCenterLocal();





        for (int i = 0; i < 5; i++) {

            jobCenter.registerWorker(new StepFirstWorker());

        }

        for (int i = 0; i < 5; i++) {

            jobCenter.registerWorker(new StepSecondTyreWorker());

        }

        for (int i = 0; i < 5; i++) {

            jobCenter.registerWorker(new StepSecondSeatWorker());

        }

        for (int i = 0; i < 5; i++) {

            jobCenter.registerWorker(new StepSecondEngineWorker());

        }

        for (int i = 0; i < 5; i++) {

            jobCenter.registerWorker(new StepThirdDoorWorker());

        }

        for (int i = 0; i < 5; i++) {

            jobCenter.registerWorker(new StepThirdRoofWorker());

        }



        jobCenter.registerForeman(new ForemanSelectOneWorker("first"));

     <span></span> <span></span>jobCenter.registerForeman(new ForemanSelectAllWorker("second",

new SecondWorkSplitter()));

jobCenter.registerForeman(new ForemanSelectAllWorker("third",new ThirdWorkSplitter()));





Warehouse inputWarehouse = new WarehouseDefault();

inputWarehouse.put("class", "car");

inputWarehouse.put("carType", "普桑");

WorkDefault work = new WorkDefault("first", inputWarehouse);

work.setForemanType("first");

WorkDefault work2 = new WorkDefault("second");

work2.setForemanType("second");

WorkDefault work3 = new WorkDefault("third");

work3.setForemanType("third");

work.setNextWork(work2).setNextWork(work3);





Warehouse warehouse = jobCenter.doWork(work);





jobCenter.stop();



    }

}

呵呵,工人各加了5个。然后注冊了三个工头,第一步的工头是随便挑一个工人类型的。第二步和第三步是挑全部工人的。同一时候还指定了任务分解器。


接下来就构建了一个工作。造一个高端大气上档次的普桑汽车。然后告诉职业介绍所说给我造就能够了。


以下是造车的过程,我把日志也贴上来了:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61
普桑 构建底盘完毕.

-234  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>执行開始,线程数9...

-234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>执行開始...

-234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>执行開始...

-234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>执行開始...

-235  [id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>执行開始...

-236  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>执行開始...

-237  [id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>执行開始...

Base:something about baseInfo

engine is Ok

Base:something about baseInfo

seat is Ok

-245  [id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>执行结束

-246  [id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>执行開始...

-246  [id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>执行结束

Base:something about baseInfo

seat is Ok

-248  [id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>执行開始...

Base:something about baseInfo

tyre is Ok

-250  [id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>执行结束

-250  [id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>执行结束

Base:something about baseInfo

seat is Ok

-252  [id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>执行開始...

-253  [id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>执行结束

Base:something about baseInfo

seat is Ok

Base:something about baseInfo

tyre is Ok

-257  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>执行结束

-258  [id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>执行结束

Base:something about baseInfo

tyre is Ok

Base:something about baseInfo

tyre is Ok

-262  [id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>执行结束

-264  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>执行结束

-264  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>执行结束, 用时:30ms

-333  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>执行開始,线程数5...

-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>执行開始...

-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>执行開始...

-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>执行開始...

-334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>执行開始...

Base:something about baseInfo

door is Ok

Base:something about baseInfo

door is Ok

Base:something about baseInfo

-338  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>执行结束

door is Ok

-339  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>执行開始...

-338  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>执行结束

Base:something about baseInfo

door is Ok

Base:something about baseInfo

-340  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>执行结束

roof is Ok

-340  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>执行结束

-342  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>执行结束

-343  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>执行结束, 用时:10ms

从上面的日志能够看出:

因为第一步工作是挑一个单干的,因此是没有启用线程组的


第二步同一时候有9个线程干活:

1

2

3
-234  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>执行開始,线程数9...

...

-264  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>执行结束, 用时:30ms

第三步同一时候有5个线程干活:

1

2

3
-333  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>执行開始,线程数5...

...

-343  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>执行结束, 用时:10ms

总结:

Tiny并行计算框架确实是能够方便的解决各种复杂并行计算的问题。