一 问题来源:南京烽火通信面试
二 问题:
现有一种定长队列,长度为2的n次方,此队列可被多线程访问,但必须确保线程级安全,即在任意时刻,队列的长度保持不变。
三 笔者分析
1. 队列的存储结构--链式和顺序均可。
2. 长度为2的n次方--便于移动队头和队尾指针,假设队列长度为size,那么rear = (rear + 1) & (size - 1),&运算比算术求模运算效率高。为此,笔者使用了顺序队列。
3. 怎么确保线程级安全
笔者认为此系统一定同时存在多个读者和写者,读者是读取队头元素,写者是不断向队尾插入元素,读者与读者之间,读者与写者之间,写者与写者之间都必须互斥访问,因此,必须采用互斥锁,保证任意时刻队列长度不变。为此,笔者定义一个全局变量balance。
读者 get
加锁
检测balance是否为1:如果为1,表明队尾已经有一个元素加入,那么读取队头元素,并将balance置为0
解锁
写者 put
加锁
检测balance是否为0:如果为0,表明队列平衡,没有新元素插入,那么在队尾插入元素,并将balance置为1
解锁
这样能够确保同一时刻读、写者之间彼此是互斥的。
四 代码组织
五 代码详解
1. queue.h
/*************************************************************************
> File Name: queue.h
> Author: wangzhicheng
> Created Time: Fri 14 Nov 2014 11:23:55 AM WST
************************************************************************/
#ifndef _QUEUE_H_
#define _QUEUE_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define MAX 1024 // the max storage space
typedef int DataType; // data type
/*
 * Sequential (array-backed) queue.
 * front and rear are advanced by get/put with "& (size - 1)", which only
 * wraps correctly when size is a power of two.
 * */
typedef struct Queue {
DataType *array; // backing storage; InitQueue allocates MAX elements
int front; // index of the head element, i.e. the next one get returns
int rear; // index at which put stores the next element
int size; // logical queue length in elements; InitQueue starts it at 8
}Queue;
// NOTE(review): the three objects below are defined (not declared extern) in
// a header, so every source file including it defines them again. That only
// links where the toolchain merges such tentative definitions -- consider
// extern declarations here plus a single definition in queue.c.
Queue Q; // the single queue instance shared by all threads
int balance; // 0: no pending element, get fails; 1: one element was put, put fails until it is read
pthread_mutex_t mutex; // mutex lock taken by get and put around Q and balance
int InitQueue(); // allocate storage, pre-fill the queue, initialise the mutex
int reInitQueue(uint); // change the queue length to a power of two derived from the argument
int get(DataType *); // read the head element into the pointee; 0 on success, -1 otherwise
int put(DataType); // store one element at the rear; 0 on success, -1 otherwise
#endif
2. queue.c
/*************************************************************************
> File Name: queue.c
> Author: wangzhicheng2013
> Created Time: Fri 14 Nov 2014 03:01:32 PM WST
************************************************************************/
#include "queue.h"
/*
* initialize the queue
* return 0 if successfully
* else return -1
* */
int InitQueue() {
int rc = 0;
int i;
Q.array = (DataType *)malloc(sizeof(DataType) * MAX);
if(!Q.array) {
rc = -1;
printf("Init Queue failed, Memeory lack...!\n");
}
else { // initialize the queue
Q.front = 0;
Q.size = 8;
Q.rear = Q.size;
for(i = 0;i < Q.size;i++) Q.array[i] = i;
pthread_mutex_init(&mutex, NULL);
}
return rc;
}
/*
 * Change the logical length of the global queue.
 *
 * The new length is the largest power of two that does not exceed
 * capacity (a capacity of 0 or 1 yields length 1). Slots from the current
 * rear up to the new length are filled with their own index and rear is
 * then moved to the new length.
 *
 * The update is done while holding the queue mutex, like get and put, so
 * it cannot interleave with them. Call it only after InitQueue has
 * allocated the storage and initialised the mutex.
 *
 * NOTE(review): front is left untouched, and the fill starts at the
 * current rear -- confirm this is what callers expect once get/put have
 * already moved the indices.
 *
 * Returns 0 on success, -1 when capacity >= MAX.
 * */
int reInitQueue(uint capacity) {
    int pow2;
    int i;

    if(capacity >= MAX) return -1;

    // smallest power of two >= capacity, then step back if it overshoots
    for(pow2 = 2;(uint)pow2 < capacity;pow2 <<= 1) ;
    if((uint)pow2 > capacity) pow2 /= 2;

    pthread_mutex_lock(&mutex);
    Q.size = pow2;
    for(i = Q.rear;i < Q.size;i++) Q.array[i] = i;
    Q.rear = Q.size;
    pthread_mutex_unlock(&mutex);

    return 0;
}
/*
 * Read the head element of the global queue into *e.
 *
 * The read only happens when balance is non-zero, i.e. a put has
 * succeeded since the last successful get; balance is then reset to 0.
 * The whole check-and-read runs under the queue mutex.
 *
 * Returns 0 when an element was read, -1 when balance was 0 (*e is left
 * untouched in that case).
 * */
int get(DataType *e) {
    int taken = 0;

    pthread_mutex_lock(&mutex);
    if(balance != 0) {
        const int head = Q.front;

        *e = Q.array[head];
        Q.front = (head + 1) & (Q.size - 1); // wrap via mask instead of modulo
        balance = 0;
        taken = 1;
    }
    pthread_mutex_unlock(&mutex);

    return taken ? 0 : -1;
}
/*
 * Store e at the rear of the global queue.
 *
 * The store only happens when balance is 0, i.e. no element is waiting
 * to be read; balance is then set to 1. The whole check-and-store runs
 * under the queue mutex.
 *
 * Returns 0 when the element was stored, -1 when balance was already 1.
 * */
int put(DataType e) {
    int stored = 0;

    pthread_mutex_lock(&mutex);
    if(balance == 0) {
        const int tail = Q.rear;

        Q.array[tail] = e;
        Q.rear = (tail + 1) & (Q.size - 1); // wrap via mask instead of modulo
        balance = 1;
        stored = 1;
    }
    pthread_mutex_unlock(&mutex);

    return stored ? 0 : -1;
}
3. main.c
/*************************************************************************
> File Name: main.c
> Author: ma6174
> Created Time: Fri 14 Nov 2014 03:26:48 PM WST
************************************************************************/
#include "queue.h"
#include <time.h>
#define THREAD_NUM 20
/*
 * Reader thread body: repeatedly tries to take one element from the
 * queue, reports the outcome, then sleeps 20 microseconds.
 * Never returns; arg is unused.
 * */
void *_read(void *arg) {
    DataType e;

    (void)arg;
    while(1) {
        if(get(&e)) {
            // get signals failure through its return value and does not
            // set errno, so perror would append an unrelated errno text
            fprintf(stderr, "read failed...!\n");
        }
        else printf("%d has been read...!\n", e);
        usleep(20);
    }
}
/*
 * Writer thread body: picks one value in 0..9 when the thread starts and
 * then repeatedly tries to put that same value into the queue, reporting
 * the outcome and sleeping 20 microseconds between attempts.
 * Never returns; arg is unused.
 * */
void *_write(void *arg) {
    DataType e = rand() % 10;

    (void)arg;
    while(1) {
        if(put(e)) {
            // put signals failure through its return value and does not
            // set errno, so perror would append an unrelated errno text
            fprintf(stderr, "write failed...!\n");
        }
        else printf("%d has been write...!\n", e);
        usleep(20);
    }
}
/*
 * Test driver: initialises the queue, starts THREAD_NUM threads (first
 * half readers, second half writers) and waits for them.
 * The thread bodies loop forever, so the joins are not expected to return.
 * */
int main() {
    pthread_t threads[THREAD_NUM];
    int started[THREAD_NUM]; // 1 when threads[i] holds a joinable thread
    int i;

    if(InitQueue()) {
        exit(EXIT_FAILURE);
    }
    srand((unsigned)time(NULL));

    for(i = 0;i < THREAD_NUM;i++) {
        void *(*body)(void *) = (i < THREAD_NUM / 2) ? _read : _write;
        // pthread_create returns the error number itself; errno is not set
        int err = pthread_create(&threads[i], NULL, body, NULL);

        started[i] = (err == 0);
        if(err) {
            fprintf(stderr, "thread created failed...! (error %d)\n", err);
        }
    }

    // only join threads that were really created: threads[i] is
    // uninitialised when pthread_create failed
    for(i = 0;i < THREAD_NUM;i++) {
        if(started[i]) pthread_join(threads[i], NULL);
    }
    return 0;
}
4. Makefile
CC=gcc
.PHONY: all
# Headers are located through -I; listing a .h file as an input would make
# gcc treat it as a header to precompile instead of just including it.
all:
	$(CC) -g -o main src/main.c src/queue.c -I src/ -I header/ -lpthread
六 运行测试
C++版本
一 代码组织与以上相似,将接口与实现分离
二 代码详解
1. queue.h
/*************************************************************************
> File Name: queue.h
> Author: ma6174
> Created Time: Sat 15 Nov 2014 10:30:21 AM WST
************************************************************************/
#ifndef _QUEUE_H_
#define _QUEUE_H_
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <iostream>
#include <queue>
#define MAX 1024
using namespace std;
// Mutex-protected queue wrapper around std::queue.
// get and put alternate: put succeeds only while balance is 0, get only
// while balance is 1, so each successful put is followed by exactly one
// successful get.
template<class DataType>
class Queue {
private:
queue<DataType>Q; // the underlying STL queue
uint size; // number of elements the constructor pre-loads
uint balance; // 1 after a successful put, 0 initially and after a successful get
pthread_mutex_t mutex; // locked by get and put around Q and balance
public:
Queue(); // pre-loads the queue and initialises the mutex
bool get(DataType &); // pops the front element into the argument; false if balance is 0
bool put(DataType); // pushes one element; false if balance is already 1
};
#include "queue.cpp"
#endif
2. queue.cpp
/*************************************************************************
> File Name: queue.cpp
> Author: ma6174
> Created Time: Sat 15 Nov 2014 10:30:21 AM WST
************************************************************************/
#ifndef QUEUE_CPP
#define QUEUE_CPP
#include "queue.h"
// Builds a queue pre-loaded with the values 0..7, with balance cleared and
// the mutex initialised with default attributes.
template<class DataType>
Queue<DataType>::Queue() {
    size = 8;
    balance = 0;
    pthread_mutex_init(&mutex, NULL);
    for(int next = 0;next < size;++next) {
        Q.push(next);
    }
}
// Pops the front element into e, under the mutex, but only when balance is
// non-zero; balance is cleared afterwards.
// Returns true if an element was popped, false otherwise (e is untouched).
template<class DataType>
bool Queue<DataType>::get(DataType &e) {
    pthread_mutex_lock(&mutex);
    const bool ready = (balance != 0);
    if(ready) {
        e = Q.front();
        Q.pop();
        balance = 0;
    }
    pthread_mutex_unlock(&mutex);
    return ready;
}
// Pushes e at the back, under the mutex, but only when balance is 0;
// balance is set to 1 afterwards.
// Returns true if the element was pushed, false otherwise.
template<class DataType>
bool Queue<DataType>::put(DataType e) {
    pthread_mutex_lock(&mutex);
    const bool room = (balance == 0);
    if(room) {
        Q.push(e);
        balance = 1;
    }
    pthread_mutex_unlock(&mutex);
    return room;
}
#endif
3. main.cpp
/*************************************************************************
> File Name: main.cpp
> Author: ma6174
> Created Time: Sat 15 Nov 2014 10:52:49 AM WST
************************************************************************/
#include "queue.h"
#define THREAD_NUM 10
using namespace std;
typedef int DataType;
Queue<DataType>q;
// Reader thread body: repeatedly tries to take one element from q, reports
// the outcome, then sleeps 20 microseconds. Never returns; arg is unused.
void *_read(void *arg) {
    DataType e;

    (void)arg;
    while(1) {
        if(q.get(e) == false) {
            // get signals failure through its return value and does not
            // set errno, so perror would append an unrelated errno text
            fprintf(stderr, "read failed...!\n");
        }
        else printf("%d has been read...!\n", e);
        usleep(20);
    }
}
// Writer thread body: picks one value in 0..9 when the thread starts, then
// repeatedly tries to put that same value into q, reporting the outcome and
// sleeping 20 microseconds between attempts. Never returns; arg is unused.
void *_write(void *arg) {
    DataType e = rand() % 10;

    (void)arg;
    while(1) {
        if(q.put(e) == false) {
            // put signals failure through its return value and does not
            // set errno, so perror would append an unrelated errno text
            fprintf(stderr, "write failed...!\n");
        }
        else printf("%d has been write...!\n", e);
        usleep(20);
    }
}
int main() {
srand((unsigned)time(NULL));
pthread_t threads[THREAD_NUM];
int i;
for(i = 0;i < THREAD_NUM;i++) {
if(i < THREAD_NUM / 2) {
if(pthread_create(&threads[i], NULL, _read, NULL)) {
perror("thread created failed...!\n");
}
}
else {
if(pthread_create(&threads[i], NULL, _write, NULL)) {
perror("thread created failed...!\n");
}
}
}
for(i = 0;i < THREAD_NUM;i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
4. Makefile
CC=g++
.PHONY: all
# Headers are located through -I; listing a .h file as an input would make
# g++ treat it as a header to precompile instead of just including it.
# -I src/ is required because queue.h includes "queue.cpp".
all:
	$(CC) -g -o main src/main.cpp src/queue.cpp -I src/ -I header/ -lpthread
三 测试
无锁实现
1. cas.h
/*************************************************************************
> File Name: cas.h
> Author: ma6174
> Created Time: Wed 26 Nov 2014 09:11:28 PM WST
************************************************************************/
#ifndef _CAS_H_
#define _CAS_H_
/*
 * Atomic compare-and-swap on an int.
 *
 * If *reg equals oldval, stores newval into *reg and returns 1;
 * otherwise leaves *reg unchanged and returns 0. The comparison and the
 * store happen as one atomic step (GCC/Clang __sync builtin, full
 * barrier); a plain "if equal then assign" lets two threads both see the
 * old value and both believe they won.
 *
 * NOTE(review): this is a non-inline function defined in a header, so
 * including cas.h from more than one source file will produce duplicate
 * definitions at link time.
 */
int cas(int *reg, int oldval, int newval) {
    return __sync_bool_compare_and_swap(reg, oldval, newval) ? 1 : 0;
}
#endif
2. queue.h
/*************************************************************************
> File Name: queue.h
> Author: wangzhicheng
> Created Time: Fri 14 Nov 2014 11:23:55 AM WST
************************************************************************/
#ifndef _QUEUE_H_
#define _QUEUE_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define MAX 1024 // the max storage space
// Occupancy states stored in node.status (Empty == 0, Full == 1).
enum {
Empty, Full
};
// One queue slot: a payload plus its occupancy flag.
typedef struct node {
int elem; // the stored value
int status; // Empty or Full
}node;
/*
 * Sequential (array-backed) queue of node slots.
 * front and rear are advanced by get/put with "& (size - 1)", which only
 * wraps correctly when size is a power of two.
 * */
typedef struct Queue {
node *array; // backing storage; InitQueue allocates MAX slots
int front; // index of the slot get reads next
int rear; // index of the slot put writes next
int size; // the size of the queue; InitQueue sets it to 8
}Queue;
// NOTE(review): Q is defined (not declared extern) in a header, so every
// source file including it defines it again; that only links where the
// toolchain merges such tentative definitions.
Queue Q; // the single queue instance shared by all threads
int InitQueue(); // allocate storage and pre-fill the queue
int get(int *); // read the head element into the pointee; 0 on success, -1 otherwise
int put(int); // store one element at the rear; 0 on success, -1 otherwise
#endif
3. queue.c
/*************************************************************************
> File Name: queue.c
> Author: wangzhicheng2013
> Created Time: Fri 14 Nov 2014 03:01:32 PM WST
************************************************************************/
#include "queue.h"
#include "cas.h"
/*
 * Initialize the global queue.
 *
 * Allocates MAX slots, marks the first 8 as Full with elem equal to their
 * index, marks every other slot Empty, and sets front = 0, rear = 8,
 * size = 8.
 *
 * NOTE(review): rear starts at size, which lies outside the masked index
 * range 0..size-1 used by put/get -- confirm this start state is intended.
 *
 * Returns 0 on success, -1 if the allocation fails.
 * */
int InitQueue() {
    int idx;

    Q.array = (node *)malloc(sizeof(node) * MAX);
    if(Q.array == NULL) {
        printf("Init Queue failed, Memeory lack...!\n");
        return -1;
    }

    Q.front = 0;
    Q.rear = 8;
    Q.size = 8;
    for(idx = 0;idx < MAX;idx++) {
        if(idx < Q.size) {
            Q.array[idx].status = Full;
            Q.array[idx].elem = idx;
        }
        else {
            Q.array[idx].status = Empty;
        }
    }
    return 0;
}
/*
 * Put an element into the global queue.
 *
 * Claims the slot at rear by switching its status from Empty to Full with
 * cas, stores e there and advances rear (wrapping with the size mask).
 * While the index check does not report "full" but the slot at rear is
 * still Full, the loop retries.
 *
 * NOTE(review): the final cas uses the current Q.rear as its expected
 * value, so it cannot detect a concurrent change of rear; Q.rear is also
 * re-read between the claim and the store. Verify against the intended
 * multi-writer protocol.
 *
 * Returns 0 on success, -1 when the queue is full.
 * */
int put(int e) {
    do {
        // == binds tighter than &, so the masked index needs its own
        // parentheses; without them this test compared (size - 1) to front
        if(((Q.rear + 1) & (Q.size - 1)) == Q.front) {
            // a full queue is not an errno condition, so perror would
            // append an unrelated errno text
            fprintf(stderr, "Now queue is full...!\n ");
            return -1;
        }
    }while(!cas(&Q.array[Q.rear].status, Empty, Full));

    Q.array[Q.rear].elem = e;
    cas(&Q.rear, Q.rear, (Q.rear + 1) & (Q.size - 1));
    return 0;
}
/*
 * Get an element from the global queue.
 *
 * Claims the slot at front by switching its status from Full to Empty
 * with cas, copies its elem into *e and advances front (wrapping with the
 * size mask). While front != rear but the slot at front is still Empty,
 * the loop retries.
 *
 * NOTE(review): the final cas uses the current Q.front as its expected
 * value, so it cannot detect a concurrent change of front; Q.front is
 * also re-read between the claim and the copy. Verify against the
 * intended multi-reader protocol.
 *
 * Returns 0 on success. Returns -1 when the queue is empty; *e and front
 * are then left untouched.
 * */
int get(int *e) {
    do {
        if(Q.front == Q.rear) {
            // an empty queue is not an errno condition, so perror would
            // append an unrelated errno text
            fprintf(stderr, "Now queue is empty...!\n ");
            // leave immediately: the read and the front advance below
            // must only run after a slot was actually claimed
            return -1;
        }
    }while(!cas(&Q.array[Q.front].status, Full, Empty));

    *e = Q.array[Q.front].elem;
    cas(&Q.front, Q.front, (Q.front + 1) & (Q.size - 1));
    return 0;
}
4. main.c
/*************************************************************************
> File Name: main.c
> Author: ma6174
> Created Time: Fri 14 Nov 2014 03:26:48 PM WST
************************************************************************/
#include "queue.h"
#include <time.h>
#define THREAD_NUM 20
/*
 * Reader thread body: repeatedly tries to take one element from the
 * queue, reports the outcome, then sleeps 20 microseconds.
 * Never returns; arg is unused.
 * */
void *_read(void *arg) {
    int e;

    (void)arg;
    while(1) {
        if(get(&e)) {
            // get signals failure through its return value, not errno,
            // so perror would append an unrelated errno text
            fprintf(stderr, "read failed...!\n");
        }
        else printf("%d has been read...!\n", e);
        usleep(20);
    }
}
/*
 * Writer thread body: picks one value in 0..9 when the thread starts and
 * then repeatedly tries to put that same value into the queue, reporting
 * the outcome and sleeping 20 microseconds between attempts.
 * Never returns; arg is unused.
 * */
void *_write(void *arg) {
    int e = rand() % 10;

    (void)arg;
    while(1) {
        if(put(e)) {
            // put signals failure through its return value, not errno,
            // so perror would append an unrelated errno text
            fprintf(stderr, "write failed...!\n");
        }
        else printf("%d has been write...!\n", e);
        usleep(20);
    }
}
/*
 * Test driver: initialises the queue, starts THREAD_NUM threads (first
 * half readers, second half writers) and waits for them.
 * The thread bodies loop forever, so the joins are not expected to return.
 * */
int main() {
    pthread_t threads[THREAD_NUM];
    int started[THREAD_NUM]; // 1 when threads[i] holds a joinable thread
    int i;

    if(InitQueue()) {
        exit(EXIT_FAILURE);
    }
    srand((unsigned)time(NULL));

    for(i = 0;i < THREAD_NUM;i++) {
        void *(*body)(void *) = (i < THREAD_NUM / 2) ? _read : _write;
        // pthread_create returns the error number itself; errno is not set
        int err = pthread_create(&threads[i], NULL, body, NULL);

        started[i] = (err == 0);
        if(err) {
            fprintf(stderr, "thread created failed...! (error %d)\n", err);
        }
    }

    // only join threads that were really created: threads[i] is
    // uninitialised when pthread_create failed
    for(i = 0;i < THREAD_NUM;i++) {
        if(started[i]) pthread_join(threads[i], NULL);
    }
    return 0;
}
5.Makefile
CC=gcc
.PHONY: all
# Headers (queue.h, cas.h) are located through -I; listing a .h file as an
# input would make gcc treat it as a header to precompile.
all:
	$(CC) -g -o main src/main.c src/queue.c -I src/ -I header/ -lpthread
运行