DelayQueue 实现简单的定时任务

时间:2022-01-20 23:30:12

今天听qq群里面说了一个,用户 可以自定义任务的场景,我首相想到的是 spring 定时任务,不过 想了下 这个 不够灵活而且有一定的延迟。后来想到了

DelayQueue + 线程池 的想法,自己简单做了下实现 。代码如下 


package com.lyq.jsoup.delayqueue;


public enum MethodEnum {

SEND_MESSAGE(0, "handlerSendMessage"),
AUTO_BUY(1, "handlerAutoBuy");

private int index;
private String methodName;

public static MethodEnum getMethodEnumByIndex(int index) {
MethodEnum[] methodEnums = values();
for (int i = 0; i < methodEnums.length; i++) {
MethodEnum methodEnum = methodEnums[i];
if (methodEnum.getIndex() == index) {
return methodEnum;
}
}
return null;
}

MethodEnum(int index, String methodName) {
this.index = index;
this.methodName = methodName;
}

public int getIndex() {
return index;
}

public void setIndex(int index) {
this.index = index;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}


}

这个类是一个枚举类,用了模拟方法类型。



package com.lyq.jsoup.delayqueue;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class PersonalTask implements Delayed {
private static int counter = 0;
private int id = counter++;
private long triggerTime;
private MethodEnum methodEnum;

public PersonalTask(long triggerTime, MethodEnum methodEnum) {
this.triggerTime = triggerTime;
this.methodEnum = methodEnum;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
PersonalTask that = (PersonalTask) o;
if (triggerTime < that.triggerTime) return -1;
if (triggerTime > that.triggerTime) return 1;
return 0;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public long getTriggerTime() {
return triggerTime;
}

public void setTriggerTime(long triggerTime) {
this.triggerTime = triggerTime;
}

public MethodEnum getMethodEnum() {
return methodEnum;
}

public void setMethodEnum(MethodEnum methodEnum) {
this.methodEnum = methodEnum;
}
}


这个类是 用户自定义任务vo,这个类可以持久化到数据库。



package com.lyq.jsoup.delayqueue;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.DelayQueue;

/**
* Created by Administrator on 2017/1/22.
*/
public class PersonalCussumer implements Runnable {

private DelayQueue<PersonalTask> tasks;

public PersonalCussumer(DelayQueue<PersonalTask> tasks) {
this.tasks = tasks;
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
PersonalTask task = tasks.take();
if (null == task) {
continue;
}
handler(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished PersonalCussumer");
}
}

private void handler(PersonalTask task) {
try {
Method method = this.getClass().getMethod(task.getMethodEnum().getMethodName(), PersonalTask.class);
method.invoke(this, task);
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}

public void handlerSendMessage(PersonalTask task) {
System.out.println("id:" + task.getId() + " start send message");
}

public void handlerAutoBuy(PersonalTask task) {
System.out.println("id:" + task.getId() + " start auto buy");
}
}



这个类是,用户自定义任务的处理逻辑,可以根据用户的定制去做不同的处理 。



package com.lyq.jsoup.delayqueue;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Created by Administrator on 2017/1/22.
*/
public class ThreadPoolUtils {
public static void main(String[] args){
//存放任务处理的队列
DelayQueue<PersonalTask> delayQueue = new DelayQueue();

//初始化线程池
ArrayBlockingQueue<Runnable> arrayWorkQueue = new ArrayBlockingQueue(10);
ExecutorService threadPool = new ThreadPoolExecutor(5,
10,
60L,
TimeUnit.SECONDS,
arrayWorkQueue,
new ThreadPoolExecutor.AbortPolicy());

//模拟10条数据 这里可以是从数据库里面查询出满足条件的数据
for (int i = 0;i<10;i++){
delayQueue.put(getPersonalTask());
}
threadPool.execute(new PersonalCussumer(delayQueue));
threadPool.shutdown();
System.out.println(Thread.currentThread().getName());
}

public static PersonalTask getPersonalTask(){
Random random = new Random();
int randomAddMilliSenconds = random.nextInt(20);

long triggerTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(randomAddMilliSenconds, TimeUnit.SECONDS);

int methodIndex = new Random().nextInt(2);
MethodEnum methodEnum = MethodEnum.getMethodEnumByIndex(methodIndex);
PersonalTask task = new PersonalTask(triggerTime,methodEnum);

//调适语句 可以忽略
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(new Date(task.getTriggerTime())));

return task;
}
}



       核心处理逻辑,用线程池去消费队列里面的任务,现实项目中可以把 DelayQueue 做成全局 变量,如果 有用户定制任务,持久化后就放进队列中,等待处理。需要注意的是当服务器重启的时候 需要初始化队列里面的数据。这就需要持久化中有处理状态字段。