生产者消费者模型

时间:2021-08-07 17:42:14
  1 public final class Data {
2
3 private String id;
4 private String name;
5
6 public Data(String id, String name){
7 this.id = id;
8 this.name = name;
9 }
10
11 public String getId() {
12 return id;
13 }
14
15 public void setId(String id) {
16 this.id = id;
17 }
18
19 public String getName() {
20 return name;
21 }
22
23 public void setName(String name) {
24 this.name = name;
25 }
26
27 @Override
28 public String toString(){
29 return "{id: " + id + ", name: " + name + "}";
30 }
31
32 }
33
34
35 /**
36 * 生产者
37 */
38 public class Provider implements Runnable{
39
40 //共享缓存区
41 private BlockingQueue<Data> queue;
42 //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
43 private volatile boolean isRunning = true;
44 //id生成器
45 private static AtomicInteger count = new AtomicInteger();
46 //随机对象
47 private static Random r = new Random();
48
49 public Provider(BlockingQueue queue){
50 this.queue = queue;
51 }
52
53 @Override
54 public void run() {
55 while(isRunning){
56 try {
57 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
58 Thread.sleep(r.nextInt(1000));
59 //获取的数据进行累计...
60 int id = count.incrementAndGet();
61 //比如通过一个getData方法获取了
62 Data data = new Data(Integer.toString(id), "数据" + id);
63 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
64 /**
65 * quere.offer()
66 * 参数一:元素对象、参数二:数值、参数三:给参数二的数值定义一个时间单位
67 * 此时表示将Data对象加入queue中,2秒钟之内加入成功返回ture,反之返回false
68 */
69 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
70 System.out.println("提交缓冲区数据失败....");
71 //do something... 比如重新提交
72 }
73 } catch (InterruptedException e) {
74 e.printStackTrace();
75 }
76 }
77 }
78
79 public void stop(){
80 this.isRunning = false;
81 }
82
83 }
84
85
86 /**
87 * 消费者
88 */
89 public class Consumer implements Runnable{
90
91 private BlockingQueue<Data> queue;
92
93 public Consumer(BlockingQueue queue){
94 this.queue = queue;
95 }
96
97 //随机对象
98 private static Random r = new Random();
99
100 @Override
101 public void run() {
102 while(true){
103 try {
104 //获取数据
105 Data data = this.queue.take();
106 //进行数据处理。休眠0 - 1000毫秒模拟耗时
107 Thread.sleep(r.nextInt(1000));
108 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
109 } catch (InterruptedException e) {
110 e.printStackTrace();
111 }
112 }
113 }
114 }
115
116
117 public class Main {
118
119 public static void main(String[] args) throws Exception {
120 //内存缓冲区 LinkedBlockingQueue阻塞*队列
121 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
122 //生产者
123 Provider p1 = new Provider(queue);
124
125 Provider p2 = new Provider(queue);
126 Provider p3 = new Provider(queue);
127 //消费者
128 Consumer c1 = new Consumer(queue);
129 Consumer c2 = new Consumer(queue);
130 Consumer c3 = new Consumer(queue);
131
132 //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程
133 //没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
134 ExecutorService cachePool = Executors.newCachedThreadPool();
135 cachePool.execute(p1);
136 cachePool.execute(p2);
137 cachePool.execute(p3);
138 cachePool.execute(c1);
139 cachePool.execute(c2);
140 cachePool.execute(c3);
141
142 try {
143 Thread.sleep(3000);
144 } catch (InterruptedException e) {
145 e.printStackTrace();
146 }
147 p1.stop();
148 p2.stop();
149 p3.stop();
150 try {
151 Thread.sleep(2000);
152 } catch (InterruptedException e) {
153 e.printStackTrace();
154 }
155 // cachePool.shutdown();
156 // cachePool.shutdownNow();
157
158
159 }
160
161 }