socket 网络编程

时间:2024-01-21 23:01:21

1. 基础socket库

socket.h:

/**
 * Network socket library.
 *
 * Thin wrapper around the BSD socket API (Winsock on Windows) exposing a
 * small owning Socket class plus a few free helper functions.
 */
#ifndef Socket_h
#define Socket_h

#include <stdio.h>
#include <string>

#ifdef WIN32
// Windows
#include <winsock.h>
typedef int socklen_t;
#else
// Linux, macOS
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <errno.h>

#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
typedef int SOCKET;
#endif

// Default backlog passed to listen().
#define SOCKET_BACKLOG 100

namespace avalon {
    /// Prints the last socket error (errno / WSAGetLastError) and returns its code.
    int socket_error();
    /// Initializes the platform socket layer; returns 0 on success, -1 on failure.
    int socket_init();
    /// Releases the platform socket layer.
    int socket_clean();
    /// printf-style debug logging.
    void socket_debug(const char* message, ...);

    /**
     * Owns one socket descriptor and closes it on destruction.
     */
    class Socket {
    public:
        /// Wraps an existing descriptor; returns nullptr when the descriptor is negative.
        static Socket* create(SOCKET socket_fd);
        /// Opens a new socket; returns nullptr when the socket could not be created.
        static Socket* create(int family, int type, int protocal = IPPROTO_IP);

    public:
        Socket(SOCKET socket_fd);
        Socket(int family, int type, int protocal = IPPROTO_IP);
        Socket& operator = (SOCKET socket_fd);
        virtual ~Socket();

        bool connect(const char* host, unsigned short port);
        bool bind(unsigned short port);
        bool listen(int backlog = SOCKET_BACKLOG);
        /// When client_host is non-null the peer's dotted IPv4 address is written to it.
        Socket* accept(char* client_host = nullptr);

        // The default value of `flag` was missing from the original listing;
        // 0 (no flags) is assumed.
        ssize_t send(const char* buffer, size_t len, int flag = 0);
        ssize_t recv(char* buffer, size_t len, int flag = 0);

        // Defined in the implementation file but previously not declared here.
        ssize_t write(const char* buffer, size_t len);
        ssize_t read(char* buffer, size_t len);

        int close();

        SOCKET getSocketFD();

        void set_blocking(const bool blocking);

    private:
        // The destructor closes the descriptor, so a copy would close it twice.
        Socket(const Socket&) = delete;
        Socket& operator = (const Socket&) = delete;

    private:
        SOCKET _socket_fd;

        int _family;
        int _type;
        int _protocal;
    };
}

#endif

socket.cpp

/**
 * Network socket library (implementation).
 */
#include "AvalonSocket.h"

#include <cerrno>
#include <cstdarg>
#include <cstdio>
#include <cstring>
#include <new>
#include <string>

#ifdef WIN32
#pragma comment(lib, "wsock32")
#endif

// When non-zero, socket_debug() throws the current socket error code after logging.
#define SOCKET_DEBUG_LEVEL 0
// Size of socket_debug()'s formatting buffer. The literal was missing from the
// original listing; 1024 is an assumed value.
#define SOCKET_DEBUG_BUFFER_LEN 1024

namespace avalon {
    /**
     * Prints the last socket error and returns its numeric code
     * (WSAGetLastError() on Windows, errno elsewhere).
     */
    int socket_error() {
        int error = 0;
#ifdef WIN32
        error = WSAGetLastError();
#else
        error = errno;
#endif
        printf("Avalon socket error: %d %s \n", error, strerror(error));
        return error;
    }

    /**
     * printf-style logger. Output is truncated to SOCKET_DEBUG_BUFFER_LEN - 1 bytes.
     * errno is preserved across the call so a following socket_error() still
     * reports the failure that triggered the log line.
     *
     * @throws int  the socket error code, only when SOCKET_DEBUG_LEVEL is non-zero.
     */
    void socket_debug(const char* message, ...) {
        const int saved_errno = errno;

        char buf[SOCKET_DEBUG_BUFFER_LEN] = "";
        va_list args;
        va_start(args, message);
        vsnprintf(buf, sizeof(buf), message, args);
        va_end(args);

        std::string error = "Avalon sokcet: ";
        error.append(buf);
        error.append("\n");
        // Print through "%s": the text may contain '%' and must not be used as a format string.
        printf("%s", error.c_str());

        errno = saved_errno;

        if (SOCKET_DEBUG_LEVEL) {
            int error_no = socket_error();
            if (error_no != -1) {
                throw error_no;
            }
        }
    }

    /**
     * Initializes Winsock on Windows; no-op elsewhere.
     * @return 0 on success, -1 on failure.
     */
    int socket_init() {
#ifdef WIN32
        WSADATA wsadata;
        // The requested version was missing from the original listing; 2.2 is assumed.
        WORD version = MAKEWORD(2, 2);
        int ret = WSAStartup(version, &wsadata);
        if (ret) {
            socket_debug("Initilize winsock error");
            return -1;
        }
#endif
        return 0;
    }

    /** Shuts Winsock down on Windows; returns 0 elsewhere. */
    int socket_clean() {
#ifdef WIN32
        return WSACleanup();
#else
        return 0;
#endif
    }

    /**
     * Wraps an already-open descriptor.
     * @return the new Socket, or nullptr when socket_fd is negative or allocation fails.
     */
    Socket* Socket::create(SOCKET socket_fd) {
        if (socket_fd < 0) {
            socket_debug("socket_fd(%d) is invailed.", socket_fd);
            return nullptr;
        }
        // nothrow so that the null check below is meaningful.
        Socket* created = new (std::nothrow) Socket(socket_fd);
        if (!created) {
            socket_debug("Create avalon socket failed.");
            return nullptr;
        }
        return created;
    }

    /**
     * Opens a new socket with the given family/type/protocol.
     * @return the new Socket, or nullptr when the socket could not be opened.
     */
    Socket* Socket::create(int family, int type, int protocal) {
        Socket* created = new (std::nothrow) Socket(family, type, protocal);
        if (!created) {
            socket_debug("Create avalon socket failed.");
            return nullptr;
        }
        if (created->getSocketFD() == INVALID_SOCKET) {
            delete created;
            socket_debug("Create socket failed.");
            return nullptr;
        }
        socket_debug("Create socket(%d) successfully.", created->getSocketFD());
        return created;
    }

    /** Takes ownership of socket_fd; family/type/protocol are recorded as AF_INET/SOCK_STREAM/IPPROTO_IP. */
    Socket::Socket(SOCKET socket_fd)
    : _socket_fd(socket_fd)
    , _family(AF_INET)
    , _type(SOCK_STREAM)
    , _protocal(IPPROTO_IP) {
    }

    /** Opens a socket; on failure the descriptor stays INVALID_SOCKET and the recorded settings keep their defaults. */
    Socket::Socket(int family, int type, int protocal)
    : _socket_fd(INVALID_SOCKET)
    , _family(AF_INET)
    , _type(SOCK_STREAM)
    , _protocal(IPPROTO_IP) {
        _socket_fd = ::socket(family, type, protocal);
        if (_socket_fd != INVALID_SOCKET) {
            _family = family;
            _type = type;
            _protocal = protocal;
        }
    }

    /** Replaces the owned descriptor, closing the previous one so it is not leaked. */
    Socket& Socket::operator = (SOCKET socket_fd) {
        if (socket_fd != _socket_fd) {
            if (_socket_fd != INVALID_SOCKET) {
                this->close();
            }
            _socket_fd = socket_fd;
        }
        return *this;
    }

    Socket::~Socket() {
        if (_socket_fd != INVALID_SOCKET) {
            this->close();
        }
    }

    /**
     * Connects to an IPv4 address.
     * @param host  dotted-decimal IPv4 address (host names are not resolved).
     * @param port  port in host byte order.
     * @return true on success.
     */
    bool Socket::connect(const char* host, unsigned short port) {
        // sockaddr_in only has room for an IPv4 address; inet_pton with another
        // family would write past sin_addr.
        if (!host || _family != AF_INET) {
            socket_debug("Connect failed: only IPv4 addresses are supported.");
            return false;
        }

        struct sockaddr_in remote_addr;
        memset(&remote_addr, 0, sizeof(remote_addr));
        remote_addr.sin_family = _family;
        remote_addr.sin_port = htons(port);
        // inet_pton reports a malformed address through its return value, not errno.
        if (inet_pton(_family, host, &remote_addr.sin_addr) != 1) {
            socket_debug("Connect failed: invalid address %s.", host);
            return false;
        }

        int ret = ::connect(_socket_fd, (struct sockaddr*)(&remote_addr), sizeof(remote_addr));
        if (ret == SOCKET_ERROR) {
            socket_debug("Connect %s:%d failed.", host, port);
            socket_error();
            return false;
        }
        socket_debug("Connect %s:%d successfully.", host, port);
        return true;
    }

    /**
     * Enables SO_REUSEADDR and binds to the given port on all local interfaces.
     * @return true on success.
     */
    bool Socket::bind(unsigned short port) {
        int opt = 1;  // original literal lost; 1 (enable) is the conventional value
        if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)) < 0)
            return false;

        struct sockaddr_in local_addr;
        memset(&local_addr, 0, sizeof(local_addr));
        local_addr.sin_family = _family;
        local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        local_addr.sin_port = htons(port);

        int ret = ::bind(_socket_fd, (struct sockaddr*)(&local_addr), sizeof(local_addr));
        if (ret == SOCKET_ERROR) {
            socket_debug("Socket(%d) bind port(%d) failed.", _socket_fd, port);
            return false;
        }
        socket_debug("Socket(%d) bind port(%d) successfully.", _socket_fd, port);
        return true;
    }

    /** Marks the socket as passive. @return true on success. */
    bool Socket::listen(int backlog) {
        int ret = ::listen(_socket_fd, backlog);
        if (ret == SOCKET_ERROR) {
            socket_debug("Socket(%d) listen failed.", _socket_fd);
            return false;
        }
        socket_debug("Socket(%d) Listen successfully.", _socket_fd);
        return true;
    }

    /**
     * Accepts one connection, retrying when interrupted by a signal.
     * @param client_host  optional output buffer for the peer's dotted IPv4
     *                     address; must hold at least 16 bytes.
     * @return a new Socket owned by the caller, or nullptr on failure.
     */
    Socket* Socket::accept(char* client_host) {
        const size_t kIpv4TextLen = 16;  // "255.255.255.255" plus the terminator

        struct sockaddr_in peer_addr;
        socklen_t len = sizeof(peer_addr);
        SOCKET client_fd = INVALID_SOCKET;

        for (;;) {
            len = sizeof(peer_addr);
            client_fd = ::accept(_socket_fd, (struct sockaddr*)(&peer_addr), &len);
            if (client_fd != INVALID_SOCKET) break;
            if (errno == EINTR) continue;

            socket_debug("Socket(%d) accept failed.", _socket_fd);
            socket_error();
            return nullptr;
        }

        Socket* client = Socket::create(client_fd);
        if (!client) {
            // No wrapper took ownership, so close the raw descriptor here.
#ifdef WIN32
            closesocket(client_fd);
#else
            ::close(client_fd);
#endif
            return nullptr;
        }

        const char* peer_ip = inet_ntoa(peer_addr.sin_addr);
        if (client_host) {
            snprintf(client_host, kIpv4TextLen, "%s", peer_ip);
        }

        socket_debug("Socket(%d) accept successfully, client socket: %d ip: %s", _socket_fd, client->getSocketFD(), peer_ip);
        return client;
    }

    /**
     * Sends the whole buffer, looping over partial sends.
     * @return number of bytes sent. If the very first send fails or sends
     *         nothing, its result (-1 or 0) is returned; if a later send
     *         fails, the count sent so far is returned.
     */
    ssize_t Socket::send(const char* buffer, size_t len, int flag) {
        size_t sent = 0;
        while (sent < len) {
            const ssize_t bytes = ::send(_socket_fd, buffer + sent, len - sent, flag);
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) {
                socket_error();
                if (sent == 0) return bytes < 0 ? -1 : 0;
                break;
            }
            // Only successful results are added; adding -1 would corrupt the count.
            sent += static_cast<size_t>(bytes);
        }
        return static_cast<ssize_t>(sent);
    }

    /** Single recv() call; returns its result unchanged. */
    ssize_t Socket::recv(char* buffer, size_t len, int flag) {
        return ::recv(_socket_fd, buffer, len, flag);
    }

    /**
     * Writes the whole buffer, looping over partial writes.
     * Return value follows the same rules as send().
     */
    ssize_t Socket::write(const char* buffer, size_t len) {
        size_t written = 0;
        while (written < len) {
            const ssize_t bytes = ::write(_socket_fd, buffer + written, len - written);
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) {
                socket_error();
                if (written == 0) return bytes < 0 ? -1 : 0;
                break;
            }
            written += static_cast<size_t>(bytes);
        }
        return static_cast<ssize_t>(written);
    }

    /** Single read() call; returns its result unchanged. */
    ssize_t Socket::read(char* buffer, size_t len) {
        return ::read(_socket_fd, buffer, len);
    }

    /** Sets or clears O_NONBLOCK on the descriptor; silently does nothing if the flags cannot be read. */
    void Socket::set_blocking(const bool blocking) {
        int opts = fcntl(_socket_fd, F_GETFL);
        if (opts < 0) return;

        if (blocking) opts = (opts & ~O_NONBLOCK);
        else opts = (opts | O_NONBLOCK);
        fcntl(_socket_fd, F_SETFL, opts);
    }

    /**
     * Closes the descriptor and marks this object as no longer owning one.
     * @return the result of the underlying close call.
     */
    int Socket::close() {
        int ret = -1;
#ifdef WIN32
        ret = closesocket(_socket_fd);
#else
        ret = ::close(_socket_fd);
#endif
        if (ret == SOCKET_ERROR) {
            socket_debug("Socket(%d) close failed.", _socket_fd);
        }
        _socket_fd = INVALID_SOCKET;
        return ret;
    }

    SOCKET Socket::getSocketFD() {
        return _socket_fd;
    }
}

2. 多线程的模型:

在accept成功之后,为每个通信socket创建新的进程或线程,单独用于处理服务器和客户端的通信。但是系统都会有创建进程数量的限制,在linux下,创建的线程也叫轻量级进程,所以即使创建的是线程也会受到系统的限制,通常这个默认限制是2048个,而且进程或者线程数量过多,也会带来较大的进程或者线程切换开销:

客户端:

    // Client side of the echo test. This fragment runs inside an enclosing loop
    // that provides the counter `i` (the loop itself is not part of the listing).
    avalon::Socket* socket = avalon::Socket::create(AF_INET, SOCK_STREAM);
    if (socket) {
        // The port and buffer sizes were missing from the original listing.
        // 59000 must match the port the server binds; the sizes are assumed values.
        const unsigned short kServerPort = 59000;

        if (!socket->connect("127.0.0.1", kServerPort)) {
            delete socket;  // do not leak the socket when skipping this iteration
            continue;
        }

        char buf[64] = "";
        snprintf(buf, sizeof(buf), "%d I am a client socket!", i);
        ssize_t bytes = socket->send(buf, strlen(buf), 0);

        char recvBuf[1024];
        while (true) {
            memset(recvBuf, 0, sizeof(recvBuf));
            // Leave one byte free so recvBuf is always NUL-terminated for %s.
            bytes = socket->recv(recvBuf, sizeof(recvBuf) - 1, 0);
            if (bytes > 0) {
                // bytes is ssize_t; convert explicitly for the %d conversion.
                printf("%d recv data from remote: %d %s \n", i, static_cast<int>(bytes), recvBuf);
            }
            else if (bytes == 0) {
                printf("remote socket %d cloese. \n", socket->getSocketFD());
                break;
            }
            else {
                int error = avalon::socket_error();
                printf("%d socket error: %d %s \n", i, error, strerror(error));
                break;
            }
        }

        // The connection is finished; the destructor closes the descriptor.
        delete socket;
    }

服务端:

// The listening port and the buffer size were missing from the original
// listing; both values below are assumptions. The port must match the client.
static const unsigned short kListenPort = 59000;
static const size_t kRecvBufferLen = 1024;

// Returns a printable number for the calling thread. std::thread::id cannot be
// passed to printf directly, so its hash is used instead.
static long current_thread_tag() {
    return static_cast<long>(std::hash<std::thread::id>()(std::this_thread::get_id()));
}

/**
 * Per-connection worker: echoes a fixed reply for every message received
 * until the peer closes the connection or recv fails.
 * Takes ownership of `socket` and deletes it before returning.
 */
void communiction_handler(avalon::Socket* socket) {
    // Looping on a null socket would spin forever; there is nothing to serve.
    if (!socket) return;

    char buffer[kRecvBufferLen];

    while (true) {
        printf("thread %ld \n", current_thread_tag());

        // Reserve one byte for the terminator written below.
        ssize_t bytes = socket->recv(buffer, sizeof(buffer) - 1, 0);
        if (bytes > 0) {
            buffer[bytes] = '\0';

            printf("recv msg from client: %s \n", buffer);

            const char* data = "I am remote.0123456789abcdefg!wwwwwer";
            socket->send(data, strlen(data), 0);
        }
        else if (bytes == 0) {
            printf("client socket(%d) closed. thread(%ld) \n", socket->getSocketFD(), current_thread_tag());
            break;
        }
        else {
            int error_no = avalon::socket_error();
            printf("recv msg from client failed %d %s \n", error_no, strerror(error_no));
            break;
        }
    }

    // The Socket destructor closes the descriptor.
    delete socket;
}

/**
 * Thread-per-connection server: accepts clients forever and hands each one
 * to its own thread running communiction_handler.
 */
int main(int argc, const char * argv[]) {
    (void)argc;
    (void)argv;

    // One thread per accepted connection.
    std::vector<std::thread> threads;

    avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM);
    if (!listen_socket) return 1;

    do {
        if (!listen_socket->bind(kListenPort)) break;
        if (!listen_socket->listen()) break;

        while (true) {
            avalon::Socket* clientSocket = listen_socket->accept();
            if (clientSocket) {
                threads.emplace_back(communiction_handler, clientSocket);
            }
        }
    } while (false);

    for (std::thread& thread : threads) {
        thread.join();
    }

    delete listen_socket;
    return 0;
}

3. I/O多路复用

  内核一旦检测到某个I/O的读取条件就绪,就通知用户进程进行响应;

  多路复用一般用于需要同时处理多个文件描述符,多个套接字口,多种协议的情况;

  相比使用多进程和多线程的机制,I/O多路复用具有系统开销小的优势;

(1)select模型:

最大的问题就是连接数限制,通常是1024或者2048个,不过可以修改内核配置达到更多的连接数。但是由于select模型需要线性遍历fd集合,因此如果连接数改的过大,例如10万个,会导致线性遍历的性能问题,最后的结果可能是导致超时。其次,就是内存拷贝问题:select模型在把fd上的消息通知给用户进程时,采用的是将内核中的数据拷贝到用户空间的方式:

服务端:

//
//  main.cpp
//  SocketServer
//
//  Created by avl-showell on 16/8/8.
//  Copyright © 2016 avl-showell. All rights reserved.
//

#include <iostream>
#include "socket/AvalonSocket.h"
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <thread>
#include <vector>
// select
#include <sys/select.h>
#include <sys/time.h>

#define MAX_CLIENT_SOCKET_COUNT 10000
#define RECV_BUFFER_LEN 10

/**
 * select()-based echo server: one thread multiplexes the listening socket and
 * every client socket, echoing back whatever each client sends.
 */
int main(int argc, const char * argv[]) {
    (void)argc;
    (void)argv;

    // The port and timeout literals were missing from the original listing;
    // both values are assumptions.
    const unsigned short kListenPort = 59000;
    const long kSelectTimeoutSec = 5;

    std::vector<avalon::Socket*> socket_fds(MAX_CLIENT_SOCKET_COUNT, nullptr);
    fd_set read_fds;
    struct timeval timeout;

    char recvBuf[RECV_BUFFER_LEN];

    avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM);
    if (!listen_socket) return 1;

    do {
        if (!listen_socket->bind(kListenPort)) break;
        if (!listen_socket->listen()) break;

        const int listen_socket_fd = listen_socket->getSocketFD();
        // fd_set can only represent descriptors below FD_SETSIZE.
        if (listen_socket_fd >= FD_SETSIZE) break;

        while (true) {
            // select: rebuild the read set on every pass because select() overwrites it
            int max_socket_fd = listen_socket_fd;

            FD_ZERO(&read_fds);
            FD_SET(listen_socket_fd, &read_fds);

            for (size_t i = 0; i < socket_fds.size(); ++i) {
                avalon::Socket* socket = socket_fds[i];
                if (!socket) continue;

                const SOCKET socket_fd = socket->getSocketFD();
                if (socket_fd < 0) continue;

                FD_SET(socket_fd, &read_fds);
                if (socket_fd > max_socket_fd)
                    max_socket_fd = socket_fd;
            }

            // select() may modify the timeout, so it is reset before each call.
            timeout.tv_sec = kSelectTimeoutSec;
            timeout.tv_usec = 0;

            int ret = select(max_socket_fd + 1, &read_fds, NULL, NULL, &timeout);
            if (ret == SOCKET_ERROR) {
                if (errno == EINTR) continue;
                avalon::socket_error();
                break;
            }
            if (ret == 0) {
                printf("select socket timeout. \n");
                continue;
            }

            printf("_______________ \n");

            for (size_t i = 0; i < socket_fds.size(); ++i) {
                avalon::Socket* socket = socket_fds[i];
                if (!socket) continue;

                const SOCKET socket_fd = socket->getSocketFD();
                if (socket_fd < 0 || !FD_ISSET(socket_fd, &read_fds)) continue;

                // Zero the buffer and read at most LEN - 1 bytes so it stays NUL-terminated.
                memset(recvBuf, 0, RECV_BUFFER_LEN);
                const ssize_t bytes = socket->recv(recvBuf, RECV_BUFFER_LEN - 1, 0);
                if (bytes > 0) {
                    // Echo exactly what was received, not the whole buffer.
                    socket->send(recvBuf, static_cast<size_t>(bytes), 0);
                    printf("select: recv data from client: %s \n", recvBuf);
                    // process the data...
                }
                else {
                    // 0 means the peer closed the connection; only a negative result is an error.
                    if (bytes < 0) avalon::socket_error();
                    delete socket;
                    socket_fds[i] = nullptr;
                }
            }

            if (FD_ISSET(listen_socket_fd, &read_fds)) {
                printf("select: new client connection. \n");

                // Always accept: leaving the connection pending would make
                // select() report the listening socket as readable forever.
                avalon::Socket* clientSocket = listen_socket->accept();
                if (clientSocket) {
                    bool found = false;

                    if (clientSocket->getSocketFD() < FD_SETSIZE) {
                        for (size_t i = 0; i < socket_fds.size(); ++i) {
                            if (!socket_fds[i]) {
                                // clientSocket->set_blocking(false);
                                socket_fds[i] = clientSocket;
                                found = true;
                                break;
                            }
                        }
                    }

                    if (!found) {
                        printf("select: out of max sockets limit. \n");
                        delete clientSocket;
                    }
                }
            }
        }
    } while (false);

    // Close any clients that are still connected.
    for (size_t i = 0; i < socket_fds.size(); ++i) {
        delete socket_fds[i];
        socket_fds[i] = nullptr;
    }

    delete listen_socket;

    return 0;
}

(2) poll模型:

poll模型和select模型类似,但是poll没有最大文件数量的限制,不过依然存在将消息从内核空间拷贝到用户空间的问题:

服务端:

#include <poll.h>

#define MAX_CLIENT_SOCKET_COUNT 10
#define RECV_BUFFER_LEN 10 int main(int argc, const char * argv[]) {
char recvBuf[RECV_BUFFER_LEN]; avalon::Socket* listen_socket = avalon::Socket::create(AF_INET, SOCK_STREAM); // poll
struct pollfd client_fds[MAX_CLIENT_SOCKET_COUNT];
client_fds[].fd = listen_socket->getSocketFD();
client_fds[].events = POLLIN;
for (int i = ; i < MAX_CLIENT_SOCKET_COUNT; ++i) {
client_fds[i].fd = -;
}
int max_socket = ; do {
if (!listen_socket->bind()) break;
if (!listen_socket->listen()) break; while (true) {
int ready = poll(client_fds, max_socket + , );
if (ready == -) {
avalon::socket_error();
break;
}
else if (ready == ) {
printf("select socket timeout. \n");
continue;
} printf("_______________ \n"); if (client_fds[].revents & POLLIN) {
printf("select: new client connection. \n"); bool found = false; int i = ;
for (i = ; i < MAX_CLIENT_SOCKET_COUNT; ++i) {
avalon::Socket* socket = socket_fds[i]; if (!socket) {
avalon::Socket* clientSocket = listen_socket->accept();
if (clientSocket) {
client_fds[i].fd = clientSocket->getSocketFD();
client_fds[i].events = POLLIN;
socket_fds[i] = clientSocket;
found = true;
break;
}
}
} if (!found) {
printf("select: out of max sockets limit. \n");
}
else {
if (i > max_socket)
max_socket = i;
}
} for (int j = ; j <= max_socket; ++j) {
avalon::Socket* socket = socket_fds[j];
if (!socket) continue; if (client_fds[j].revents & (POLLIN | POLLERR)) {
int recvedBytes = ; while (true) {
memset(recvBuf, , RECV_BUFFER_LEN);
int bytes = socket->read(recvBuf, RECV_BUFFER_LEN);
if (bytes > ) {
recvedBytes += bytes;
int writedBytes = socket->write(recvBuf, RECV_BUFFER_LEN); recvBuf[bytes] = '\0';
printf("select: recv data from client: %s \n", recvBuf); if (bytes < RECV_BUFFER_LEN) break;
}
else {
avalon::socket_error(); delete socket;
client_fds[j].fd = -;
socket_fds[j] = nullptr; break;
}
}
}
}
}
} while (false); for(std::thread& thread : threads){
thread.join();
} delete listen_socket; return ;
}

(3) epoll模型

epoll模型是poll模型的改进版本,没有文件描述符的限制,epoll只处理活跃的文件描述符,不会遍历整个集合,而且epoll使用了内核中的“共享内存”,减少了内存的拷贝:

/* Purpose: handle multiple sockets with epoll.
 * Listens on one port; every accepted connection is added to the epoll set.
 */
#include "select.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <unistd.h>   /* close() */

typedef struct _CLIENT{
    int fd;
    struct sockaddr_in addr; /* client's address information */
} CLIENT;

#define MYPORT 59000

// Maximum number of events returned by one epoll_wait() call
#define MAX_EVENTS 500

// Current number of connections.
// The initializer was missing from the original listing; 0 is assumed.
int currentClient = 0;

// Receive buffer
#define REVLEN 10
char recvBuf[REVLEN];

// epoll state
// epoll descriptor
int epollfd;
// event array filled by epoll_wait()
struct epoll_event eventList[MAX_EVENTS];

void AcceptConn(int srvfd);
void RecvData(int fd);

/*
 * epoll-based server: listens on MYPORT, registers the listening socket with
 * epoll and dispatches each ready descriptor to AcceptConn() or RecvData().
 * Returns 0 when the loop ends normally, -1 on a setup or listener failure.
 */
int main()
{
    // The epoll_wait timeout was missing from the original listing;
    // 3000 ms is an assumed value.
    const int timeout = 3000;
    struct sockaddr_in server_addr;

    //socket
    int sockListen = socket(AF_INET, SOCK_STREAM, 0);
    if (sockListen < 0)
    {
        printf("socket error\n");
        return -1;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(MYPORT);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    //bind
    if (bind(sockListen, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
    {
        printf("bind error\n");
        close(sockListen);
        return -1;
    }

    //listen
    // The backlog literal was missing from the original listing; SOMAXCONN is used.
    if (listen(sockListen, SOMAXCONN) < 0)
    {
        printf("listen error\n");
        close(sockListen);
        return -1;
    }

    //1. create the epoll instance
    epollfd = epoll_create(MAX_EVENTS);
    if (epollfd < 0)
    {
        printf("epoll error\n");
        close(sockListen);
        return -1;
    }

    // Level-triggered on purpose: AcceptConn() accepts one connection per
    // call, so the listener must stay readable while more are pending.
    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = sockListen;

    //2. epoll_ctl
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockListen, &event) < 0)
    {
        printf("epoll add fail : fd = %d\n", sockListen);
        close(epollfd);
        close(sockListen);
        return -1;
    }

    //epoll loop
    while (1)
    {
        //3. epoll_wait
        int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout);

        if (ret < 0)
        {
            printf("epoll error\n");
            break;
        }
        else if (ret == 0)
        {
            printf("timeout ...\n");
            continue;
        }

        // epoll_wait returns only the ready descriptors; this is the key difference from poll
        for (int n = 0; n < ret; n++)
        {
            const int fd = eventList[n].data.fd;
            const unsigned int flags = eventList[n].events;

            // Error or hang-up on this descriptor
            if ((flags & EPOLLERR) || (flags & EPOLLHUP) || !(flags & EPOLLIN))
            {
                printf("epoll error\n");
                close(fd);  // closing also removes the descriptor from the epoll set

                // Only a failure of the listening socket is fatal; a failing
                // client must not take the whole server down.
                if (fd == sockListen)
                {
                    close(epollfd);
                    return -1;
                }
                continue;
            }

            if (fd == sockListen)
            {
                AcceptConn(sockListen);
            }
            else
            {
                RecvData(fd);
                // not removed from the epoll set here
                // epoll_ctl(epollfd, EPOLL_CTL_DEL, pEvent->data.fd, pEvent);
            }
        }
    }

    close(epollfd);
    close(sockListen);

    printf("test\n");
    return 0;
}

/**************************************************
Function: AcceptConn
Purpose:  accept a client connection
Param:    srvfd - the listening socket
***************************************************/
void AcceptConn(int srvfd)
{
    struct sockaddr_in sin;
    socklen_t len = sizeof(struct sockaddr_in);
    memset(&sin, 0, len);

    // Accept one pending connection from the listening socket.
    int confd = accept(srvfd, (struct sockaddr*)&sin, &len);
    if (confd < 0)
    {
        printf("bad accept\n");
        return;
    }
    printf("Accept Connection: %d", confd);

    //setnonblocking(confd);

    //4. epoll_ctl
    // Add the new connection to the epoll set. It is registered
    // level-triggered because the socket is left in blocking mode;
    // edge-triggered notification is meant for non-blocking descriptors.
    struct epoll_event event;
    event.data.fd = confd;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event) < 0)
    {
        // Nobody would ever service this descriptor, so close it instead of leaking it.
        printf("epoll add fail : fd = %d\n", confd);
        close(confd);
    }
}

// Read data
/*
 * Reads what is currently available on fd into the global recvBuf and prints it.
 *
 * Exactly one recv() is issued per call, so the event loop is never blocked
 * waiting for the buffer to fill. At most REVLEN - 1 bytes are read, which
 * keeps recvBuf NUL-terminated for the %s conversions below; any remaining
 * bytes are picked up on the next readiness event for fd.
 * When the peer has closed the connection or recv fails, fd is closed.
 */
void RecvData(int fd)
{
    memset(recvBuf, 0, REVLEN);
    printf("RecvData function\n");

    // receive data
    const ssize_t ret = recv(fd, recvBuf, REVLEN - 1, 0);
    if (ret > 0)
    {
        // data received normally
        recvBuf[ret] = '\0';
        printf("buf = %s\n", recvBuf);
    }
    else
    {
        // 0: peer closed the connection; negative: receive error.
        // Closing the descriptor also removes it from the epoll set.
        close(fd);
    }

    printf("content is %s", recvBuf);
}