本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记
1、定义一个队列缓存池:
1
2
|
//static修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。
private static List<Queue> queueCache = new LinkedList<Queue>();
|
2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。
1
|
private Integer offerMaxQueue = 2000 ;
|
3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
new Thread(){
public void run(){
while ( true ){
String ip = null ;
try {
synchronized (queueCache) {
Integer size = queueCache.size();
if (size== 0 ){
//队列缓存池没有消息,等待。。。。 queueCache.wait();
}
Queue queue = queueCache.remove( 0 );
if (isIpLock(queueStr)){ //假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理
queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,
continue ;
} else {
; //这里是处理该消息的操作。
}
size = queueCache.size();
if (size<offerMaxQueue&&size>= 0 ){ queueCache.notifyAll(); //在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try { //检出该消息队列的锁
unIpLock(queueStr);
} catch (Execption e) { //捕获异常,不能让线程挂掉
e.printStackTrace();
}
}
}
}.start();
|
4、检入队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
synchronized (queueCache) {
while ( true ){
Integer size = queueCache.size();
if (size>=offerMaxQueue){
try {
queueCache.wait();
continue ; //继续执行等待中的检入任务。
} catch (InterruptedException e) {
e.printStackTrace();
}
} //IF
if (size<=offerMaxQueue&&size> 0 ){
queueCache.notifyAll();
}
break ; //检入完毕
} //while
}
|
5、锁方法实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
/**
* 锁
* @param ip
* @return
* @throws
*/
public Boolean isLock(String queueStr) {
return this .redisManager.setnx(queueStr+ "_lock" , "LOCK" , 10000 )!= 1 ;
}
//解锁
public void unIpLock(String queueStr) {
if (ip!= null ){
this .redisManager.del(queueStr+ "_lock" );
// lock.unlock();
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/mr_linjw/article/details/51367719