MapProcessor

时间:2024-01-25 17:44:58

tech/powerjob/worker/core/processor/sdk/MapProcessor.java

public interface MapProcessor extends BasicProcessor {

    Logger log = LoggerFactory.getLogger(MapProcessor.class);

    int RECOMMEND_BATCH_SIZE = 200;

    /**
     * 分发子任务
     * @param taskList 子任务,再次执行时可通过 TaskContext#getSubTask 获取
     * @param taskName 子任务名称,即子任务处理器中 TaskContext#getTaskName 获取到的值
     * @throws PowerJobCheckedException map 失败将抛出异常
     */
    default void map(List<?> taskList, String taskName) throws PowerJobCheckedException {

        if (CollectionUtils.isEmpty(taskList)) {
            return;
        }

        TaskDO task = ThreadLocalStore.getTask();
        WorkerRuntime workerRuntime = ThreadLocalStore.getRuntimeMeta();

        if (taskList.size() > RECOMMEND_BATCH_SIZE) {
            log.warn("[Map-{}] map task size is too large, network maybe overload... please try to split the tasks.", task.getInstanceId());
        }

        // 修复 map 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
        if (TaskConstant.ROOT_TASK_NAME.equals(taskName) || TaskConstant.LAST_TASK_NAME.equals(taskName)) {
            log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically." ,task.getInstanceId() ,taskName , taskName);
            taskName ="X-"+taskName;
        }

        // 1. 构造请求
        ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);

        // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
        boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime);

        if (requestSucceed) {
            log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size());
        }else {
            throw new PowerJobCheckedException("map failed for task: " + taskName);
        }
    }

    /**
     * 是否为根任务
     * @return true -> 根任务 / false -> 非根任务
     */
    default boolean isRootTask() {
        TaskDO task = ThreadLocalStore.getTask();
        return TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName());
    }
}

MapProcessor接口继承了BasicProcessor,它提供了默认的map方法用于分发子任务,它主要是构造了ProcessorMapTaskRequest,通过TransportUtils.reliableMapTask发送请求;它还提供了isRootTask方法用于判断当前任务是不是根任务