libevent带负载均衡的多线程使用示例

时间:2021-06-30 03:05:14
功能:
主线程根据负载工作线程负载均衡算法,每隔一秒钟向特定的工作线程发送一条字符串信息,工作线程简单的把字符串信息打开出来。
 
Makefile
 
  1. eventtest : eventtest.c
  2. gcc -Wall -g -levent -lpthread -o eventtest eventtest.c
  3. .PHONY : clean
  4. clean :
  5. rm eventtest -f
 
eventtest.c
 
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <unistd.h>
  4. #include <pthread.h>
  5. #include <sys/types.h>
  6. #include <sys/socket.h>
  7. #include <event.h>
  8. typedef struct {
  9. pthread_t tid;
  10. struct event_base *base;
  11. struct event event;
  12. int read_fd;
  13. int write_fd;
  14. }LIBEVENT_THREAD;
  15. typedef struct {
  16. pthread_t tid;
  17. struct event_base *base;
  18. }DISPATCHER_THREAD;
  19. const int thread_num = 10;
  20. LIBEVENT_THREAD *threads;
  21. DISPATCHER_THREAD dispatcher_thread;
  22. int last_thread = 0;
  23. static void
  24. thread_libevent_process(int fd, short which, void *arg)
  25. {
  26. int ret;
  27. char buf[128];
  28. LIBEVENT_THREAD *me = arg;
  29. if (fd != me->read_fd) {
  30. printf("thread_libevent_process error : fd != me->read_fd\n");
  31. exit(1);
  32. }
  33. ret = read(fd, buf, 128);
  34. if (ret > 0) {
  35. buf[ret] = '\0';
  36. printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);
  37. }
  38. return;
  39. }
  40. static void *
  41. worker_thread(void *arg)
  42. {
  43. LIBEVENT_THREAD *me = arg;
  44. me->tid = pthread_self();
  45. event_base_loop(me->base, 0);
  46. return NULL;
  47. }
  48. static void
  49. timeout_cb(int fd, short event, void *arg)
  50. {
  51. struct timeval tv;
  52. struct event *timeout = arg;
  53. int tid = (last_thread + 1) % thread_num;        //memcached中线程负载均衡算法
  54. LIBEVENT_THREAD *thread = threads + tid;
  55. last_thread = tid;
  56. write(thread->write_fd, "Hello world!", sizeof("Hello world!") - 1);
  57. evutil_timerclear(&tv);
  58. tv.tv_sec = 1;
  59. event_add(timeout, &tv);
  60. }
  61. int
  62. main (int argc, char *argv[])
  63. {
  64. int ret;
  65. int i;
  66. int fd[2];
  67. struct event timeout;
  68. struct timeval tv;
  69. pthread_t tid;
  70. dispatcher_thread.base = event_init();
  71. if (dispatcher_thread.base == NULL) {
  72. perror("event_init( base )");
  73. return 1;
  74. }
  75. dispatcher_thread.tid = pthread_self();
  76. threads = calloc(thread_num, sizeof(LIBEVENT_THREAD));
  77. if (threads == NULL) {
  78. perror("calloc");
  79. return 1;
  80. }
  81. for (i = 0; i < thread_num; i++) {
  82. ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
  83. if (ret == -1) {
  84. perror("socketpair()");
  85. return 1;
  86. }
  87. threads[i].read_fd = fd[1];
  88. threads[i].write_fd = fd[0];
  89. threads[i].base = event_init();
  90. if (threads[i].base == NULL) {
  91. perror("event_init()");
  92. return 1;
  93. }
  94. event_set(&threads[i].event, threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);
  95. event_base_set(threads[i].base, &threads[i].event);
  96. if (event_add(&threads[i].event, 0) == -1) {
  97. perror("event_add()");
  98. return 1;
  99. }
  100. }
  101. for (i = 0; i < thread_num; i++) {
  102. pthread_create(&tid, NULL, worker_thread, &threads[i]);
  103. }
  104. evtimer_set(&timeout, timeout_cb, &timeout);
  105. event_base_set(dispatcher_thread.base, &timeout);
  106. evutil_timerclear(&tv);
  107. tv.tv_sec = 1;
  108. event_add(&timeout, &tv);
  109. event_base_loop(dispatcher_thread.base, 0);
  110. return 0;
  111. }