java同步代码(synchronized)中使用BlockingQueue

时间:2022-07-25 20:14:09

说起BlockingQueue,大家最熟悉的就是生产者-消费者模式下的应用。但是如果在调用queue的上层代码加了同步块就会导致线程死锁。

例如:

    static BlockingQueue<String> queue = new LinkedBlockingQueue();

    /**
* 同步锁
*/
static Object lock = new Object(); static void producer(){
synchronized (lock){
queue.put("1");
}
} static void cosumer(){
synchronized (lock){
//一旦阻塞,将挂起当前线程,lock锁永远等不到释放,生产者也就无法添加元素,take也就永远阻塞
String msg = queue.take();
}
}

但是同步块必须使用的情况下,怎样改进queue的使用呢?见下面示例:

package com.hdwang;

import com.alibaba.fastjson.JSON;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; /**
* Created by hdwang on 2018/4/17.
*/
public class MultiQueueSynTest { static BlockingQueue<Packet> queue1 = new LinkedBlockingQueue(); static BlockingQueue<Packet> queue2 = new LinkedBlockingQueue(); static int seq = 1; /**
* 同步锁
*/
static Object lock = new Object(); static void commit(String msg){
synchronized (lock) {
Packet packet = new Packet();
packet.setSeq(seq++);
packet.setMsg(msg);
try { //queue1.put(packet); //阻塞式添加元素 while(queue1.size()== Integer.MAX_VALUE){ //队满,等待
lock.wait();
} queue1.offer(packet); //非阻塞式添加元素即可
System.out.println("commit msg:" + JSON.toJSONString(packet));
lock.notifyAll(); //通知等待线程 } catch (InterruptedException e) {
e.printStackTrace();
}
}
} static void send(){
while(true) {
synchronized (lock) {
try { //Packet packet = queue1.take(); //阻塞式取元素
//queue2.put(packet); while(queue1.isEmpty()) { //队空,等待
lock.wait(); //等待,交出锁
} Packet packet = queue1.poll(); //非阻塞式取元素即可
System.out.println("send msg:" + JSON.toJSONString(packet));
lock.notifyAll(); //通知等待线程 while (queue2.size() == Integer.MAX_VALUE){ //队满,等待
lock.wait(); //等待,交出锁
}
queue2.offer(packet);
System.out.println("msg->queue2:"+JSON.toJSONString(packet));
lock.notifyAll(); //通知等待线程 } catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} public static void main(String[] args) {
//生产者1
new Thread(new Runnable() {
@Override
public void run() {
while(true){ //不断产生消息
commit("hello1"); try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
//生产者2
new Thread(new Runnable() {
@Override
public void run() {
while(true){ //不断产生消息
commit("hello2"); try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start(); //消费者
new Thread(new Runnable() {
@Override
public void run() { send();
}
}).start(); } static class Packet{
int seq;
String msg; public int getSeq() {
return seq;
} public void setSeq(int seq) {
this.seq = seq;
} public String getMsg() {
return msg;
} public void setMsg(String msg) {
this.msg = msg;
}
} }

运行结果

commit msg:{"msg":"hello1","seq":1}
send msg:{"msg":"hello1","seq":1}
msg->queue2:{"msg":"hello1","seq":1}
commit msg:{"msg":"hello2","seq":2}
send msg:{"msg":"hello2","seq":2}
msg->queue2:{"msg":"hello2","seq":2}
commit msg:{"msg":"hello1","seq":3}
send msg:{"msg":"hello1","seq":3}
msg->queue2:{"msg":"hello1","seq":3}
commit msg:{"msg":"hello2","seq":4}
send msg:{"msg":"hello2","seq":4}
msg->queue2:{"msg":"hello2","seq":4}
commit msg:{"msg":"hello1","seq":5}
send msg:{"msg":"hello1","seq":5}
msg->queue2:{"msg":"hello1","seq":5}
commit msg:{"msg":"hello2","seq":6}
send msg:{"msg":"hello2","seq":6}
msg->queue2:{"msg":"hello2","seq":6}