C++:高并发队列(含双锁[队头锁,队尾锁])

时间:2021-04-25 17:59:54

最近再做一个高并发的服务器处理程序,服务器要用多线程处理大数据量计算,然后将计算结果封装成消息放入队列中,然后另起几个线程专门负责处理消息队列中的消息分发给不同客户端,这样瓶颈就出来了,N多线程都在频繁锁消息队列,这样导致队里的利用效率下降一半,无论是入队还是出队都要对队列进行全部枷锁,恩。后来仔细分析后得出结论,弄一个双锁[队头锁,队尾锁]队列,恩!
 网上也有不少大牛对此进行研究,恩。
原文转自:Parallel Labs
URL:http://www.parallellabs.com/2010/10/25/practical-concurrent-queue-algorithm/
同时另有高人研究:http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html#nbq
代码如下:

typedef struct node_t {
    TYPE value; 
    node_t *next
} NODE;
 
typedef struct queue_t {
    NODE *head; 
    NODE *tail;
    LOCK q_h_lock;
    LOCK q_t_lock;
} Q;
 
initialize(Q *q) {
   node = new_node()   // Allocate a free node
   node->next = NULL   // Make it the only node in the linked list
   q->head = q->tail = node   // Both head and tail point to it
   q->q_h_lock = q->q_t_lock = FREE   // Locks are initially free
}
 
enqueue(Q *q, TYPE value) {
   node = new_node()       // Allocate a new node from the free list
   node->value = value     // Copy enqueued value into node
   node->next = NULL       // Set next pointer of node to NULL
   lock(&q->q_t_lock)      // Acquire t_lock in order to access Tail
      q->tail->next = node // Link node at the end of the queue
      q->tail = node       // Swing Tail to node
   unlock(&q->q_t_lock)    // Release t_lock
}
 
dequeue(Q *q, TYPE *pvalue) {
   lock(&q->q_h_lock)   // Acquire h_lock in order to access Head
      node = q->head    // Read Head
      new_head = node->next       // Read next pointer
      if new_head == NULL         // Is queue empty?
         unlock(&q->q_h_lock)     // Release h_lock before return
         return FALSE             // Queue was empty
      endif
      *pvalue = new_head->value   // Queue not empty, read value
      q->head = new_head  // Swing Head to next node
   unlock(&q->q_h_lock)   // Release h_lock
   free(node)             // Free node
   return TRUE            // Queue was not empty, dequeue succeeded
}