IO非阻塞模型

时间:2021-07-11 23:59:57

 

 

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<fcntl.h>
#include<thread> 
#include<mutex>
#include<queue>

using namespace std;

#define MaxSize 2048

std::queue<int> SocketQueue;

/*
tcp多线程并发,非阻塞IO模式,
单线程对socket遍历。
因为设置socket为非阻塞模式,所以能快速读取是否有数据。
*/
void *Task(void *arg)
{
    while (1)
    {
        while (SocketQueue.empty())
        {
            sleep(1);
        }
        
        int socket = SocketQueue.front();
        SocketQueue.pop();

        fd_set rfds, wfds;

        FD_ZERO(&rfds);
        FD_ZERO(&wfds);
        FD_SET(socket, &rfds);
        FD_SET(socket, &wfds);
        int selres = select(socket + 1, &rfds, &wfds, NULL, NULL);
        //socket 可读
        char msg[MaxSize] = { '\0' };
        int len = 0;
        if (1 == FD_ISSET(socket, &rfds))
        {

            len = recv(socket, msg, MaxSize, 0);
            if (len > 0)
            {
                printf("recv %s \n", msg);
            }
        }
        else
        {
            printf("读取数据还没准备好 !\n");
        }

        //socket 可写
        if (1 == FD_ISSET(socket, &wfds))
        {
            if (len > 0)
            {
                send(socket, msg, len, 0);
                printf("send %s \n", msg);
            }
        }
        else
        {
            printf("写数据还没准备好 !\n");
        }
        len = 0;
        SocketQueue.push(socket);
    }
    
    return NULL;
}


void Server_Run()
{
    unsigned short port = 8000;

    printf("TCP Server Started at port %d!\n", port);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("socket");
        exit(-1);
    }

    struct sockaddr_in my_addr;
    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    printf("Binding server to port %d\n", port);
    int err_log = bind(sockfd, (struct sockaddr*)&my_addr, sizeof(my_addr));
    if (err_log != 0)
    {
        perror("binding");
        close(sockfd);
        exit(-1);
    }

    err_log = listen(sockfd, 10);
    if (err_log != 0)
    {
        perror("listen");
        close(sockfd);
        exit(-1);
    }

    printf("Waiting client...\n");

    while (1)
    {
        size_t recv_len = 0;
        struct sockaddr_in client_addr;
        char cli_ip[INET_ADDRSTRLEN] = "";
        socklen_t cliaddr_len = sizeof(client_addr);
        int NewClient;
        NewClient = accept(sockfd, (struct sockaddr*)&client_addr, &cliaddr_len);
        if (NewClient < 0)
        {
            perror("accept");
            continue;
        }

        inet_ntop(AF_INET, &client_addr.sin_addr, cli_ip, INET_ADDRSTRLEN);
        printf("client ip = %s\n", cli_ip);

        //设置socket为非阻塞模式。
        fcntl(NewClient, F_SETFL, fcntl(NewClient, F_GETFL, 0) | O_NONBLOCK);
        SocketQueue.push(NewClient);
    }
    close(sockfd);
}

int main()
{
    pthread_t thread_id;
    pthread_create(&thread_id, NULL, &Task, NULL);

    Server_Run();

    return 0;
}