今天听qq群里面说了一个,用户 可以自定义任务的场景,我首相想到的是 spring 定时任务,不过 想了下 这个 不够灵活而且有一定的延迟。后来想到了
DelayQueue + 线程池 的想法,自己简单做了下实现 。代码如下
package com.lyq.jsoup.delayqueue;
public enum MethodEnum {
SEND_MESSAGE(0, "handlerSendMessage"),
AUTO_BUY(1, "handlerAutoBuy");
private int index;
private String methodName;
public static MethodEnum getMethodEnumByIndex(int index) {
MethodEnum[] methodEnums = values();
for (int i = 0; i < methodEnums.length; i++) {
MethodEnum methodEnum = methodEnums[i];
if (methodEnum.getIndex() == index) {
return methodEnum;
}
}
return null;
}
MethodEnum(int index, String methodName) {
this.index = index;
this.methodName = methodName;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
}
这个类是一个枚举类,用了模拟方法类型。
package com.lyq.jsoup.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class PersonalTask implements Delayed {
private static int counter = 0;
private int id = counter++;
private long triggerTime;
private MethodEnum methodEnum;
public PersonalTask(long triggerTime, MethodEnum methodEnum) {
this.triggerTime = triggerTime;
this.methodEnum = methodEnum;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
PersonalTask that = (PersonalTask) o;
if (triggerTime < that.triggerTime) return -1;
if (triggerTime > that.triggerTime) return 1;
return 0;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public long getTriggerTime() {
return triggerTime;
}
public void setTriggerTime(long triggerTime) {
this.triggerTime = triggerTime;
}
public MethodEnum getMethodEnum() {
return methodEnum;
}
public void setMethodEnum(MethodEnum methodEnum) {
this.methodEnum = methodEnum;
}
}
这个类是 用户自定义任务vo,这个类可以持久化到数据库。
package com.lyq.jsoup.delayqueue;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.DelayQueue;
/**
* Created by Administrator on 2017/1/22.
*/
public class PersonalCussumer implements Runnable {
private DelayQueue<PersonalTask> tasks;
public PersonalCussumer(DelayQueue<PersonalTask> tasks) {
this.tasks = tasks;
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
PersonalTask task = tasks.take();
if (null == task) {
continue;
}
handler(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished PersonalCussumer");
}
}
private void handler(PersonalTask task) {
try {
Method method = this.getClass().getMethod(task.getMethodEnum().getMethodName(), PersonalTask.class);
method.invoke(this, task);
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
public void handlerSendMessage(PersonalTask task) {
System.out.println("id:" + task.getId() + " start send message");
}
public void handlerAutoBuy(PersonalTask task) {
System.out.println("id:" + task.getId() + " start auto buy");
}
}
这个类是,用户自定义任务的处理逻辑,可以根据用户的定制去做不同的处理 。
package com.lyq.jsoup.delayqueue;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by Administrator on 2017/1/22.
*/
public class ThreadPoolUtils {
public static void main(String[] args){
//存放任务处理的队列
DelayQueue<PersonalTask> delayQueue = new DelayQueue();
//初始化线程池
ArrayBlockingQueue<Runnable> arrayWorkQueue = new ArrayBlockingQueue(10);
ExecutorService threadPool = new ThreadPoolExecutor(5,
10,
60L,
TimeUnit.SECONDS,
arrayWorkQueue,
new ThreadPoolExecutor.AbortPolicy());
//模拟10条数据 这里可以是从数据库里面查询出满足条件的数据
for (int i = 0;i<10;i++){
delayQueue.put(getPersonalTask());
}
threadPool.execute(new PersonalCussumer(delayQueue));
threadPool.shutdown();
System.out.println(Thread.currentThread().getName());
}
public static PersonalTask getPersonalTask(){
Random random = new Random();
int randomAddMilliSenconds = random.nextInt(20);
long triggerTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(randomAddMilliSenconds, TimeUnit.SECONDS);
int methodIndex = new Random().nextInt(2);
MethodEnum methodEnum = MethodEnum.getMethodEnumByIndex(methodIndex);
PersonalTask task = new PersonalTask(triggerTime,methodEnum);
//调适语句 可以忽略
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(new Date(task.getTriggerTime())));
return task;
}
}
核心处理逻辑,用线程池去消费队列里面的任务,现实项目中可以把 DelayQueue 做成全局 变量,如果 有用户定制任务,持久化后就放进队列中,等待处理。需要注意的是当服务器重启的时候 需要初始化队列里面的数据。这就需要持久化中有处理状态字段。