libevent带负载均衡的多线程使用示例

时间:2022-12-01 00:14:50

from http://blog.chinaunix.net/uid-756931-id-353318.html

分类: LINUX

功能: 主线程根据负载工作线程负载均衡算法,每隔一秒钟向特定的工作线程发送一条字符串信息,工作线程简单的把字符串信息打开出来。
Makefile
  1. eventtest : eventtest.c
  2. gcc -Wall -g -levent -lpthread -o eventtest eventtest.c
  3. .PHONY : clean
  4. clean :
  5. rm eventtest -f

eventtest.c

    #include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <event.h>


typedef struct {
pthread_t tid;
struct event_base *base;
struct event event;
int read_fd;
int write_fd;
}LIBEVENT_THREAD;

typedef struct {
pthread_t tid;
struct event_base *base;
}DISPATCHER_THREAD;


const int thread_num = 10;

LIBEVENT_THREAD *threads;
DISPATCHER_THREAD dispatcher_thread;
int last_thread = 0;



static void
thread_libevent_process(int fd, short which, void *arg)
{
int ret;
char buf[128];
LIBEVENT_THREAD *me = arg;

if (fd != me->read_fd) {
printf("thread_libevent_process error : fd != me->read_fd\n");
exit(1);
}

ret = read(fd, buf, 128);
if (ret > 0) {
buf[ret] = '\0';
printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);
}


return;
}

static void *
worker_thread(void *arg)
{
LIBEVENT_THREAD *me = arg;
me->tid = pthread_self();

event_base_loop(me->base, 0);


return NULL;
}

static void
timeout_cb(int fd, short event, void *arg)
{
struct timeval tv;
struct event *timeout = arg;

int tid = (last_thread + 1) % thread_num; //memcached中线程负载均衡算法

LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;

write(thread->write_fd, "Hello world!", sizeof("Hello world!") - 1);

evutil_timerclear(&tv);
tv.tv_sec = 1;
event_add(timeout, &tv);
}

int
main (int argc, char *argv[])
{
int ret;
int i;
int fd[2];
struct event timeout;
struct timeval tv;

pthread_t tid;

dispatcher_thread.base = event_init();
if (dispatcher_thread.base == NULL) {
perror("event_init( base )");
return 1;
}
dispatcher_thread.tid = pthread_self();


threads = calloc(thread_num, sizeof(LIBEVENT_THREAD));
if (threads == NULL) {
perror("calloc");
return 1;
}

for (i = 0; i < thread_num; i++) {

ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
if (ret == -1) {
perror("socketpair()");
return 1;
}

threads[i].read_fd = fd[1];
threads[i].write_fd = fd[0];

threads[i].base = event_init();
if (threads[i].base == NULL) {
perror("event_init()");
return 1;
}


event_set(&threads[i].event, threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);
event_base_set(threads[i].base, &threads[i].event);
if (event_add(&threads[i].event, 0) == -1) {
perror("event_add()");
return 1;
}
}

for (i = 0; i < thread_num; i++) {
pthread_create(&tid, NULL, worker_thread, &threads[i]);
}

evtimer_set(&timeout, timeout_cb, &timeout);
event_base_set(dispatcher_thread.base, &timeout);
evutil_timerclear(&tv);
tv.tv_sec = 1;
event_add(&timeout, &tv);

event_base_loop(dispatcher_thread.base, 0);


return 0;
}