Java多任务并行执行(带前置任务)

时间:2022-09-08 19:57:23

出现需求

在登录时,需要向服务器多个接口请求程序所需数据。有些任务B必须等待任务A请求数据返回后在请求。

Java多任务并行执行(带前置任务)

Java多任务并行执行(带前置任务)

MeshTask

public final class MeshTask{

private static final String TOP_TASK_TAG = "TopPointTaskByAutoCreate";

private PointTask topPointTask;

private HashMap<String, PointTask> taskCache;

private boolean isStarted = false;

private ReentrantLock lock = new ReentrantLock();
// private Condition newCondition = lock.newCondition();

private TaskRecycler recycler = new TaskRecycler() {

@Override
public void onPointTaskComplete(String taskTag) {
if (taskCache != null && taskCache.containsKey(taskTag)) {
lock.lock();
try {
taskCache.remove(taskTag);
_toLog("GC Task【"+ taskTag + "】,剩余任务: " + taskCache.size());
}finally{
lock.unlock();
}
}
}
};


MeshTask() {
super();
taskCache = new HashMap<String, PointTask>();
initTopTask();
}

public MeshTask addTask(PointTask task){
return addTask(task, TOP_TASK_TAG);
}

public MeshTask addTask(PointTask task, String... prevTaskTags){
if (isStarted) {
throw new RuntimeException("The MeshTask is already running, add new PointTask is illegal operation.");
}
List<String> prevTaskTagList = new ArrayList<String>(Arrays.asList(prevTaskTags));
for (String taskTag : prevTaskTagList) {
PointTask pointTask = taskCache.get(taskTag);
if (pointTask == null) {
throw new RuntimeException("No pre task for " + taskTag + " was found, you must add the "+ taskTag +" task before adding the " + task.taskTag + " task");
}
pointTask.addNextPointTask(task);
}
task.setPrevTaskTags(prevTaskTagList);
task.setRecycler(recycler);
taskCache.put(task.taskTag, task);
return MeshTask.this;
}

public void execute() {
if (isStarted) {
return;
}
isStarted = true;
topPointTask.toRun();
}


private void initTopTask(){
topPointTask = new PointTask(TOP_TASK_TAG) {

@Override
public boolean doTask4Result(int runNumber) {
return true;
}
};
topPointTask.setRecycler(recycler);

taskCache.put(topPointTask.taskTag, topPointTask);
}

private interface TaskRecycler{
void onPointTaskComplete(String taskTag);
}


/**
* 【任务点】:在网状依赖异步任务中充当 网的节点 ,一个点表示一个任务
*/

public static abstract class PointTask extends Task{

/** 任务的唯一标示 */
public String taskTag;
private int runNumber = -1;
private LinkedList<PointTask> nextTasks;
private List<String> prevTaskTagList;
private TaskRecycler recycler;

/**
* @param taskTag 节点任务的唯一标示
*/

public PointTask(String taskTag) {
super();
this.taskTag = taskTag;
}

/**
* 设置任务回收器
*/

public void setRecycler(TaskRecycler recycler) {
this.recycler = recycler;
}

/**
* 设置上级节点
*/

private void setPrevTaskTags(List<String> prevTaskTagList) {
this.prevTaskTagList = prevTaskTagList;
}

/**
* 添加下级任务
*/

private void addNextPointTask(PointTask pointTask){
if (nextTasks == null) {
nextTasks = new LinkedList<PointTask>();
}
nextTasks.add(pointTask);
}

private void notifyTask(String prevPointTaskTag){
_toLog("Task【" + prevPointTaskTag + "】 try notify Task【"+ this.taskTag +"】");
if (prevTaskTagList != null && prevTaskTagList.contains(prevPointTaskTag)) {
prevTaskTagList.remove(prevPointTaskTag);
}
if (prevTaskTagList != null && !prevTaskTagList.isEmpty()) {
String preTags = "";
for (String string : prevTaskTagList) {
preTags += " 【"+ string + "】 ";
}
_toLog("Task【"+ this.taskTag + "】 notify failed,PreTask left:" + preTags);
return;
}
_toLog("Task【"+ this.taskTag + "】 notify success.");
toRun();
}

private void toRun(){
//这里是用自定义线程池执行任务
TaskThreadPoolExecutor.getInstance().execute(PointTask.this);
}

@Override
protected void doTask() {
boolean isContinue = true;
while (isContinue) {
isContinue = !doTask4Result(++ this.runNumber);
if (isContinue) {
_toLog("Task【"+ this.taskTag + "】 return failed, try aging.");
}
}

_toLog("Task【"+ this.taskTag + "】 return OK.");

// 通知下级节点的任务
notifyNextPointTask();

destroy();
}

private void notifyNextPointTask() {
if (nextTasks == null) {
return;
}
for (PointTask nextTask : nextTasks) {
nextTask.notifyTask(this.taskTag);
}
}

/**
* 执行任务。注意,自己要返回True啊,不然目前设计逻辑是会一直执行直到返回true
*/

public abstract boolean doTask4Result(int runNumber);

private void destroy(){

if (this.recycler != null) {
this.recycler.onPointTaskComplete(this.taskTag);
this.recycler = null;
}

if (nextTasks != null) {
nextTasks.clear();
nextTasks = null;
}

if (prevTaskTagList != null) {
prevTaskTagList.clear();
prevTaskTagList = null;
}
}
}
}

其他

缺乏依赖的class无法直接使用,这个Demo里面的可以直接使用,而且有更详细的注解和其他功能。