而且是完美的实现.

时间:2022-01-01 07:52:04

梗阻行列队伍(BlockingQueue)是一个撑持两个附加操纵的行列队伍。这两个附加的操纵是:在行列队伍为空时,获取元素的线程会期待行列队伍变为非空。当行列队伍满时,存储元素的线程会期待行列队伍可用。梗阻行列队伍常用于出产者和消费者的场景,出产者是往行列队伍里添加元素的线程,,消费者是从行列队伍里拿元素的线程。梗阻行列队伍就是出产者存放元素的容器,而消费者也只从容器里拿元素。

先放张图:

而且是完美的实现.


按照前面的描述, 我们来考虑下梗阻行列队伍在措施中会呈现的问题:
梗阻行列队伍 需要实现两个成果: 使线程期待与唤醒线程. 具体介绍如下:
在极端条件下, 需要挂起线程, 期待行列队伍满足条件后,再去执行添加或提取 操纵
待行列队伍满足了条件之后, 通知线程去继续其挂起之前的操纵....
涉及到的技术:
线程同步 与 线程间通信
可能孕育发存亡锁的分析:
在某个时刻,行列队伍为空或者是已满, 此时出产者未能存入数据或者还在存入数据到行列队伍中, 这就会孕育产生使得行列队伍堕落
如果此时, 消费者对行列队伍在进行操纵就会孕育发存亡锁...由于之前的出产者的操纵使得行列队伍出了问题并没有释放锁, 此时就会造成死锁
这是从预防死锁的角度来解决死锁问题
首先就是同步资源-行列队伍的锁定,既然有锁那么就要考虑死锁问题,最后就是线程间的通信。

也就是说,实现梗阻行列队伍需要考虑这三个点。

查了下资料,大多都是java的封装好的类库,不过没事,横竖思想,理论都是一样的,差此外就是实现差别。但还是有个不错的C#实现----<< C# 实现出产者消费者行列队伍 >>。该文其实也道出了梗阻行列队伍在撤除出产者-消费者模型外的应用,昨天查资料的时候,阿里措施员写了篇文章关于邮件接收下载的,就是使用梗阻行列队伍,但是我忘了原文在哪了。其时看的时候,想起来当初看<<C#高级编程>>第十章的管道。书上介绍的是:开一个task去读取文件名,放到梗阻行列队伍中,然后开一个行列队伍按照文件名读取内容,这个应用于邮件接收下载是一样的。暂时先不说这个了,有兴趣的可以本身去看看那本书。
那么我们如何本身实现梗阻行列队伍呢?正如上面说到的考虑点,同步,线程通信,防备死锁。看看代码:

而且是完美的实现.

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;namespace SuiBao.Utility {    ///梗阻行列队伍(BlockingQueue)是一个撑持两个附加操纵的行列队伍。这两个附加的操纵是:在行列队伍为空时,获取元素的线程会期待行列队伍变为非空。当行列队伍满时,存储元素的线程会期待行列队伍可用。梗阻行列队伍常用于出产者和消费者的场景,出产者是往行列队伍里添加元素的线程,消费者是从行列队伍里拿元素的线程。梗阻行列队伍就是出产者存放元素的容器,而消费者也只从容器里拿元素。 //梗阻行列队伍 需要实现两个成果: 使线程期待与唤醒线程. 具体介绍如下:    // 在极端条件下, 需要挂起线程, 期待行列队伍满足条件后,再去执行添加或提取 操纵    // 待行列队伍满足了条件之后, 通知线程去继续其挂起之前的操纵....    //涉及到的技术:    //线程同步(此实例用到了lock) 与 线程间通信(此示例用到了event)    // // 可能孕育发存亡锁的分析:    // 在某个时刻,行列队伍为空或者是已满, 此时出产者未能存入数据或者还在存入数据到行列队伍中, 这就会孕育产生使得行列队伍堕落    // 如果此时, 消费者对行列队伍在进行操纵就会孕育发存亡锁...由于之前的出产者的操纵使得行列队伍出了问题并没有释放锁, 此时就会造成死锁    // 这是从预防死锁的角度来解决死锁问题 public class BlockQueue<T>    {        private Queue<T> _inner_queue = null;        private ManualResetEvent _dequeue_wait = null;        public int Count        {            get { return _inner_queue.Count; }        }        public BlockQueue(int capacity = -1)        {            this._inner_queue = capacity == -1 ? new Queue<T>() : new Queue<T>(capacity);            this._dequeue_wait = new ManualResetEvent(false);        }        // 入队加锁 public void EnQueue(T item)        {            if (this._IsShutdown == true) throw new InvalidOperationException("处事未开启.[EnQueue]");            lock (this._inner_queue)            {                this._inner_queue.Enqueue(item);                this._dequeue_wait.Set();            }        }        // 出队加锁 public T DeQueue(int waitTime)        {            bool _queueEmpty = false;            T item = default(T);            while (true)            {                lock (this._inner_queue)                {                    // 判断行列队伍中是否有元素.... if (this._inner_queue.Count > 0)                    {                        item = this._inner_queue.Dequeue();                        this._dequeue_wait.Reset();                        //break;                    }                    else                    {                        if (this._IsShutdown == true)                        {                            throw new InvalidOperationException("处事未开启[DeQueue].");                        }                        else                        {                            _queueEmpty = true;                        }                    }                }                if (item != null)                {                    return item;                }                if (_queueEmpty)                {                    this._dequeue_wait.WaitOne(waitTime);                }            }        }        private bool _IsShutdown = false;        public void Shutdown()        {            this._IsShutdown = true;            this._dequeue_wait.Set();        }        public void Clear()        {            this._inner_queue.Clear();        }    } }