功能:
主线程根据工作线程负载均衡算法,每隔一秒钟向选定的工作线程发送一条字符串信息,工作线程简单地把收到的字符串信息打印出来。
Makefile
# Build the libevent/pthread demo.
# The libraries are listed after the source file because the linker resolves
# symbols in command-line order; with -levent first, the references from
# eventtest.c can be left undefined.
eventtest : eventtest.c
	gcc -Wall -g -o eventtest eventtest.c -levent -lpthread

.PHONY : clean
clean :
	rm -f eventtest
eventtest.c
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <pthread.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <event.h>
- typedef struct {
- pthread_t tid;
- struct event_base *base;
- struct event event;
- int read_fd;
- int write_fd;
- }LIBEVENT_THREAD;
/*
 * State of the dispatching thread (the thread that runs main()).
 */
typedef struct {
    pthread_t tid;            /* id of the thread running main() */
    struct event_base *base;  /* event base that drives the periodic timer */
} DISPATCHER_THREAD;
- const int thread_num = 10;
- LIBEVENT_THREAD *threads;
- DISPATCHER_THREAD dispatcher_thread;
- int last_thread = 0;
- static void
- thread_libevent_process(int fd, short which, void *arg)
- {
- int ret;
- char buf[128];
- LIBEVENT_THREAD *me = arg;
- if (fd != me->read_fd) {
- printf("thread_libevent_process error : fd != me->read_fd\n");
- exit(1);
- }
- ret = read(fd, buf, 128);
- if (ret > 0) {
- buf[ret] = '\0';
- printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);
- }
- return;
- }
- static void *
- worker_thread(void *arg)
- {
- LIBEVENT_THREAD *me = arg;
- me->tid = pthread_self();
- event_base_loop(me->base, 0);
- return NULL;
- }
- static void
- timeout_cb(int fd, short event, void *arg)
- {
- struct timeval tv;
- struct event *timeout = arg;
- int tid = (last_thread + 1) % thread_num; //memcached中线程负载均衡算法
- LIBEVENT_THREAD *thread = threads + tid;
- last_thread = tid;
- write(thread->write_fd, "Hello world!", sizeof("Hello world!") - 1);
- evutil_timerclear(&tv);
- tv.tv_sec = 1;
- event_add(timeout, &tv);
- }
- int
- main (int argc, char *argv[])
- {
- int ret;
- int i;
- int fd[2];
- struct event timeout;
- struct timeval tv;
- pthread_t tid;
- dispatcher_thread.base = event_init();
- if (dispatcher_thread.base == NULL) {
- perror("event_init( base )");
- return 1;
- }
- dispatcher_thread.tid = pthread_self();
- threads = calloc(thread_num, sizeof(LIBEVENT_THREAD));
- if (threads == NULL) {
- perror("calloc");
- return 1;
- }
- for (i = 0; i < thread_num; i++) {
- ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
- if (ret == -1) {
- perror("socketpair()");
- return 1;
- }
- threads[i].read_fd = fd[1];
- threads[i].write_fd = fd[0];
- threads[i].base = event_init();
- if (threads[i].base == NULL) {
- perror("event_init()");
- return 1;
- }
- event_set(&threads[i].event, threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);
- event_base_set(threads[i].base, &threads[i].event);
- if (event_add(&threads[i].event, 0) == -1) {
- perror("event_add()");
- return 1;
- }
- }
- for (i = 0; i < thread_num; i++) {
- pthread_create(&tid, NULL, worker_thread, &threads[i]);
- }
- evtimer_set(&timeout, timeout_cb, &timeout);
- event_base_set(dispatcher_thread.base, &timeout);
- evutil_timerclear(&tv);
- tv.tv_sec = 1;
- event_add(&timeout, &tv);
- event_base_loop(dispatcher_thread.base, 0);
- return 0;
- }