基于无锁队列和c++11的高性能线程池
线程基于 C++11 标准库(std::thread)实现
调用方与线程池之间的消息通讯使用一个简单的无锁消息队列
适用于 Linux 平台,GCC 4.6 及以上版本
线程基于 C++11 标准库(std::thread)实现
调用方与线程池之间的消息通讯使用一个简单的无锁消息队列
适用于 Linux 平台,GCC 4.6 及以上版本
标签: <无>
代码片段(6)[全屏查看所有代码]
1. [代码]lckfree.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// lckfree.h
// Implementation of a lock-free queue using CAS operations,
// for simple multi-threading use cases like:
//   1. multiple workers processing incoming messages
//   2. async processing using a thread pool
//   3. a simple tcp server dealing with async requests
// Author: typhoon_1986@163.com
// Reference: http://coolshell.cn/articles/8239.html
#ifndef __LCKFREE_H__
#define __LCKFREE_H__

#include <string>

using namespace std;

namespace bfd {

// One element of the singly linked list backing the queue.
struct LinkNode {
  string data;     // message payload carried by this node
  LinkNode* next;  // following node, or NULL when this is the last one
};
typedef struct LinkNode LinkNode;

// Queue of string messages kept as a singly linked list of LinkNode and
// manipulated through head_/tail_ pointers.
class LckFreeQueue {
 public:
  LckFreeQueue();
  ~LckFreeQueue();

  // The queue owns its nodes through raw pointers; a member-wise copy would
  // leave two objects deleting the same list, so copying is disabled.
  LckFreeQueue(const LckFreeQueue&) = delete;
  LckFreeQueue& operator=(const LckFreeQueue&) = delete;

  // Enqueues a copy of msg.
  int push(const string &msg);

  // Non-blocking pop method.
  string pop();
  // string bpop();  // block pop method

  // Emptiness query; see lckfree.cpp for the exact semantics.
  bool empty();

 private:
  LinkNode * head_;
  LinkNode * tail_;
  bool empty_;
  unsigned int length_;
};

}  // namespace bfd

#endif
2. [代码]lckfree.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
#include <lckfree.h>

namespace bfd {

// Creates an empty queue consisting of a single dummy node; head_ and tail_
// both point at it. The message at the front of the queue always lives in
// head_->next, never in head_ itself.
LckFreeQueue::LckFreeQueue(): head_(NULL), tail_(NULL), empty_(true), length_(0) {
  head_ = new LinkNode;
  head_->next = NULL;
  tail_ = head_;
}

// Frees every node reachable from head_ (the dummy node plus all messages
// that were never popped).
//
// NOTE: nodes that pop() has already advanced head_ past are not reachable
// from here and are not freed by pop() either; reclaiming them safely would
// need a scheme such as hazard pointers, because other threads (and a
// lagging tail_) may still be dereferencing them.
LckFreeQueue::~LckFreeQueue() {
  LinkNode *p = head_;
  while (p) {  // walk the whole list, not just the first node
    LinkNode *q = p->next;
    delete p;
    p = q;
  }
}

// Appends a copy of msg at the end of the list. Always returns 0.
int LckFreeQueue::push(const string &msg) {
  LinkNode * q = new LinkNode;
  q->data = msg;
  q->next = NULL;

  LinkNode * p = tail_;
  LinkNode * oldp = p;
  do {
    // tail_ may lag behind the real end of the list; walk forward to the
    // node whose next is still NULL.
    while (p->next != NULL)
      p = p->next;
  } while (__sync_bool_compare_and_swap(&(p->next), NULL, q) != true);  // retry if the node was not linked at the tail

  // Set the tail node. If this CAS fails tail_ is simply left behind, which
  // the walk above compensates for on later pushes.
  __sync_bool_compare_and_swap(&tail_, oldp, q);
  return 0;
}

// Non-blocking pop. Returns the message at the front of the queue, or ""
// when there is no node after the dummy head (so an empty message cannot be
// told apart from an empty queue).
string LckFreeQueue::pop() {
  LinkNode * p;
  LinkNode * next;
  do {
    p = head_;
    next = p->next;  // read once so the CAS and the payload use the same node
    if (next == NULL) {
      return "";
    }
  } while (__sync_bool_compare_and_swap(&head_, p, next) != true);

  // Only the thread whose CAS moved head_ from p to next reaches this point
  // for this node, and nodes are never reused, so the payload can be moved
  // out instead of copied. `next` stays in the list as the new dummy head.
  string msg;
  msg.swap(next->data);
  return msg;
}

// Returns true when there is no node after the dummy head. Under concurrent
// push/pop the answer is only a snapshot. (empty_ is never updated after
// construction, so it cannot be used for this.)
bool LckFreeQueue::empty() {
  return head_->next == NULL;
}

}  // namespace bfd
3. [代码]workthreadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// workthreadpool.h // 一个用于将消息分发给多个进程,并使用多个进程处理的库,工作进程并不返回数据 #ifndef __WORK_THREAD_POOL__ #define __WORK_THREAD_POOL__ #include <stdio.h> #include <thread> #include <queue> #include <string> #include <vector> #include "lckfree.h" using namespace std;
namespace bfd {
class WorkThreadPool {
public :
WorkThreadPool( int size);
virtual ~WorkThreadPool();
// 需要子类继承并实现的函数,每个线程实际执行的内容
virtual void Init() {};
virtual void Finish() {};
virtual void Handle( const string &msg)=0;
// 将消息放入处理队列, 消息只支持string类型
int SendMessage( const string &msg);
int Start();
int Stop();
private :
void Worker();
int size_;
LckFreeQueue msg_queue_; // 线程池的协作基于这个无锁队列
vector< thread > thread_pool_;
}; } // namespace
#endif |
4. [代码]workthreadpool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#include "workthreadpool.h"
#include <sstream>
#include <unistd.h>

namespace bfd {

// Stores the requested number of worker threads; values <= 0 are replaced
// by 1. No thread is created until Start() is called.
WorkThreadPool::WorkThreadPool(int size) {
  if (size <= 0) {  // at least one thread is required
    size_ = 1;
  } else {
    size_ = size;
  }
}

// NOTE: the destructor does not stop or join the workers. Destroying a
// std::thread that is still joinable calls std::terminate, so Stop() must be
// called before a started pool is destroyed. (Stopping here would be too
// late anyway: the derived class providing Handle() is already destroyed.)
WorkThreadPool::~WorkThreadPool() {
}

// Queues msg for the workers. Always returns 0.
int WorkThreadPool::SendMessage(const string &msg) {
  msg_queue_.push(msg);
  return 0;
}

// Body of each worker thread: polls the queue, passes every message to
// Handle(), and returns when it receives the "__exit__" message.
// NOTE(review): Init() and Finish() are not called from here or anywhere
// else in this file - confirm whether that is intended.
void WorkThreadPool::Worker() {
  unsigned int msg_count = 0;
  while (1) {
    string msg = msg_queue_.pop();
    if (msg.empty()) {
      // pop() yields "" when nothing is queued; back off before polling again.
      printf("no msg got, sleep for 0.1 sec\n");
      usleep(100000);  // 0.1 sec
      continue;
    }
    if (msg == "__exit__") {
      stringstream ss;
      ss << "exit worker: " << std::this_thread::get_id() << ", processed: " << msg_count << "..";
      printf("%s\n", ss.str().c_str());
      return;
    }
    Handle(msg);
    msg_count++;
    if (msg_count % 1000 == 0) {
      printf("every 1000 msg count\n");
    }
  }
}

// Launches size_ worker threads running Worker(). Always returns 0.
int WorkThreadPool::Start() {
  for (int i = 0; i < size_; i++) {
    thread_pool_.push_back(thread(&WorkThreadPool::Worker, this));
  }
  return 0;
}

// Asks every running worker to exit and waits for all of them.
// Always returns 0.
//
// The loops are driven by the number of threads actually created rather
// than by size_, so calling Stop() without a prior Start(), or calling it
// twice, is a harmless no-op instead of indexing past the end of
// thread_pool_ or joining an already-joined thread.
int WorkThreadPool::Stop() {
  const vector<thread>::size_type worker_count = thread_pool_.size();

  // One "__exit__" message per worker: each worker consumes exactly one
  // before returning.
  for (vector<thread>::size_type i = 0; i < worker_count; i++) {
    SendMessage("__exit__");
  }
  for (vector<thread>::size_type i = 0; i < worker_count; i++) {
    if (thread_pool_[i].joinable()) {
      thread_pool_[i].join();
    }
  }
  // Drop the finished thread objects so the pool can be started again.
  thread_pool_.clear();
  return 0;
}

}  // namespace bfd
5. [代码]main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#include "workthreadpool.h"
#include <sstream>
#include <math.h>

// Demo pool: every message is printed together with the id of the thread
// handling it, followed by a fixed amount of floating-point busy work.
class MyThreadPool : public bfd::WorkThreadPool {
 public:
  MyThreadPool(int size) : bfd::WorkThreadPool(size) {}

  void Handle(const string &msg) {
    stringstream line;
    line << "worker (" << std::this_thread::get_id() << ") got msg: " << msg;
    printf("%s\n", line.str().c_str());

    // Simulated CPU load; the computed value is intentionally unused.
    for (int n = 0; n < 1000000; ++n) {
      double result = sqrt(sqrt(n) / 93.234);
    }
  }
};

// Starts a pool of 5 workers, sends it 100 identical messages, then stops it.
int main() {
  printf("start running ....\n");

  MyThreadPool pool(5);
  pool.Start();

  int remaining = 100;
  while (remaining-- > 0) {
    pool.SendMessage("msg info ----------");
  }

  pool.Stop();
  return 0;
}
6. [代码]Makefile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# Builds libworkthreadpool.so from the library sources and links the demo
# program test_workthreadpool against it.
LIB_SRC_FILES = src/workthreadpool.cpp src/lckfree.cpp
TEST_SRC_FILES = src/main.cpp
INCLUDE_DIR = src
STD_FLAG = -std=c++0x

# Default target: the demo executable, linked against the shared library.
all: main.o libs
	g++ $(STD_FLAG) -o test_workthreadpool main.o libworkthreadpool.so -lpthread

main.o: $(TEST_SRC_FILES)
	g++ $(STD_FLAG) -c $(TEST_SRC_FILES) -I$(INCLUDE_DIR)

# Shared library containing the thread pool and the lock-free queue.
libs: $(LIB_SRC_FILES)
	g++ $(STD_FLAG) -o libworkthreadpool.so -fPIC -O2 -shared -Wl,--no-as-needed -I$(INCLUDE_DIR) $(LIB_SRC_FILES) -lpthread

# None of these targets creates a file with its own name; declaring them
# phony keeps a stray file called `all`, `libs` or `clean` from masking them.
.PHONY : all libs clean

clean :
	rm -f test_workthreadpool main.o libworkthreadpool.so
|