TBSchedule Source Code Reading 2: the TBScheduleManagerFactory timer task ManagerFactoryTimerTask

Time: 2022-05-22 21:08:42

Timer task:

Main function: monitor the ZooKeeper state. If the state is normal, call this.factory.refresh(); if it is abnormal, call this.factory.reStart().
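The check-then-dispatch behaviour described above can be sketched as a plain java.util.TimerTask. This is a minimal illustration, not the TBSchedule implementation: the Factory interface, the CheckTask class and the zkHealthy probe are hypothetical names, and only the two calls refresh() and reStart() come from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
import java.util.function.BooleanSupplier;

class TimerCheckSketch {
    /** Hypothetical stand-in for the factory: only the two calls named in the text. */
    interface Factory {
        void refresh() throws Exception;
        void reStart() throws Exception;
    }

    /** Timer task that calls refresh() when the probe reports healthy, reStart() otherwise. */
    static class CheckTask extends TimerTask {
        private final Factory factory;
        private final BooleanSupplier zkHealthy; // hypothetical ZooKeeper health probe

        CheckTask(Factory factory, BooleanSupplier zkHealthy) {
            this.factory = factory;
            this.zkHealthy = zkHealthy;
        }

        @Override
        public void run() {
            try {
                if (zkHealthy.getAsBoolean()) {
                    factory.refresh();
                } else {
                    factory.reStart();
                }
            } catch (Throwable t) {
                // Catch everything so one failed tick does not kill the timer thread.
                System.err.println("timer tick failed: " + t);
            }
        }
    }

    /** Test double that records which method was called. */
    static class RecordingFactory implements Factory {
        final List<String> calls = new ArrayList<>();
        public void refresh() { calls.add("refresh"); }
        public void reStart() { calls.add("reStart"); }
    }

    public static void main(String[] args) {
        RecordingFactory factory = new RecordingFactory();
        new CheckTask(factory, () -> true).run();   // healthy tick
        new CheckTask(factory, () -> false).run();  // unhealthy tick
        System.out.println(factory.calls);          // prints [refresh, reStart]
    }
}
```

In a real process the task would be handed to a java.util.Timer with a delay and a period instead of being run by hand as in main.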

1 Normal case: this.factory.refresh()

If an exception occurs, all scheduling tasks are stopped.
If everything is normal, the schedulers are reassigned.

1.1 Assigning schedulers

    public void reRegisterManagerFactory() throws Exception{
        // Reassign the schedulers
        List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
        for (String strategyName : stopList) {
            this.stopServer(strategyName);
        }
        this.assignScheduleServer();
        this.reRunScheduleServer();
    }

1.1.1 this.assignScheduleServer();

    /**
     * Reassign, according to the strategy, the machines that run the scheduling tasks
     * @throws Exception
     */
    public void assignScheduleServer() throws Exception{
        for(ScheduleStrategyRunntime run: this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)){
            List<ScheduleStrategyRunntime> factoryList = this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByTaskType(run.getStrategyName());
            if(factoryList.size() == 0 || this.isLeader(this.uuid, factoryList) ==false){
                continue;
            }
            ScheduleStrategy scheduleStrategy =this.scheduleStrategyManager.loadStrategy(run.getStrategyName());
            
            int[] nums =  ScheduleUtil.assignTaskNumber(factoryList.size(), scheduleStrategy.getAssignNum(), scheduleStrategy.getNumOfSingleServer());
            for(int i=0;i<factoryList.size();i++){
                ScheduleStrategyRunntime factory =  factoryList.get(i);
                // Update the requested number of servers
                this.scheduleStrategyManager.updateStrategyRunntimeReqestNum(run.getStrategyName(), 
                        factory.getUuid(),nums[i]);
            }
        }
    }
  • Load all strategy information from the strategy directory
    Strategy directory: /tbSchedule/zmlTbScheduleTest/strategy
    Strategy information:
[ScheduleStrategyRunntime [strategyName=DBDemoSingle-stratrery,
  uuid=192.168.255.1$zhuml$69F86F4CFF4E4DA397880E19411DF068$0000000005, 
  ip=null, kind=null, taskName=null, taskParameter=null, requestNum=0, currentNum=0, message=]]
  • Then, using the strategy name DBDemoSingle-stratrery, load the strategy details
    Directory path: /tbSchedule/zmlTbScheduleTest/strategy/DBDemoSingle-stratrery

    {"strategyName":"DBDemoSingle-stratrery","IPList":["127.0.0.1"],"numOfSingleServer":0,"assignNum":4,
      "kind":"Schedule","taskName":"DBDemoSingle_task","taskParameter":"","sts":"resume"}
  • Then shard the tasks and write the shard information to ZooKeeper
    Path: /tbSchedule/zmlTbScheduleTest/strategy/DBDemoSingle-stratrery/192.168.10.249$zhuml$43AB1F13800E4469BBB2DD385C6A0193$0000000006
    data:

    [{"strategyName":"DBDemoSingle-stratrery","uuid":"192.168.10.249$zhuml$43AB1F13800E4469BBB2DD385C6A0193$0000000006","requestNum":4,"currentNum":0,"message":""}]
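The sharding step can be illustrated with a small even-split function. This is a sketch under assumptions, not a copy of ScheduleUtil.assignTaskNumber: the class name AssignSketch, the parameter names, and the rule that a cap of 0 or less means "no cap" are all chosen here for illustration. With one factory, assignNum=4 and numOfSingleServer=0 it yields 4, which agrees with the requestNum=4 shown in the ZooKeeper data above.

```java
import java.util.Arrays;

class AssignSketch {
    /**
     * Split taskItemNum across serverNum servers as evenly as possible.
     * Assumption: maxPerServer <= 0 means "no cap per server".
     * The caller must ensure serverNum > 0 (the quoted code skips empty factory lists).
     */
    static int[] assign(int serverNum, int taskItemNum, int maxPerServer) {
        int[] nums = new int[serverNum];
        int base = taskItemNum / serverNum;   // every server gets at least this many
        int extra = taskItemNum % serverNum;  // the first `extra` servers get one more
        if (maxPerServer > 0 && base >= maxPerServer) {
            // The cap is reached: give every server exactly the cap.
            base = maxPerServer;
            extra = 0;
        }
        for (int i = 0; i < serverNum; i++) {
            nums[i] = (i < extra) ? base + 1 : base;
        }
        return nums;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(assign(1, 4, 0)));  // prints [4]
        System.out.println(Arrays.toString(assign(3, 4, 0)));  // prints [2, 1, 1]
        System.out.println(Arrays.toString(assign(3, 10, 2))); // prints [2, 2, 2]
    }
}
```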

1.1.2 Run: this.reRunScheduleServer();

    public void reRunScheduleServer() throws Exception{
        for (ScheduleStrategyRunntime run : this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)) {
            List<IStrategyTask> list = this.managerMap.get(run.getStrategyName());
            if(list == null){
                list = new ArrayList<IStrategyTask>();
                this.managerMap.put(run.getStrategyName(),list);
            }
            // Too many: stop the surplus schedulers, starting from the end of the list
            while(list.size() > run.getRequestNum() && list.size() >0){
                IStrategyTask task  =  list.remove(list.size() - 1);
                try {
                    task.stop(run.getStrategyName());
                } catch (Throwable e) {
                    // Log message: "error while unregistering the task"
                    logger.error("注销任务错误:strategyName=" + run.getStrategyName(), e);
                }
            }
            // Too few: add schedulers
            ScheduleStrategy strategy = this.scheduleStrategyManager.loadStrategy(run.getStrategyName());
            while(list.size() < run.getRequestNum()){
                IStrategyTask result = this.createStrategyTask(strategy);
                if(null==result){
                    // Log message: "the configuration for this strategy is invalid"
                    logger.error("strategy 对应的配置有问题。strategy name="+strategy.getStrategyName());
                }
                // Note: result is added to the list even when it is null
                list.add(result);
            }
        }
    }
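  • Load the strategy information and, based on the shard count (requestNum), decide whether to stop tasks or create tasks
    Stopping a task: calling stop is enough
    Creating a task: createStrategyTask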
    /**
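     * Create a scheduling server
     * @param strategy the strategy whose kind decides which type of task is created
     * @return the created task, or null if it could not be created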
     * @throws Exception
     */
    public IStrategyTask createStrategyTask(ScheduleStrategy strategy)
            throws Exception {
        IStrategyTask result = null;
        try{
            if(ScheduleStrategy.Kind.Schedule == strategy.getKind()){
                String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(strategy.getTaskName());
                String ownSign =ScheduleUtil.splitOwnsignFromTaskType(strategy.getTaskName());
                result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);
            }else if(ScheduleStrategy.Kind.Java == strategy.getKind()){
                result=(IStrategyTask)Class.forName(strategy.getTaskName()).newInstance();
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }else if(ScheduleStrategy.Kind.Bean == strategy.getKind()){
                result=(IStrategyTask)this.getBean(strategy.getTaskName());
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }
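        }catch(Exception e ){
            // Log message: "failed to obtain the Java class or bean for the strategy; the scheduler did not load this task, please check"
            logger.error("strategy 获取对应的java or bean 出错,schedule并没有加载该任务,请确认" +strategy.getStrategyName(),e);
        }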
        return result;
    }
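The shrink-or-grow logic of reRunScheduleServer can be isolated into a small reconcile function. This is a minimal sketch with hypothetical names (ReconcileSketch, Task, reconcile, creator). It also differs from the quoted code in one labelled way: when creation returns null it stops growing instead of adding null to the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class ReconcileSketch {
    /** Hypothetical stand-in for IStrategyTask: only the stop call is modelled. */
    interface Task {
        void stop() throws Exception;
    }

    /**
     * Shrink or grow `running` towards `requestNum` entries.
     * Returns the number of tasks that were removed and stopped.
     */
    static int reconcile(List<Task> running, int requestNum, Supplier<Task> creator) {
        int stopped = 0;
        // Too many: remove from the tail and stop each removed task.
        while (running.size() > requestNum && !running.isEmpty()) {
            Task task = running.remove(running.size() - 1);
            try {
                task.stop();
            } catch (Exception e) {
                System.err.println("stop failed: " + e);
            }
            stopped++;
        }
        // Too few: create tasks until the requested number is reached.
        while (running.size() < requestNum) {
            Task created = creator.get();
            if (created == null) {
                // Choice made in this sketch only: give up rather than store null.
                break;
            }
            running.add(created);
        }
        return stopped;
    }

    public static void main(String[] args) {
        List<Task> running = new ArrayList<>();
        reconcile(running, 3, () -> () -> { });        // grow from 0 to 3
        System.out.println(running.size());            // prints 3
        int stopped = reconcile(running, 1, () -> null);
        System.out.println(stopped + " stopped, " + running.size() + " left"); // prints 2 stopped, 1 left
    }
}
```

Breaking out of the loop on a null result keeps the list free of null entries, at the cost of running fewer tasks than requested until the next refresh.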

Tasks can be created as one of three kinds: public enum Kind{Schedule,Java,Bean}
Only Schedule is covered here: result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);