关键词:C++、Linux


References:

Linux下操作。

零、前言

这个项目原作者暂未更新,单凭我自己的话,还没能力续写下去。我的想法是有时间再重新组织一下语言,细化一下每一章的描述。这个项目作为一个 Linux 网络编程的入门项目还是相当不错的,能够了解到 Socket、线程池以及一系列抽象编程思想。如果想继续深入学习网络编程,那路还有很长很长……

——Fingsinz,2024.06.06留

碰巧看到一位大牛在原仓库的基础上专注于功能的实现。

Reference:https://github.com/Wlgls/30daysCppWebServer

——Fingsinz,2024.07.11留

一、从socket开始

socket,被翻译为套接字,它是计算机之间进行通信的一种约定或一种方式。套接字是双方通过网络进行通信的通道。Socket 连接的一边是客户端,另一边是服务器端。一个正常的服务器端能服务多个客户端。

  • 通过 socket 这种约定,一台计算机可以接收其他计算机的数据,也可以向其他计算机发送数据。

1.1 服务端干了什么

在服务器端,需要建立一个 socket 套接字,对外提供一个网络通信接口。

  • 在 Linux 系统中这个套接字仅仅是一个文件描述符,也就是一个int类型的值。
  • 对套接字的所有操作(包括创建)都是最底层的系统调用。
  1. 创建套接字:
1
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  1. 创建一个 sockaddr_in 结构体并初始化(bzero 函数):
1
2
struct sockaddr_in serverAddr;
bzero(&serverAddr, sizeof(serverAddr));
  1. 设置地址族、IP 地址和端口号:
1
2
3
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
serverAddr.sin_port = htons(1234);
  1. 将 socket 地址与文件描述符绑定:
1
bind(sockfd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
  1. 使用 listen 函数监听套接字:
1
listen(sockfd, SOMAXCONN);
  1. 服务端想要接受一个客户端连接,需要使用 accept 函数:
1
2
3
4
struct sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
bzero(&clientAddr, sizeof(clientAddr));
int clientSockfd = accept(sockfd, (sockaddr *)&clientAddr, &clientAddrLen);
  1. 输出 socket 连接信息:
1
printf("Client connected: %d!\tIP: %s\tPort: %d\n", clientSockfd, inet_ntoa(clientAddr.sin_addr), ntohs(clientAddr.sin_port));

至此,客户端已经可以通过 IP 地址和端口号连接到这个 socket 端口了。

1.2 客户端如何配合

在客户端,也需要建立一个 socket 套接字。

对于客户端,服务器存在的唯一标识是 IP 地址和端口号。此时需要将套接字绑定到一个 IP 地址和端口上。

  1. 创建套接字:
1
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  1. 创建一个 sockaddr_in 结构体,并绑定 IP 族、IP 地址和端口号:
1
2
3
4
5
struct sockaddr_in serverAddr;
bzero(&serverAddr, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
serverAddr.sin_port = htons(1234);
  1. 使用 connect 函数进行连接:
1
connect(sockfd, (sockaddr *)&serverAddr, sizeof(serverAddr));

注意,需要先 ./server 运行服务端进行等待,再 ./client 运行客户端进行连接请求。

1.3 该节涉及函数及源代码

  • 相关头文件:
1
2
#include <sys/socket.h> // 创建 socket 所需
#include <arpa/inet.h> // socket 地址结构体所需
  • 创建 socket:
1
2
3
4
5
6
int socket (int __domain, int __type, int __protocol);
/*
* __domain:IP 地址类型,AF_INET 表示 IPv4,AF_INET6 表示 IPv6。
* __type:数据传输方式,SOCK_STREAM 表示流格式、面向链接,多用于 TCP; SOCK_DGRAM 表示数据报格式、无连接,多用于 UDP。
* __protocol:协议,0 表示根据前面两个参数自动推导协议类型。设置为 IPPROTO_TCP 和 IPPROTO_UDP,分别表示 TCP 和 UDP。
*/
  • 初始化:
1
2
3
4
5
6
7
8
void bzero (void *__s, size_t __n);
/*
* __s:指向要清零的内存块的指针。
* __n:要清零的内存块的大小。
* 该函数在头文件 string.h 或 cstring 中。
* Effective C++ - 条款01:视 C++ 为一个语言联邦。写 C 就用 string.h,写 C++ 就用 cstring。
* Effective C++ - 条款04:确定对象被使用前已先被初始化。使用 bzero 进行初始化。
*/
  • 绑定函数:
1
2
3
4
5
6
int bind (int __fd, __CONST_SOCKADDR_ARG __addr, socklen_t __len);
/*
* __fd:文件描述符。
* __addr:sockaddr 参数。
* __len:sockaddr 参数的大小。
*/
  • listen 函数:
1
2
3
4
5
int listen (int __fd, int __n);
/*
* __fd:文件描述符。
* __n:最大监听队列长度,宏定义 SOMAXCONN 为最大值。
*/
  • accept 函数:
1
2
3
4
5
6
7
int accept (int __fd, __SOCKADDR_ARG __addr, socklen_t *__restrict __addr_len);
/*
* __fd:服务端的文件描述符。
* __addr`:sockaddr 参数。
* __addr_len`:指向 sockaddr 参数大小的指针。因为 accept 需要写入客户端 socket 长度,所以需要地址
* 另外,该函数会阻塞当前程序,直到有一个客户端 socket 被接受后程序才会往下执行。
*/
  • connect 函数:
1
2
3
4
5
6
int connect (int __fd, __CONST_SOCKADDR_ARG __addr, socklen_t __len);
/*
* __fd:客户端的文件描述符。
* __addr:sockaddr 参数。
* __len:sockaddr 参数大小。
*/

关于 Socket 的有些地址结构需要清楚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 // 通用的套接字地址类型
struct sockaddr {
unsigned short sa_family; // AF_INET 或 AF_INET6
char sa_data[14]; // 无关紧要
};

// 实际使用的套接字地址类型
struct sockaddr_in {
short sin_family; // AF_INET
unsigned short sin_port; // 端口号,大端
struct in_addr sin_addr; // IPv4 地址
char sin_zero[8]; // 无关紧要
};
struct sockaddr_in6 {
uint16_t sin6_family; // AF_INET6
uint16_t sin6_port; // 端口号,大端
uint32_t sin6_flowinfo;
struct in6_addr sin6_addr; // IPv6 地址
uint32_t sin6_scope_id;
};

struct sockaddr_storage {
sa_family_t ss_family; // AF_INET 或 AF_INET6
// 为IPv4和IPv6提供足够的空间
char __ss_pad1[_SS_PAD1SIZE];
int64_t __ss_align;
char __ss_pad2[_SS_PAD2SIZE];
};
  • struct sockaddr_storage 对于IPv4和IPv6都足够大,可以在实际中使用它。

  • struct sockaddr_instruct sockaddr_in6 是IPv4和IPv6的具体结构。

struct sockaddr * 是 socket API 使用的类型,结构本身是无用的。程序员不应操作 sockaddrsockaddr 是给操作系统用的。应使用 sockaddr_in 来表示地址,sockaddr_in 区分了地址和端口,将 struct sockaddr_storage 引用(指针)转换为 struct sockaddr_instruct sockaddr_in6 以初始化/读取结构。

当在 Linux 上调用任何系统调用时,实际上是在调用 libc 中的一个瘦包装器,即一个稳定的 Linux 系统调用接口的包装器。在 Windows 上,套接字 API 遵循相同的 BSD API,但有许多不同的细节。接口来自 OS DLL 而不是系统调用。

该节代码:GithubGitee

二、完善代码,数据读写

上面的代码是基础版的,但要想真正运行使用,需要完善代码,并抓住错误。

  • Effective C++ 中有提到:“别让异常逃离析构函数”(条款08)。

2.1 错误检查处理函数

对于 Linux 系统调用,常见的错误提示方式是使用返回值和设置错误码。

  • 当一个系统调用返回 -1,说明有错误发生。

增加一个错误检查处理函数:

1
2
3
4
5
6
7
8
void errorif(bool condition, const char *errmsg)
{
if (condition)
{
perror(errmsg);
exit(EXIT_FAILURE);
}
}
  • 第一个参数为判断是否发生错误条件,调用 iostream 中的 perror 打印错误。
  • 第二个参数为错误信息。
  • 然后使用 exit 函数让程序退出并返回一个预定义常量 EXIT_FAILURE

使用就很方便:

1
2
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
errorif(sockfd == -1, "socket create error");

对所有函数都进行处理错误:

1
2
3
4
errorif(bind(sockfd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1, "socket bind error");
errorif(listen(sockfd, SOMAXCONN) == -1, "socket listen error");
errorif(clientfd == -1, "socket accept error");
errorif(connect(sockfd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1, "socket connect error");
  • 错误的处理是必须的,但处理函数不一定这样写。

2.2 数据读写

当建立 socket 连接后,就可以使用 unistd.h 中的 readwrite 函数进行数据读写。(仅限于 TCP 连接。UDP 连接使用 sendtorecvfrom 函数。)

接下来做一个通信情况:客户端向服务端发送一定数据,然后服务端接收后转发回客户端,客户端将接收的转发数据再进行标准输出。

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
while (true)	// 持续通信
{
char buffer[1024]; // 定义缓冲区
std::cin >> buffer; // 从标准输入读取数据

// 向服务端发送数据
size_t writeLen = write(sockfd, buffer, sizeof(buffer));

// 发送失败处理
if (writeLen == -1)
{
std::cout << "Socket already disconnected!\n";
break;
}

bzero(buffer, sizeof(buffer)); // 清空缓冲区

// 读回数据
size_t readLen = read(sockfd, buffer, sizeof(buffer));

// 读数据问题判断
if (readLen > 0)
{
std::cout << buffer << "\n";
}

else if (readLen == 0)
{
std::cout << "Server socket disconnected!\n";
break;
}

else if (readLen == -1)
{
close(sockfd);
errorif(true, "socket read error");
}
}

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
while (true)
{
// 定义并初始化缓冲区
char buffer[1024];
bzero(buffer, sizeof(buffer));

// 从客户端读取数据
size_t readLen = read(clientfd, buffer, sizeof(buffer));

// 读取数据问题判断
if (readLen > 0)
{
std::cout << "Message from client:" << clientfd << ": " << buffer << "\n";
write(clientfd, buffer, sizeof(buffer)); // 读到后转回客户端
}

else if (readLen == 0)
{
std::cout << "Client " << clientfd << " disconnected\n";
close(clientfd);
break;
}

else if (readLen == -1)
{
close(clientfd);
errorif(true, "socket read error");
}
}

需要注意的是:

  • 服务端和客户端都可以从对方中读写数据。
  • 使用完一个 fd(文件描述符) 后,记得使用 close 函数进行关闭。

2.3 该节涉及函数及源代码

  • 相关头文件:
1
2
#include <unistd.h>	// 读写数据等需要
#include "util.h" // 放置错误处理函数
  • write 函数:
1
2
3
4
5
6
7
ssize_t write (int __fd, const void *__buf, size_t __n);
/*
* __fd:文件描述符。
* __buf:写入缓冲区。
* __n:写入缓冲区大小。
* 返回写入的大小,或-1。
*/
  • read 函数:
1
2
3
4
5
6
7
ssize_t read (int __fd, void *__buf, size_t __nbytes);
/*
* __fd:文件描述符。
* __buf:读取缓冲区。
* __nbytes:读取缓冲区大小。
* 返回读取的大小,-1表示错误,0表示EOF。
*/
  • close 函数:
1
2
3
4
int close (int __fd);
/*
* __fd:文件描述符。
*/

该节代码:GithubGitee

三、高并发使用epoll

之前只写了一个简单的服务器,只能同时处理一个客户端连接。事实上,所有的服务都是高并发的,可以同时为成千上万个客户端提供服务——IO复用。

  • IO 复用和多线程相似,但不是一个概念。
    • IO 复用针对 IO 接口;
    • 多线程针对 CPU。

IO 复用的基本思想是事件驱动,服务器同时保持多个客户端 IO 连接。

  • 当 IO 上有可读或可写事件发生,表示这个 IO 对应的客户端在请求服务器的服务,服务器应当响应。
  • Linux 中, IO 复用使用 select、poll 和 epoll 来实现。
    • epoll 相比 select、poll,表现性能更好,更加高效。

3.1 从select、poll到epoll

从实现原理上来说,select 和 poll 采用的都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法的时间复杂度是 O(n)O(n)。epoll_wait 则不同,它采用的是回调的方式。内核检测到就绪的文件描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插人内核就绪事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。因此 epoll_wait 无须轮询整个文件描述符集合来检测哪些事件已经就绪,其算法时间复杂度是 O(1)O(1)。详见《Linux高性能服务器编程-游双,第9章》

  • 当活动连接比较多的时候,epoll_wait 的效率未必比 select 和 poll 高,因为此时回调函数被触发得过于频繁。所以 epoll_wait 适用于连接数量多,但活动连接较少的情况。

epoll 是 Linux 特有的 IO 复用函数。

  • 使用一组函数完成任务。
  • 把用户关心的文件描述符上的事件放到内核的一个事件表中。
    • 而不像 select 和 poll 那样每次调用都重复传入文件描述符或事件集。
  • 需要额外的文件描述符来标识内核中的事件表。

创建文件描述符:

1
2
3
4
5
6
#include <sys/epoll.h>
int epoll_create(int size);
/*
* size:内核事件表大小。
* 返回文件描述符,用作其他所有 epoll 系统调用的第一个参数,指定访问的内核事件表。
*/

操作 epoll 的内核事件表:

1
2
3
4
5
6
7
8
9
10
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/*
* epfd:文件描述符。
* op:操作类型。有 EPOLL_CTL_ADD(往事件表中注册 fd 上的事件)、
* EPOLL_CTL_DEL(修改 fd 上的注册事件)、
* EPOLL_CTL_MOD(删除 fd 上的注册事件) 三种。
* fd:文件描述符。
* event:指定事件,是 epoll_event 结构体指针。
* 返回值:成功返回 0,失败返回 -1 并设置错误码。
*/

而关于 epoll_event 结构体的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct epoll_event
{
__uint32_t events; /*epoll事件*/
epoll_data_t data; /*用户数据*/
}

typedef union epoll_data
{
void *ptr; // 指定与fd相关的用户数据
int fd; // 指定事件所从属的目标文件描述符
uint32_t u32;
uint64_t u64;
} epoll_data_t;

epoll 系列系统调用的主要接口是 epoll_wait 函数,它在一段超时时间内等待一组文件描述符上的事件:

1
2
3
4
5
6
7
8
9
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
/*
* 该函数如果检测到事件,就将所有就绪的事件从内核事件表(epfd指定)中复制到events中。
* epfd:文件描述符。
* events:事件数组。
* maxevents:监听事件数组大小。
* timeout:超时时间,单位为毫秒。
* 返回值:成功返回就绪事件个数,失败返回 -1 并设置错误码。
*/

epoll 对文件描述符的操作有两种:

  • LT(Level Trigger,电平触发)模式
    • 默认的工作模式,相当于效率较高的 poll。
    • 对于采用 LT 工作模式的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用 epoll,_wait 时,epoll_wait 还会再次向应用程序通告此事件,直到该事件被处理。
  • ET(Edge Trigger,边沿触发)模式
    • 对于采用ET工作模式的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait 调用将不再向应用程序通知这一事件。可见,ET 模式在很大程度上降低了同一个 epoll 事件被重复触发的次数,因此效率要比 LT 模式高。
    • ET 模式必须搭配非阻塞式 socket 使用。

epoll 的事件有:

  • EPOLLIN:表示对应的文件描述符可读(包括对端 socket 正常关闭);
  • EPOLLOUT:表示对应的文件描述符可写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET:将 epoll 设为边缘触发模式。
  • EPOLLONESHOT:只监听一次事件,当监听完事件后,如果还需要继续监听这个 socket 的情况下,需要再次把这个 socket 加到 epoll 队列里。

3.2 将服务器改写成epoll版本

在创建了服务器 socket fd 后,将这个 fd 添加到 epoll。

  • epoll 监听事件的描述符会放在一棵红黑树上,将要监听的 IO 口放入 epoll 红黑树中,就可以监听该 IO 上的事件。
  • 只要这个 fd 上发生可读事件,表示有一个新的客户端连接。
  • 然后 accept 这个客户端并将客户端的 socket fd 添加到 epoll,epoll 会监听客户端 socket fd 是否有事件发生,如果发生则处理事件。

所以服务器大概的步骤如下:

  1. 创建 epoll,同时定义事件数组。
1
2
3
4
5
6
// 创建 epoll
int epfd = epoll_create1(0);
errorif(epfd == -1, "epoll create error");
// 定义事件数组
struct epoll_event events[MAX_EVENTS], ev;
bzero(&events, sizeof(events));
  1. 将要监听的 IO 口放入 epoll 中。
1
2
3
4
5
ev.data.fd = sockfd;			// 该 IO 口为服务器 socket fd
ev.events = EPOLLIN; // 可读
setnonblocking(sockfd); // 设置 sockfd 为非阻塞
// 将服务器 socket fd 注册到 epoll
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
  1. 不断监听 epoll 上的事件并处理。

  2. 如果监听发生的事件是服务器 socket fd 上的事件,表示有一个新的客户端连接。

1
2
3
4
5
6
7
8
9
10
11
12
if (events[i].data.fd == sockfd)
{
// 接收客户端信息

// 新增监听
bzero(&ev, sizeof(ev));
ev.data.fd = clientfd; // 该 IO 口为客户端 socket fd
ev.events = EPOLLIN | EPOLLET; // 客户端连接使用 ET 模式
setnonblocking(clientfd); // ET 需要搭配非阻塞式 socket 使用
// 将客户端 socket fd 注册到 epoll
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
}
  1. 如果监听发生的事件是客户端,并且是可读事件,表示有客户端发送消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
else if (events[i].events & EPOLLIN)
{
while (true) // 非阻塞 IO,需要不断读取,直至完毕
{
ssize_t bytesRead = read(events[i].data.fd, buf, sizeof(buf));

// 正常读取数据
if (bytesRead > 0)
// ...

// 客户端正常中断,继续读取
else if (bytesRead == -1 and errno == EINTR)
//...

// 非阻塞 IO,这个条件表示数据全部读取完毕
else if (bytesRead == -1 and ((errno == EAGAIN) or (errno == EWOULDBLOCK)))
// ...

// EOF 事件,一般表示客户端断开连接
else if (bytesRead == 0)
// ...
}
}

该节代码:GithubGitee

四、封装成类,程序模块化

4.1 将socket和InetAddress封装成类

当新建服务器 socket 时,需要完成绑定 IP 地址、监听、接受客户端连接等任务,这些任务都封装成 Socket 类来完成。希望简化成以下操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 新建服务器 socket
Socket *serverSocket = new Socket();
// 实例化 IP 地址
InetAddress *serverAddr = new InetAddress("127.0.0.1", 1234);

// 绑定 IP 地址
serverSocket->bind(serverAddr);
// 监听
serverSocket->listen();

// 实例化一个客户端地址
InetAddress *clientAddr = new InetAddress();
// 接受一个客户端连接
Socket *clientSocket = new Socket(serverSocket->accept(clientAddr));

4.2 将epoll封装成类

对于 epoll,希望简化操作,封装成类后:

1
2
3
4
5
6
7
8
9
10
11
// 实例化 epoll
Epoll *ep = new Epoll();
// 将要监听的 IO 口放入 epoll
ep->epoll_add(serverSocket->getFd(), EPOLLIN | EPOLLET);

while(true)
{
std::vector<epoll_event> events = ep->poll();
for (auto &ev : events)
// 处理事件
}

4.3 目录结构及源代码

目录结构如下:

1
2
3
4
5
6
7
8
9
10
client.cpp
server.cpp
util.h
util.cpp
Socket.h
Socket.cpp
InetAddress.h
InetAddress.cpp
Epoll.h
Epoll.cpp

该节代码:GithubGitee

五、向着Reactor模式转变

5.1 Reactor和Proactor

Reactor 翻译过来的意思是「反应堆」,这里的反应指的是「对事件反应」。

  • 当来了一个事件,Reactor 就有相对应的反应/响应。

事实上,Reactor 模式也叫 Dispatcher 模式,我觉得这个名字更贴合该模式的含义,即 I/O 多路复用监听事件。

  • 收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。

Reactor 模式主要由 Reactor 和处理资源池这两个核心部分组成。

  • Reactor 负责监听和分发事件,事件类型包含连接事件、读写事件;
  • 处理资源池负责处理事件,如 read -> 业务逻辑 -> send;

Reactor 模式是灵活多变的,可以应对不同的业务场景,灵活在于:

  • Reactor 的数量可以只有一个,也可以有多个;
  • 处理资源池可以是单个进程 / 线程,也可以是多个进程 / 线程;

有 3 个方案都是比较经典的,且都有应用在实际的项目中:

  • 单 Reactor 单进程 / 线程;
  • 单 Reactor 多线程 / 进程;
  • 多 Reactor 多进程 / 线程;

方案具体使用进程还是线程,要看使用的编程语言以及平台有关:

  • Java 语言一般使用线程,比如 Netty;
  • C 语言使用进程和线程都可以,例如 Nginx 使用的是进程,Memcache 使用的是线程。

Reactor 是非阻塞同步网络模式,感知的是就绪可读写事件。在每次感知到有事件发生(比如可读就绪事件)后,就需要应用进程主动调用 read 方法来完成数据的读取,也就是要应用进程主动将 socket 接收缓存中的数据读到应用进程内存中,这个过程是同步的,读取完数据后应用进程才能处理数据。

Proactor 是异步网络模式, 感知的是已完成的读写事件。在发起异步读写请求时,需要传入数据缓冲区的地址(用来存放结果数据)等信息,这样系统内核才可以自动帮我们把数据的读写工作完成,这里的读写工作全程由操作系统来做,并不需要像 Reactor 那样还需要应用进程主动发起 read/write 来读写数据,操作系统完成读写工作后,就会通知应用进程直接处理数据。

因此,Reactor 可以理解为「来了事件操作系统通知应用进程,让应用进程来处理」,而 Proactor 可以理解为「来了事件操作系统来处理,处理完再通知应用进程」。这里的「事件」就是有新连接、有数据可读、有数据可写的这些 I/O 事件这里的「处理」包含从驱动读取到内核以及从内核读取到用户空间。

作者:小林coding
链接:https://www.zhihu.com/question/26943938/answer/1856426252
来源:知乎著作权归作者所有。

详细请参考游双《Linux高性能服务器编程》第八章第四节、陈硕《Linux多线程服务器编程》第六章第六节。

接下来要将服务器向着 Reactor 模式转变:

  1. 首先将整个服务器抽象成一个 Server 类,这个类中有一个 main-Reactor,里面的核心是一个 EventLoop,这是一个事件循环;
  2. 添加需要监听的事务到这个事件循环内,每次有事件发生时就会通知,在程序中返回给 Channel(自封装的类),然后根据不同的描述符、事件类型以回调函数的方式进行处理。

5.2 加入Channel类

面对服务器许多服务时,不同的连接类型也将决定不同的处理逻辑,仅仅通过一个文件描述符来区分显然会很麻烦。希望得到文件描述符的更多消息。

  • epoll 的 epoll_event 结构体中,data 字段可以放一个 void * 类型的指针,用来保存更多信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
  • epoll 中的 data 是一个联合类型:
    • 可以存储一个指针,指向任何一个地址块的内容;
    • 可以是一个类的对象,就此将一个文件描述符封装成一个 Channel 类,一个 Channel 类始终负责一个文件描述符。对不同的服务、不同的事件类型,都可以在类中进行处理。

设计 Channel 类,核心成员如下:

1
2
3
4
5
6
7
8
9
10
class Channel
{
private:
EventLoop *loop;
int fd;
uint32_t events;
uint32_t revents;
bool isEpoll;
std::function<void()> callback;
}
  • loop:指向与之关联的事件循环的指针。
  • fd:Channel 负责的文件描述符。
  • events:表示希望监听这个文件描述符的哪些事件。
  • revents:表示在 epoll 返回该 Channel 时文件描述符正在发生的事件。
  • isEpoll:表示当前 Channel 是否已经添加到 epoll 红黑树中,区分使用 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD
  • callback:发生事件时执行的回调函数。

添加 Channel 类可以更加方便简单、多样化地处理 epoll 中发生的事件。同时脱离了底层,将 epoll、文件描述符和事件进行了抽象,形成了事件分发的模型,这也是 Reactor 模式的核心。

5.3 加入EventLoop类

EventLoop 类的定义如下:

1
2
3
4
5
6
7
8
9
10
class EventLoop {
private:
Epoll *ep;
bool quit;
public:
EventLoop();
~EventLoop();
void loop();
void updateChannel(Channel*);
};
  • ep:指向Epoll类实例的指针;
  • quit:指示程序是否应该退出;
  • loop():事件循环函数,调用开始事件驱动,即原来调用 epoll_wait 函数的死循环;
  • updateChannel():更新 Channel。

将需要监听的事务加入到事件循环中,每次有事件发生就会通知,返回到 Channel,然后根据不同的描述符、事件类型以回调函数方式进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
void EventLoop::loop()
{
while (!quit)
{
// 使用epoll实例轮询事件
std::vector<Channel *> channels = ep->poll();

// 处理每个Channel的事件
for (Channel *channel : channels)
channel->handleEvent();
}
}

5.4 加入Server类

服务器类 Server 的核心成员如下:

1
2
3
4
5
6
7
8
9
10
11
class Server
{
private:
EventLoop *loop;
public:
Server(EventLoop*);
~Server();

void handleReadEvent(int);
void newConnection(Socket *serv_sock);
};
  • loop:事件循环对象。
  • handleReadEvent():处理读事件。
  • newConnection():处理新连接。

之后启动服务器的操作抽象为:

1
2
3
EventLoop *loop = new EventLoop();
Server *server = new Server(loop);
loop->loop();

这个版本服务器内只有一个 EventLoop,当其中有可读事件发生时,可以拿到该描述符对应的 Channel

在新建 Channel 时,根据 Channel 描述符的不同分别绑定了两个回调函数:

  • newConnection() 函数被绑定到服务器socket上;
    • 如果服务器 socket 有可读事件,Channel 里的 handleEvent() 函数实际上会调用 Server 类的 newConnection() 新建连接。
  • handlrReadEvent() 被绑定到新接受的客户端socket上。
    • 如果客户端 socket 有可读事件,Channel 里的 handleEvent() 函数实际上会调用 Server 类的 handleReadEvent() 响应客户端请求。

至此,根据抽象出的 EventLoopChannel,构成了事件驱动模型。这两个类和服务器核心 Server 已经没有任何关系,经过完善后可以被任何程序复用,达到了事件驱动的设计思想,现在的服务器也可以看成一个最简易的 Reactor 模式服务器。

需要注意的是,目前该服务器的内存管理一塌糊涂。

该节代码:GithubGitee

六、把服务器的接受抽象化

6.1 抽象化接受

服务器中,对于每一个事件,首先都是调用 accept() 函数去接受一个 TCP 连接,然后把 Socket 文件描述符添加到 epoll。当这个 IO 口有事件发生时,对该连接提供相应的服务。

分离接受连接这个功能,添加 Acceptor 类。

6.2 Acceptor 类

Acceptor 类应该有以下特点:

  • 类中有一个 Socket fd,就是服务器监听的 Socket fd,每一个
    Acceptor 对象都对应一个 Socket fd。
  • 类存在于事件驱动 EventLoop 类中。
  • 类也通过一个 Channel 负责分发到 epoll,该 Channel 的事件处理函数 handleEvent() 会调用 Acceptor 类中的连接函数进行新建一个 TCP 连接。

将新建连接的逻辑就在 Acceptor 类中。但逻辑上新 Socket 建立后就和之前的监听的服务器 Socket 没有任何关系了。

新的 TCP 连接应该由 Server 类来创建并管理生命周期,而不是 Acceptor。并且将一部分代码放在 Server 类里也并没有打破服务器的通用性,因为对于所有的服务,都要使用 Acceptor 来建立连接。

  • Acceptor 类的新建连接功能是在 Server 类中实现的。

可以使用 std::functionstd::bind、右值引用、std::move 等实现函数回调。

定义该类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Acceptor
{
private:
// 用于事件处理的EventLoop指针
EventLoop *loop;

// 用于处理套接字操作的套接字指针
Socket *sock;

// 用于存储地址信息的指针
InetAddress *addr;

// 用于接受连接的 Channel 指针
Channel *acceptChannel;

public:
// 定义一个新建连接的回调函数
std::function<void(Socket *)> newConnectionCallback;

public:
Acceptor(EventLoop *_loop);
~Acceptor();

/**
* @brief 接受新连接
*/
void acceptConnection();

/**
* @brief 设置新连接的回调函数。
* @param _callback 为新连接设置的回调函数。
*/
void setNewConnectionCallback(std::function<void(Socket *)> _callback);
};

抽象后,Server类的变化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 之前
class Server
{
private:
// 指向EventLoop对象的指针
EventLoop *loop;

// 服务器套接字
Socket *serverSock;

// 服务器地址
InetAddress *serverAddr;

// 服务器通道
Channel *serverChannel;

// 保存客户端的套接字
std::vector<std::pair<Socket *, InetAddress *>> clients;

// ...
}

// 之后
class Server
{
private:
// 指向EventLoop对象的指针
EventLoop *loop;

// 指向Acceptor对象的指针
Acceptor *acceptor;

// ...
}

该节代码:GithubGitee

七、把TCP连接抽象化

7.1 抽象化连接

对于 TCP 协议,在三次握手新建连接后,该连接会一直存在直至四次挥手断开连接。

那么把这个连接也抽象化,抽象成 Connection 类。

7.2 Connection 类

Connection 类应该有以下特点:

  • 类存在于事件驱动类中;
  • 类的 Socket fd 就是客户端的 Socket fd,每一个
    Connection 对象都对应一个 Socket fd。
  • 类也通过一个 Channel 负责分发到 epoll,该 Channel 的事件处理函数 handleEvent() 会调用 Connection 类中的事件处理函数进行响应客户端请求。

Connection 类与 Acceptor 类十分相似,它们都由 Server 管理,由一个 Channel 分发到 epoll,通过回调函数处理响应事件。

一个高并发服务器一般只有一个 Acceptor(可以有多个),但会同时有成千上万个 TCP 连接,也就是 Connection 的实例。

Connection 类的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Connection
{
private:
EventLoop *loop;
Socket *sock;
Channel *channel;
std::function<void(Socket *)> deleteConnectionCallback;

public:
Connection(EventLoop *_loop, Socket *_sock);
~Connection();

// @brief 回显sockfd发来的数据
void echo(int sockfd);

// @brief 设置删除连接时要调用的回调函数
void setDeleteConnectionCallback(std::function<void(Socket *)> _callback);
};

7.3 改写 Server 类

Server 类的核心变成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Server
{
private:
// 指向EventLoop对象的指针
EventLoop *loop;

// 指向Acceptor对象的指针
Acceptor *acceptor;

// 存储连接及其相应的文件描述符
std::map<int, Connection *> connections;

public:
Server(EventLoop *_loop);
~Server();

// @brief 处理客户端请求,暂时没有
// void handleReadEvent(int fd);

// @brief 处理与所提供套接字的新连接
void newConnection(Socket *_socket);

// @brief 断开与提供的套接字关联的连接
void deleteConnection(Socket *_socket);
};
  • 通过 Map 映射将众多连接保存起来,键为该连接客户端的 socket fd,值为指向该连接的指针。
  • 该连接客户端的 socket fd 通过一个 Channel 类分发到 epoll,该 Channel 的事件处理回调函数 handleEvent() 绑定为 Connection 的处理函数,这样每当该连接的 socket fd 上发生事件,就会通过 Channel 调用具体连接类的处理函数。

此处将新建连接的功能放回到 Acceptor 类中管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Acceptor::acceptConnection()
{
// 创建一个新的InetAddress对象来存储客户端地址信息
InetAddress *clientAddr = new InetAddress();

//通过使用客户端地址接受来自服务器套接字的连接,创建一个新的Socket对象
Socket *clientSock = new Socket(sock->accept(clientAddr));

// 打印有关新客户端连接的信息
std::cout << "New client " << clientSock->getFd() << ": " <<
inet_ntoa(clientAddr->addr.sin_addr) << " : " << ntohs(clientAddr->addr.sin_port) << "\n";

clientSock->setNonBlocking();

newConnectionCallback(clientSock);
delete clientAddr;
}

Server 类变得只负责管理 AcceptorConnection 类,其成员函数也集中在管理 AcceptorConnection 类中。改写后的 Server 类代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Server::newConnection(Socket *_socket)
{
Connection *conn = new Connection(loop, _socket);
std::function<void(Socket *)> cb = std::bind(&Server::deleteConnection, this, std::placeholders::_1);
conn->setDeleteConnectionCallback(cb);
connections[_socket->getFd()] = conn;
}

void Server::deleteConnection(Socket *_socket)
{
Connection *conn = connections[_socket->getFd()];
connections.erase(_socket->getFd());
delete conn;
}
  • 当有新的 TCP 连接时,实例化一个 Connection 对象,设置其删除时的回调函数,并放置在 connections 中管理。
    • 目前该服务器的唯一功能——接受客户端的信息并发回,封装成 Connection 类的 echo 函数,在 Connection 构造时绑定给 Channel 类的事件回调函数,由 Channel 实例遇到事件时触发。
  • 当有 TCP 连接断开时,从 connections 中删除该连接,并释放对象。
    • 由于 Connection 的生命周期由 Server 进行管理,所以也应该由 Server 来删除连接

至此,服务器到了一个比较重要的阶段,服务器最核心的几个模块都已经抽象出来,一个完整的单线程服务器设计基本完成。

该节代码:GithubGitee

八、闲来无事,整个缓冲区

8.1 引入缓冲区

此节引入一个最简单、最基本的缓冲区,完善改进之前的服务器。

没有使用缓冲区时,服务器回送信息的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void Connection::echo(int sockfd)
{
char buf[READ_BUFFER];

while (true)
{
bzero(buf, sizeof(buf));
ssize_t readLen = read(sockfd, buf, sizeof(buf));

if (readLen > 0)
{
std::cout << buf << "\n";
write(sockfd, buf, sizeof(buf));
}

// ...
}
}
  • 这是非阻塞式 socket IO 的读取,缓冲区大小为 1024,表示每次 TCP 缓冲区读取 1024 大小的数据到缓冲区,然后发送到客户端。
  • 只能以 1024 地读,当数据没有 1024,用空值补满。

所以,封装一个缓冲区,为每一个 Connection 类分配一个读缓冲区和写缓冲区:

  • 从客户端读来的数据存放在都缓冲区。

8.2 Buffer类

Buffer 类的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#pragma once

#include <string>

class Buffer
{
private:
std::string buf;

public:
Buffer() = default;
~Buffer() = default;

// @brief 向当前字符串追加一个字符串
void append(char const *str, int _size);

// @brief 返回当前缓冲区字符串大小
ssize_t size();

// @brief返回指向底层字符串数据的指针
char const *c_str();

// @brief 清空当前缓冲区字符串
void clear();

// @brief 从控制台获取输入
void getline();
};

使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* src/Connection.cpp */
void Connection::echo(int sockfd)
{
char buf[1024];
while (true)
{
bzero(buf, sizeof(buf));
ssize_t readLen = read(sockfd, buf, sizeof(buf));
if (readLen > 0)
{
readBuffer->append(buf, readLen); // 缓冲区追加
}
else if (readLen == -1 and errno == EINTR)
continue;
else if (readLen == -1 and ((errno == EAGAIN) or (errno == EWOULDBLOCK)))
{ // 从缓冲区中读取数据,同时进行回写
std::cout << readBuffer->c_str() << "\n";
errorif(write(sockfd, readBuffer->c_str(), readBuffer->size()) == -1, "***");
readBuffer->clear();
break;
}
else if (readLen == 0)
{
deleteConnectionCallback(sock);
break;
}
}
}

虽然仍有 char buf[1024] 这样的低级缓冲区,用于系统调用 read() 的读取,但这个缓冲区大小无所谓,设置为1到设备TCP缓冲区的大小都可以。

  • 太大导致资源浪费,单词读取速度低;
  • 太小导致读取次数增多。

以上代码会把 socket IO 上的可读数据全部读取到缓冲区,缓冲区大小就等于客户端发送的数据大小。全部读取完成之后,可以构造一个写缓冲区、填好数据发送给客户端。

  • 由于是echo服务器,所以这里使用了相同的缓冲区。

8.3 其他方面的改进

  • 优化 InetAddress 类,将成员私有化,提供访问方法。(—— src/InetAddress.hsrc/InetAddress.cpp

  • Socket 类添加 connect 方法,方便 client.cpp 调用。(—— src/Socket.hsrc/Socket.cpp

  • 结合现有的模块,改进 client 文件。(—— client.cpp

  • 整体改进了了输出信息提示。

服务器端

客户端

该节代码:GithubGitee

九、线程池啊线程池

9.1 为什么加入线程池

当前的代码是单线程模式,所有 fd 上的事件都由一个线程(主线程,EventLoop线程)处理。

  • 假设响应一个事件需要 1s,那么如果有 1000 个事件,那么主线程就要等待很久。
  • 这不现实。

引入多线程,当发现 socket fd 有事件时,应该分发一个工作线程。

  • 由这个工作线程处理 fd 上的事件。

再者,每一个 Reactor 只应该负责事件分发而不负责事件处理。

9.2 如何设计线程池

最简单的想法就是,每次遇到一个新的任务,就开一个新线程去执行。

  • 这种方式虽然简单,但是太粗暴了。
  • 我们的机器是有上限的,不可能无限开新线程。

那么,可以固定一个线程的数量。启动固定数量的工作线程,然后将任务添加到任务队列,工作线程不断取出任务队列的任务执行。

设计线程池还需要注意:

  1. 多线程环境下任务队列的读写应该考虑互斥锁。
  2. 当任务队列为空时,CPU 不应该一直轮询耗费 CPU 资源。

此处解决方法如下:

  1. std::mutex 对任务队列进行加锁解锁。
  2. std::condition_variable 使用条件变量。

9.3 线程池用到的语法知识

关于互斥锁:mutex头文件 - cppreference

  • mutex 类是能用于保护共享数据免受从多个线程同时访问的同步原语。
  • lock():成员函数,锁定互斥体,若互斥体不可用则阻塞。位于头文件 <mutex>
    • 通常不直接调用 lock()
    • std::unique_lockstd::lock_guard 管理排他性锁定。
    • unique_lock 类是一种通用互斥包装器,允许延迟锁定、有时限的锁定尝试、递归锁定、所有权转移和与条件变量一同使用。
    • 构造函数:explicit unique_lock( mutex_type& m );,通过调用 m.lock() 锁定关联互斥体。
    • 析构函数:若拥有关联互斥体且获得了其所有权,则解锁互斥体。
  • try_lock():成员函数,尝试锁定互斥体,若互斥体不可用则返回 false。位于头文件 <mutex>
  • unlock():成员函数,解锁互斥体。位于头文件 <mutex>

关于线程等待条件:condition_variable头文件 - cppreference

  • std::condition_variable(线程等待条件) 是与 std::mutex 一起使用的同步原语。
  • 它能用于阻塞一个线程,或同时阻塞多个线程,直至另一线程修改共享变量(条件)并通知 std::condition_variable
  • 有意修改变量的线程必须:
    1. 获得 std::mutex(常通过 std::lock_guard
    2. 在保有锁时进行修改
    3. std::condition_variable 上执行 notify_onenotify_all(可以释放锁后再通知)
  • 任何有意在 std::condition_variable 上等待的线程必须:
    1. 在用于保护共享变量的互斥体上获得 std::unique_lock<std::mutex>
    2. 执行下列之一:
      • 检查条件,是否为已更新且已提醒的情况。
      • 调用 std::condition_variablewaitwait_forwait_until(原子地释放互斥体并暂停线程的执行,直到条件变量被通知,时限过期,或发生虚假唤醒,然后在返回前自动获得互斥体)。
      • 检查条件,并在未满足的情况下继续等待。
  • wait():成员函数,阻塞当前进程,直至条件变量被唤醒。位于头文件 <condition_variable>
    • 类似还有wait_forwait_until。不多说,自行查阅。
  • notify_one():成员函数,通知一个等待的线程。位于头文件 <condition_variable>
  • notify_all():成员函数,通知所有等待的线程。位于头文件 <condition_variable>

9.4 线程池类

线程池类代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class ThreadPool
{
private:
// 线程池中的线程
std::vector<std::thread> threads;
// 要执行的函数
std::queue<std::function<void()>> tasks;
// 声明互斥锁以同步对任务队列的访问
std::mutex tasksMtx;
// 声明在线程之间进行协调的条件变量
std::condition_variable cv;
// 指示线程停止的标志
bool stop;

public:
ThreadPool(int size = 10);
~ThreadPool();

// @brief 加入任务到任务队列中
void add(std::function<void()> task);
};

线程池的构造函数设计为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ThreadPool::ThreadPool(int size) : stop(false)
{
for (int i = 0; i < size; ++ i)
{
threads.emplace_back(std::thread([this] ()
{
while (true)
{
std::function<void()> task;
{// 使用作用域生命期解锁 std::mutex,而不调用unlock()
std::unique_lock<std::mutex> lock(tasksMtx);
// 当任务队列不为空或线程池停止时停止等待(阻塞)
cv.wait(lock, [this] (){
return stop or !tasks.empty();
});
if (stop and tasks.empty()) // 任务队列为空且线程池停止,退出循环
return;
task = tasks.front();
tasks.pop();
}
task(); // 执行任务
}
}));
}
}
  1. 初始线程池大小为 size,创建线程并让每个线程等待将任务添加到任务队列中。
  2. 使用 std::unique_lock 锁定任务互斥锁以防止并发访问,并将其置于局部作用域,当离开作用域时,它将自动解锁互斥锁。
  3. 当添加任务时,线程从队列中获取任务并执行它。线程将继续执行任务,直到线程池停止。

析构函数设计为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(tasksMtx);
stop = true;
}

cv.notify_all(); // 通知所有等待的线程线程池正在停止
for (std::thread &th : threads)
{ // 检查线程是否可接合
if (th.joinable())
th.join(); // 阻塞当前线程,直到指定线程完成其执行
}
}
  • 在线程池析构时,需要注意将已经添加的所有任务执行完,最好不采用外部的暴力kill、而是让每个线程从内部自动退出,具体实现参考源代码。
  1. 在上锁的情况下,把线程池的停止状态设置为true,然后通知所有等待的线程线程池正在停止。
  2. 然后,等待所有线程完成其执行。

加入线程池后,当 Channel 类有事件需要处理时,将这个事件处理添加到线程池,主线程 EventLoop 就可以继续进行事件循环,而不在乎某个 socket fd 上的事件处理。

该节代码:GithubGitee

十、有了线程池之后的考虑

10.1 完善线程池

上一节添加的线程池是最简单的线程池,还存在许多问题,比如:

  • 任务队列的添加、取出都会有不必要的拷贝操作;
  • 线程池只接受 std::function<void> 类型的参数,所有函数参数都要事先使用 std::bind(),并且无法得到返回值。

解决方法一一对应:

  • 使用右值移动去避免拷贝操作。
  • 改写 add() 函数,希望使用前不需要手动绑定参数,直接传递并且可以得到任务的返回值。

10.2 完善线程池用到的语法知识

关于模板编程:模板 - MSLearn理解C++模板 - 知乎

  • 简单来说,模板编程就是提供了一套模具,对于不同的数据类型都可以适用于这套模具。
  • 函数模板的结构一般如下:
1
template <typename T> 返回类型 函数名(参数列表){ /*函数的主体*/ }
  • 类模板结构一般如下:
1
template <class T> class 类名 {}
  • 变长参数模板:参数个数和类型都可能发生变化的模板。
    • 使用模板形参包实现。
    • 模板形参包是可以接受 0 个或者 n 个模板实参的模板形参,至少有一个模板形参包的模板就可以称作变参数模板。
    • 模板形参包有:非类型模板形参包、类型模板形参包、模板模板形参包三种。
    • 此节使用类型模板形参包:表示该可变形参包可以接受无限个不同的实参类型。
1
typename... Args 或 class ... Args

关于右值和移动 std::moveC++引用和右值引用 - CSDN【C++】C++11——左右值|右值引用|移动语义|完美转发一文读懂C++右值引用和std::move - 知乎

  • C++11 后增加了移动语义,出现了移动构造、移动赋值等。
  • 简单来说,移动语义的出现,可以把旧对象所拥有的资源交给新对象,而旧对象什么都没有了。
  • 右值引用的出现也是为了移动语义。

关于完美转发 std::forward

  • std::move 类似,与 std::move相比,它更强大,move 只能转出来右值,forward 都可以。
  • std::forward<T>(u) 有两个参数:Tu
    • T 为左值引用类型时,u 将被转换为 T 类型的左值;
    • 否则 u 将被转换为 T 类型右值。

关于 std::futurefuture - cppreference

  • 类模板 std::packaged_task 可以包装任何可调用 (Callable) 目标(函数、lambda 表达式、bind 表达式或其他函数对象),使得能异步调用它。其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。
    • 成员函数 get_future(),返回与 *this 共享同一共享状态的 future,每个 packaged_task 对象只能调用一次。
  • 类模板 std::futurefuture 对象提供访问异步操作结果的机制,从异步任务中返回结果。
  • 类模板 std::future 提供访问异步操作结果的机制:
    • (通过 std::asyncstd::packaged_taskstd::promise 创建的)异步操作能提供一个 std::future 对象给该异步操作的创建者。
    • 然后,异步操作的创建者可以使用多个方法查询、等待或从 std::future 提取值。若异步操作尚未提供值,则这些方法可能阻塞。
    • 当异步操作准备好发送结果给创建者时,它可以修改与创建者的 std::future 相链接的共享状态(例如 std::promise::set_value)。

10.3 再修修补补

除了上面线程池的部分有修改,以下部分也有修改:

Channel 部分:

  • 新增标记位和是否使用线程池的函数;
  • 对于处理事件区分了读事件和写事件分别的回调函数;
  • 新增可选择性 epoll ET 模式或 epoll LT 模式;

Acceptor 部分:因为接受连接处理时间短、报文数据小,也不会有同时到达的新连接,所以

  • Acceptor 的 socket fd (服务器监听 socket)使用阻塞式:
  • Acceptor 从 epoll ET 模式改为 epoll LT 模式,建立好连接后处理事件 fd 读写用 ET 模式。
  • Acceptor 的连接建立不适用线程池,建立好连接后处理事件使用线程池。

Connection 部分:

  • 新增 send() 函数,独立发送数据。
  • 修改 deleteConnectionCallback() 函数,参数类型改为 int

Epoll 部分:

  • 新增 deleteChannel() 函数,用于删除 Channel。

Server 部分:

  • 新增 deleteConnection() 函数。

更多细节上的变化(可能有部分错误处理、变量变化)可比较前一天的文件。

服务器中还可能有潜在的bug。

最后,添加测试连接的程序 test.cpp,使用命令 make t 编译,使用如下:

1
./test -t 1000 -m 10 -w 100
  • -t 表示线程数量,此处为 1000 个线程进行服务器连接;
  • -m 表示每个线程的回显次数,此处为每个线程回显 10 次;
  • -w 表示每个线程的等待时间,可以测试最大连接数,可以不设置。

该节代码:GithubGitee

十一、改写成主从Reactor多线程模式

11.1 什么是主从Reactor多线程模式

现在实现的服务器多线程 Reactor 模式,是给每一个 Channel 的任务分配一个线程执行。但目前的线程池对象置于 EventLoop 中,而不是由服务器类 Server 类管理。

主从 Reactor 多线程模式是大多数高性能服务器采用的模式。

陈硕《Linux多线程服务器编程》书中的 one loop per thread 模式。

该模式的特点有:

  • 服务器一般只有一个 main Reactor,有多个 sub Reactor。
  • 服务器管理一个线程池,每一个 sub Reactor 由一个线程来负责 Connection 上的事件循环,事件执行也在这个线程中完成。
  • main Reactor 只负责 Acceptor 建立新连接,然后将这个连接分配给一个 sub Reactor。

11.2 代码上的变化

根据主从 Reactor 多线程模式的特点,将服务器类重写如下:

1
2
3
4
5
6
7
8
9
10
11
12
class Server
{
private:
EventLoop *mainReactor; // 只负责接受连接,然后分发给一个subReactor
Acceptor *acceptor; // 连接接受器
std::vector<EventLoop *> subReactors; // 负责处理事件循环
std::map<int, Connection *> connections; // 存储连接及其相应的文件描述符
ThreadPool *threadPool; // 线程池

public:
// ...
}

在有一个新连接到来时,采用随机调度策略分配给一个 subReactor:

1
2
int random = _socket->getFd() % subReactors.size();
Connection *conn = new Connection(subReactors[random], _socket);
  • 这种调度算法适用于每个socket上的任务处理时间基本相同,可以让每个线程均匀负载。但事实上,不同的业务传输的数据极有可能不一样,也可能受到网络条件等因素的影响,极有可能会造成一些 subReactor 线程十分繁忙,而另一些 subReactor 线程空空如也。此时需要使用更高级的调度算法,如根据繁忙度分配,或支持动态转移连接到另一个空闲 subReactor 等。

调度问题是个很有趣的问题,会直接影响服务器的效率和性能。

代码上,还将原来在 EventLoop 的线程池去掉,Channel 也不再区分是否使用线程池。

现在,服务器以事件驱动为核心,服务器线程只负责 mainReactor 的新建连接任务,同时维护一个线程池,每一个线程是一个事件循环,新连接建立后分发给一个 subReactor 开始事件监听,有事件发生则在当前线程处理。

该节代码:GithubGitee

十二、项目工程化

目前服务器的结构是主从 Reactor 多线程模式,是比较主流的模式。所以大体上的方向已经确定,接下来对细节进行优化,把项目工程化。

12.1 认识Cmake

首先,CMake是一个跨平台的编译工具,可以用简单的语句进行编译。

一个项目使用 CMake 维护一个 CMakeLists.txt 配置文件来描述一个项目的编译过程。利用这个文件,就可以搭建起来这个项目。

目前将所有文件都放在一个文件夹,并且没有分类。随着项目越来越复杂、模块越来越多,开发者需要考虑这座屎山的可读性,如将模块拆分到不同文件夹,将头文件统一放在一起等。

对于这样复杂的项目,如果手写复杂的Makefile来编译链接,那么将会相当负责繁琐。我们应当使用 CMake 来管理我们的项目,CMake 的使用非常简单、功能强大,会帮我们自动生成 Makefile 文件,使项目的编译链接更加容易,程序员可以将更多的精力放在写代码上。

这是 CmakeLists.txt 基本结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# xxx:本 CMakeLists.txt 的 project 名称
# 会自动创建两个变量,PROJECT_SOURCE_DIR 和 PROJECT_NAME
# ${PROJECT_SOURCE_DIR}:本 CMakeLists.txt 所在的文件夹路径
# ${PROJECT_NAME}:本CMakeLists.txt 的 project 名称
project(xxx)

# 获取路径下所有的.cpp/.c/.cc文件,并赋值给变量中
aux_source_directory(路径 变量)

# 给文件名/路径名或其他字符串起别名,用${变量}获取变量内容
set(变量 文件名/路径/...)

# 添加编译选项
add_definitions(编译选项)

# 打印消息
message(消息)

# 编译子文件夹的CMakeLists.txt
add_subdirectory(子文件夹名称)

# 将.cpp/.c/.cc文件生成.a静态库
# 注意,库文件名称通常为libxxx.so,在这里只要写xxx即可
add_library(库文件名称 STATIC 文件)

# 将.cpp/.c/.cc文件生成可执行文件
add_executable(可执行文件名称 文件)

# 规定.h头文件路径
include_directories(路径)

# 规定.so/.a库文件路径
link_directories(路径)

# 对add_library或add_executable生成的文件进行链接操作
# 注意,库文件名称通常为libxxx.so,在这里只要写xxx即可
target_link_libraries(库文件名称/可执行文件名称 链接的库文件名称)

12.2 工程化的实际操作

首先规范化目录的意义:

  • src 目录(即source),用于存放核心的代码;
    • include 目录,用于存放源代码中的头文件;
  • test 目录,用于存放测试的代码;
1
2
3
4
5
6
7
projiect/
├─src/
│ ├─include/
│ │ ├─*.h
│ ├─*.cpp
├─test/
│ ├─*.cpp

在这一章,我们使用的是一个 CMake 工程,所以 Visual Studio 创建的是 CMake 项目。接下来就是 CMake 的配置工作。(有关 CMake 的安装使用可参考附 2

构建上述文件目录,将对应的文件分类进去。

文件结构

接着,开始编写项目的根 CMakeLists.txt 文件(即根目录下的 CMakeLists.txt ):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
cmake_minimum_required(VERSION 3.10)    # CMake运行的最小版本
set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # 启用编译命令的导出,常与代码分析工具配合使用
set(BUILD_SHARED_LIBS ON) # 构建共享(动态)库
set(CMAKE_CXX_STANDARD 17) # 设置C++标准为17
set(CMAKE_CXX_STANDARD_REQUIRED ON) # 要求编译器支持C++17

# 设置编译器
set(CMAKE_C_COMPILER "clang")
set(CMAKE_CXX_COMPILER "clang++")

# 项目信息
project(Day12 # 项目名称
LANGUAGES CXX # 项目语言
)

# 为源代码和测试添加子目录
add_subdirectory(src)
add_subdirectory(test)

# 设置包含目录
set(SRC_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/src/include)
# set(TEST_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/test/include)
include_directories(${SRC_INCLUDE_DIR})

# 设置输出目录
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)

# 设置编译和链接选项
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -Wall -Wextra -std=c++17 -pthread")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fsanitize=address -fno-omit-frame-pointer -fno-optimize-sibling-calls")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fPIC")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fPIC")
set(CMAKE_STATIC_LINKER_FLAGS "${CMAKE_STATIC_LINKER_FLAGS} -fPIC")
set(GCC_COVERAGE_LINK_FLAGS "-fPIC")

# 显示编译器和链接器标志
message(STATUS "CMAKE_CXX_FLAGS: ${CMAKE_CXX_FLAGS}")
message(STATUS "CMAKE_CXX_FLAGS_DEBUG: ${CMAKE_CXX_FLAGS_DEBUG}")
message(STATUS "CMAKE_EXE_LINKER_FLAGS: ${CMAKE_EXE_LINKER_FLAGS}")
message(STATUS "CMAKE_SHARED_LINKER_FLAGS: ${CMAKE_SHARED_LINKER_FLAGS}")
  • 第一次接触 CMake 命令可以参考注释理解。

接着,尝试把我们关于服务器的设计打包成一个库,即编写 src/CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
11
12
# 设置包含目录
set(SRC_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/src/include)
include_directories(${SRC_INCLUDE_DIR})

# 递归搜索/src目录中的所有.cpp文件
file(GLOB_RECURSE day12_sources ${PROJECT_SOURCE_DIR}/src/*.cpp)

# 设置共享库链接选项
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fPIC -pthread")

# 使用源文件创建一个名为day12_shared的共享库
add_library(day12_shared SHARED ${day12_sources})

然后,把测试文件的 CMakeLists.txt(即 test/CMakeLists.txt)也编写一下,用于管理测试文件的编译:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 设置包含目录
set(SRC_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/src/include)
include_directories(${SRC_INCLUDE_DIR})

# 设置变量TEST_SOURCES,将所有.cpp文件存储在测试目录中
file(GLOB TEST_SOURCES "${PROJECT_SOURCE_DIR}/test/*.cpp")

# 创建名为“build-tests”的自定义目标以仅显示测试
add_custom_target(build-tests COMMAND ${CMAKE_CTEST_COMMAND} --show-only)
# 创建名为“check-tests”的自定义目标以在详细模式下运行测试
add_custom_target(check-tests COMMAND ${CMAKE_CTEST_COMMAND} --verbose)

# 遍历TEST_SOURCES中的每个测试源文件
foreach (test_source ${TEST_SOURCES})
# 组合为可读的名称,使得每个Cpp都可以make
get_filename_component(test_filename ${test_source} NAME)
string(REPLACE ".cpp" "" test_name ${test_filename})

# 为测试添加可执行目标,默认情况下将其从所有生成中排除
add_executable(${test_name} EXCLUDE_FROM_ALL ${test_source})
# 添加对生成测试和检查测试的依赖项
add_dependencies(build-tests ${test_name})
add_dependencies(check-tests ${test_name})

# 将“day12_shared”库链接到可执行测试文件
target_link_libraries(${test_name} day12_shared)

# 设置测试目标的属性,指定输出目录和运行测试的命令
set_target_properties(${test_name}
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin"
COMMAND ${test_name}
)
endforeach(test_source ${TEST_SOURCES})

当然,这章在代码上也有些许修改,比如函数参数做了 const &,类也禁止了拷贝和移动操作。

该节代码:GithubGitee

接着只需要把项目部署到远程 Linux 服务器,使用以下命令编译即可:

  • make server:编译服务端代码
  • make SingleClient:编译单个客户端连接代码
  • make MultipleClients:编译多个客户端连接代码
  • make clean:清理生成

运行只需要:

  • ./bin/server:启动服务端
  • ./bin/SingleClient:启动单个客户端连接
  • ./bin/MultipleClients -t 线程数 -m 回显消息数 -w 延时发送信息:启动多个客户端连接

原作者还进行了代码静态分析和代码格式化,详见地址:Github

十三、业务逻辑自定义化

13.1 业务逻辑思想

首先回顾之前的思想,我们目前服务器只有一个功能,就是进行回声(Echo):把客户端发来的消息再发送回去。而这个功能,或者说业务逻辑,就固定在 Connection 类。

而通过第十二章的设计,我们把网络方面的代码整合为一个链接库。很明显,作为一个库,并不能就这样把业务逻辑固定了,应该支持业务逻辑自定义。

业务逻辑由用户自定义,然后使用网络库进行服务器与客户端间的交互。

怎样事件触发、读取数据、异常处理等流程应该是网络库提供的基本功能,用户只应当关注怎样处理业务即可,所以业务逻辑的进入点应该是服务器读取完客户端的所有数据之后。这时,客户端传来的请求在 Connection 类的读缓冲区里,只需要根据请求来分发、处理业务即可。

总体上,服务器端提出这样的设计:

  • 具有一个 Server 类和一个事件循环类。
  • 通过回调函数的方式编写业务逻辑,传给 Server 类的实例。
    • 只需关心服务器的处理方法,比如一个 Echo 服务器只需要把对方发来的信息发回去。通过设置 onMessage 回调函数来自定义自己的业务逻辑,在服务器完全接收到客户端的数据之后,该函数触发。
    • 可以设置连接时的业务逻辑和整个服务端的业务逻辑。
1
2
3
4
5
6
7
8
9
10
11
Server *server = new Server(loop);

server->newConnect([] (Connection *conn)
{
// 服务器对有新连接时的操作函数
});

server->onMessage([] (Connection *conn)
{
// 服务器对客户端消息的操作函数
});

另外,希望客户端的代码也可以通过我们的网络库进行实现:将 Connection 类进行完善,使得其满足服务端(Server → Client)和客户端(Client → Server)的使用:

  • 服务端和客户端的传输数据方向是相反的:对于服务端,它从客户端中读取数据,或者写入数据到客户端;对于客户端,它从服务端中读取数据,或者写入数据到服务端。
  • 在发回数据时,应该考虑对方是否已经关闭了链接。所以还需要设计 Connection 的状态。

总体上,客户端要使用 Connection 类,提出这样的设计:

  • 提供 write()read() 函数。
    • write() 函数表示将写缓冲区里的内容发送到该 Connection 的 socket,发送后会清空写缓冲区;
    • read() 函数表示清空读缓冲区,然后将 TCP 缓冲区内的数据读取到读缓冲区。
  • 考虑 Connection 的状态 State

13.2 操刀动代码

根据上面的分析,Server 进行改动如下:

  1. Server 类进行改写:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Server.h
class Server
{
private:
// ......
std::function<void(Connection *)> onConnectionCallback; // 连接的业务逻辑
std::function<void(Connection *)> onMessageCallback; // 消息的业务逻辑
std::function<void(Connection *)> newConnectCallback; // 新连接的业务逻辑

public:
// ...
/**
* @brief 设置服务器的业务逻辑
* @param fn 业务逻辑函数
*/
void onConnect(std::function<void(Connection *)> fn);

/**
* @brief 设置接收消息时调用的回调函数
* @param fn 回调函数
*/
void onMessage(std::function<void(Connection *)> fn);

/**
* @brief 设置在建立新连接时调用的回调函数。
* @param fn 回调函数
*/
void newConnect(std::function<void(Connection *)> fn);
// ...
}

但是我们不能急,修改 Server 必须还得对 Connection 类的完善。因为服务器的一些操作是通过连接类完成,改动如下:

  1. 添加连接状态(此处其实只关注是否连接建立即可):
1
2
3
4
5
6
7
8
enum State
{
Invalid = 1, // 初始无效状态
Handshaking, // 握手过程中的状态
Connected, // 连接建立
Closed, // 连接关闭
Failed, // 连接失败
};
  1. 提供读写函数(详细见章末 Gitee 或 Github 链接):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Connection::read()
{
// 判断连接状态
// 清空读缓冲区
// 读操作
}
void Connection::write()
{
// 判断连接状态
// 写操作
// 清空写缓冲区
}

// 用于服务器程序回发消息
void send(std::string msg);

读操作和写操作区分是否阻塞:对于客户端,使用阻塞读写;对于服务端,使用非阻塞读写。在判断 Socket 是否阻塞时,需要添加个函数(之前没有)。

  1. 将原来成员属性 int fd 变成 Socket *mSocket

  2. 添加上对应的回调函数及其 Set 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::function<void(Socket *)> mDeleteConnectionCallback;	// 删除连接的回调函数
std::function<void(Connection *)> mOnConnectCallback; // 连接建立时的回调函数
std::function<void(Connection *)> mOnMessageCallback; // 业务逻辑回调函数

/**
* @brief 设置连接时的业务逻辑回调函数
* @param callback 回调函数
*/
void setOnConnectionCallback(std::function<void(Connection *)> const &callback);

/**
* @brief 将回调函数设置为在收到消息时调用
* @param callback 接收到消息时调用的函数
*/
void setOnMessageCallback(std::function<void(Connection *)> const &callback);

/**
* @brief 设置删除连接时要调用的回调函数
* @param _callback 删除连接时要调用的回调函数
*/
void setDeleteConnectionCallback(std::function<void(Socket *)> const &callback);
  1. 编写业务函数。在构建服务器时指定对客户端消息的响应,然后通过对 Server 类的设置,传递到 Connection 类,最后传递到 Channel 类的 handleEvent() 进行调用。
1
2
3
4
5
6
7
8
9
/**
* @brief 该函数表示业务逻辑。
*/
void business()
{
// 大概操作有:
// - 接受客户端信息
// - 做出响应,即 mOnMessageCallback()
}

现在也差不多了,但是可以完善(重构)一下 Channel 类,让其意义更明确,更规范一些:

  1. 规范私有成员变量:
1
2
3
4
5
6
7
8
private:
EventLoop *mLoop; // 指向与之关联的事件循环
Socket *mSocket; // 与之关联的Socket
uint32_t mListenEvents{ 0 }; // 监听的事件
uint32_t mReadyEvents{ 0 }; // 就绪事件
bool exist{ false }; // 指示该Channel是否存在有效
std::function<void()> readCallback; // 读回调
std::function<void()> writeCallback;// 写回调
  1. 编写相关成员函数(函数名修改后记得在对应调用处修改):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 处理事件
void handleEvent();
// 开启读操作
void enableRead();
// 开启写操作
void enableWrite();
// 使用ET
void useET();
// 获取Socket
Socket *getSocket() const;
// 获取监听事件
uint32_t getListenEvents();
// 获取就绪事件
uint32_t getReadyEvents();
// 设置就绪事件
void setReadyEvents(uint32_t events);
// 检查有效性
bool getExist() const;
// 设置有效性
void setExist(bool _exist);
// 设置回调函数。
void setReadCallback(std::function<void()> const &callback);
  1. 修改 Channel 类的析构函数,其析构为 loop 调用 deleteChannel()(需要添加函数)(实际上还是相关联的 Epoll 封装类去 deleteChannel()):
1
2
3
4
5
6
7
8
9
Channel::~Channel()
{
mLoop->deleteChannel(this);
}

void EventLoop::deleteChannel(Channel *channel)
{
ep->deleteChannel(channel);
}

在修改了 Channel 类后,其相关联的 Epoll 封装类也需要修改:

  1. 修改 Epoll::deleteChannel(Channel *channel),需要把当前的 Channel 对象从 epoll 中删除,然后设置有效性为 false
1
2
3
4
5
6
void Epoll::deleteChannel(Channel *channel)
{
int fd = channel->getSocket()->getFd();
errorif(epoll_ctl(mEpFd, EPOLL_CTL_DEL, fd, nullptr) == -1, "epoll delete error");
channel->setExist(false);
}
  1. 完善 Epoll::updateChannel(Channel *channel)
1
2
3
4
5
6
7
8
9
void Epoll::updateChannel(Channel *channel)
{
// 更新事件event,读写事件区分开

if (!channel->getExist()) // 如果不存在,则添加
// 添加到epoll
else // 存在,只需要修改
// 修改epoll
}
  1. 完善 Epoll::poll(int timeout)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
vector<Channel *> Epoll::poll(int timeout)
{
// epoll_wait

for (int i = 0; i < nfds; ++i)
{
// 遍历事件 events
if(events & EPOLLIN)
// 读
if (events & EPOLLOUT)
// 写
if(events & EPOLLET)
// ET 模式
}
// 返回事件(Channel数组)
}

最后检查各个文件无报错后,根据需要修改 CMakeLists.txt 文件。

之后,如果想创建不一样功能的服务器,可以通用我们这样的一个网络库。

该节代码:GithubGitee

十四、再次重构,告一段落

14.1 重构思想

  1. 使用智能指针进行内存管理。在之前的开发中,使用的都是原始的指针,但是原始的指针对内存管理而言是困难的,极易产生内存泄漏、悬垂引用、野指针等问题。从 C++11 标准后,可以使用智能指针来管理内存,让程序员无需过多考虑内存资源的使用。

    • std::unique_ptr
    • std::shared_ptr
    • std::weak_ptr
  2. 避免资源的复制操作,尽量使用移动语义来进行所有权的转移,这对提升程序的性能有十分显著的帮助。

  3. 对错误、异常的处理。在项目上线后,我们不能因为某些错误就直接让程序崩溃或者终止。而且,绝大部分错误都是可恢复的:

    • 如创建 socket 失败可能是文件描述符超过操作系统限制,稍后再次尝试即可。
    • 监听 socket 失败可能是端口被占用,切换端口或提示并等待用户处理即可。
    • 打开文件失败可能是文件不存在或没有权限,此时只需创建文件或赋予权限即可。
    • 所以在底层的编码上,对于部分错误需要进行可恢复处理,避免一个模块或资源发生的小错误影响整个服务器的运行。

14.2 加入.clang-fromat

Clang 本身是一个 C++ 的编译器。而 Clang-Format 是其中的一个格式化工具,可用于格式化(排版)多种不同语言的代码。在 Linux 中安装一下 clang-format

1
sudo apt install -y clang-format

如果使用 VSCode 进行编程的话,需要安装插件 Clang-Format,格式化快捷键:shift + alt + f

详见:使用clang-format给你代码格式化

纯靠手动控制格式太麻烦了,还是使用工具吧

14.3 设计宏定义(Common.h)

显式将拷贝和移动函数删除,避免拷贝和移动操作:

1
2
3
4
5
6
7
8
9
10
11
#define DISALLOW_COPY(className) \
className(const className &) = delete; \
className &operator = (const className &) = delete;

#define DISALLOW_MOVE(className) \
className(className &&) = delete; \
className &operator = (className &&) = delete;

#define DISALLOW_COPY_AND_MOVE(className) \
DISALLOW_COPY(className); \
DISALLOW_MOVE(className);

新增 FLAG 标记,统一标记函数的返回:

1
2
3
4
5
6
7
8
9
enum FLAG {
FL_UNDIFINED,
FL_SUCCESS,
FL_SOCKET_ERROR,
FL_EPOLL_ERROR,
FL_CONNECTION_ERROR,
FL_ACCEPTOR_ERROR,
FL_UNIMPLEMENTED
};

记得修改包含的头文件。

14.4 重构Socket类

Socket 类主要是对 socket 操作进行了封装,并主要应用在 Acceptor 类中和 Connection 类中。对 Socket 类的函数进行重构,同时删去 InetAddress 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Socket {
public:
DISALLOW_COPY_AND_MOVE(Socket);

Socket();
~Socket();
void setFd(int fd);
int getFd() const;
std::string getAddr() const;
FLAG socketCreate();
FLAG socketBind(const char *ip, uint16_t port) const;
FLAG socketListen() const;
FLAG socketAccept(int &clientFd) const;
FLAG socketConnect(const char *ip, uint16_t port) const;
FLAG setNonBlocking() const;
bool isNonBlocking() const;
size_t recvBufSize() const;

private:
int mFd{-1}; // socket 文件描述符
};

对于 Socket的创建、绑定、监听、接受等操作进行错误、异常的处理,在函数中大概如下:

1
2
3
4
5
6
FLAG xxx() const {
// 断言 fd 是否合法
// 进行创建、绑定、监听、接受等操作
// 判断上述操作是否出现异常
// 出现异常则输出并返回错误标记,无异常则返回成功标记
}

对于 Socket 的连接操作,是将 Socket 连接到某个 IP 地址,在函数中如下:

1
2
3
4
5
6
FLAG Socket::socketConnect(const char *ip, uint16_t port) const {
// 构建地址结构体
// 连接
// 判断上述操作是否出现异常
// 出现异常则输出并返回错误标记,无异常则返回成功标记
}

还有其他的 Get、Set 函数可详见代码。

14.4 小改Channel类和Epoll类

修改完 Socket 类后,比较底层的还有 Channel 类。Channel 类是网络库的核心组建之一,其对 socket 进行了更深度的封装,保存了需要对 socket 监听的事件和当前 socket 已经准备好的事件,并进行处理。此外,为了更新和获取在 epoll 中的状态,需要使用EventLoop进行管理。

对于 Channel 类的改动并不多,类声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Channel {
public:
DISALLOW_COPY_AND_MOVE(Channel);

explicit Channel(EventLoop *loop, int fd);
~Channel();

void handleEvent() const;
void enableRead();
void enableWrite();
void useET();
int getFd() const;
uint32_t getListenEvents() const;
uint32_t getReadyEvents() const;
void setReadyEvents(uint32_t events);
bool getExist() const;
void setExist(bool _exist = true);
void setReadCallback(std::function<void()> const &callback);
void setWriteCallback(std::function<void()> const &callback);

private:
EventLoop *mLoop; // 指向与之关联的事件循环
int mFd; // 与之关联的Socket fd
uint32_t mListenEvents{0}; // 监听的事件
uint32_t mReadyEvents{0}; // 就绪事件
bool exist{false}; // 指示该Channel是否存在有效
std::function<void()> readCallback; // 读回调
std::function<void()> writeCallback; // 写回调
};

大部分代码没什么特别的,可以见代码。但是需要注意的是,设置回调函数时,使用 std::move()

1
2
3
4
void Channel::setxxxCallback(std::function<void()> const &callback)
{
xxxCallback = std::move(callback);
}

Epoll 类主要是进行 IO 多路复用,保证高并发。在 Epoll 类主要是对 epoll 中 channel 的监听与处理。声明改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Epoll {
public:
// 禁用拷贝和移动
DISALLOW_COPY_AND_MOVE(Epoll);

Epoll();
~Epoll();

FLAG updateChannel(Channel *channel) const;
FLAG deleteChannel(Channel *channel) const;

std::vector<Channel *> poll(int timeout = -1) const;

private:
int mEpFd{-1}; // epoll文件描述符
struct epoll_event *mEvents{nullptr}; // epoll事件
};

函数方面也是小改。

14.5 小改EventLoop类

EventLoop 类用于对事件的轮询和处理。每一个 EventLoop 不断地调用 epoll_wait 来获取激活的事件,并处理。原本的 EventLoop 类中有一个普通的指针 Epoll*,现改为 std::unique_ptr。顺便再把函数声明为 const,使其更安全。

EventLoop 类声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class EventLoop {
public:
EventLoop();
~EventLoop();

DISALLOW_COPY_AND_MOVE(EventLoop);

void loop() const;
void updateChannel(Channel *channel) const;
void deleteChannel(Channel *channel) const;

private:
std::unique_ptr<Epoll> ep; // 指向Epoll类实例的指针
};

由于使用了智能指针,所以其构造函数和析构函数也简化了不少。

14.6 小改Acceptor类

Acceptor 主要用于服务器接受连接,并在接受连接之后进行相应的处理。这个类需要独属于自己的 Channel,因此采用了智能指针管理。

Acceptor 类的重构类似。

  • 将一些指针变成智能指针;
  • 使用之前定义的 FLAG 标记。

Acceptor 类声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Acceptor {
public:
DISALLOW_COPY_AND_MOVE(Acceptor);

explicit Acceptor(EventLoop *loop);
~Acceptor();

FLAG acceptConnection() const;
void setNewConnectionCallback(std::function<void(int)> const &callback);

private:
std::unique_ptr<Socket> mSocket; // 用于处理套接字操作的套接字指针
std::unique_ptr<Channel> mChannel; // 用于接受连接的 Channel 指针

// 定义一个新建连接的回调函数
std::function<void(int)> mNewConnectionCallback;
};

14.7 小改Connection类

对于每个 TCP 连接,都可以使用一个类进行管理,在这个类中,将注意力转移到对客户端 socket 的读写上,除此之外,他还需要绑定几个回调函数,例如当接收到信息时,或者需要关闭时进行的操作。

对于 Connection 类中的指针改用智能指针,同时按需求简化了部分函数,声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Connection {
public:
enum State {
Invalid = 0, // 初始无效状态
Handshaking, // 握手过程中的状态
Connected, // 连接建立
Closed, // 连接关闭
Failed, // 连接失败
};

public:
explicit Connection(EventLoop *loop, int fd);
~Connection();

DISALLOW_COPY_AND_MOVE(Connection);

FLAG read();
FLAG write();
FLAG send(std::string msg);
void setOnMessageCallback(std::function<void(Connection *)> const &callback);
void setDeleteConnectionCallback(std::function<void(int)> const &callback);
void business();
State getState();
void close();
void setSentBuffer(char const *str);
Buffer *getReadBuffer();
Buffer *getSendBuffer();
Socket *getSocket();

private:
// EventLoop *mLoop; // EventLoop指针
std::unique_ptr<Socket> mSocket; // Socket指针
std::unique_ptr<Channel> mChannel{nullptr}; // Channel指针
std::unique_ptr<Buffer> mReadBuffer{nullptr}; // 读缓冲区
std::unique_ptr<Buffer> mSendBuffer{nullptr}; // 写缓冲区

State mState{Invalid}; // 连接状态
std::function<void(int)> mDeleteConnectionCallback; // 删除连接的回调函数
std::function<void(Connection *)> mOnMessageCallback; // 业务逻辑回调函数

FLAG readNonBlocking(); // 非阻塞读
FLAG writeNonBlocking(); // 非阻塞写
FLAG readBlocking(); // 阻塞读
FLAG writeBlocking(); // 阻塞写
};

14.8 重头戏Server类

Server 类是对整个服务器的管理,他通过创建 acceptor 来接收连接。并管理 Connection 的添加。

对 Server 类的众多指针都改为智能指针,声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Server {
public:
Server();
~Server();
DISALLOW_COPY_AND_MOVE(Server);

void start();
void onConnect(std::function<void(Connection *)> fn);
void onMessage(std::function<void(Connection *)> fn);
FLAG newConnection(int fd);
FLAG deleteConnection(int fd);

private:
std::unique_ptr<EventLoop> mainReactor; // 只负责接受连接,然后分发给一个subReactor
std::vector<std::unique_ptr<EventLoop>> subReactors; // 处理事件循环
std::unique_ptr<Acceptor> acceptor; // 连接接受器
std::unordered_map<int, std::unique_ptr<Connection>> connections; // 存储连接及其相应的文件描述符
std::unique_ptr<ThreadPool> threadPool; // 线程池

std::function<void(Connection *)> onConnectionCallback; // 连接的业务逻辑
std::function<void(Connection *)> onMessageCallback; // 接收消息后的业务逻辑
};

修改完这么多类后,记得查看代码修改一下测试的 server.cppSingleClient.cppMultipleClients.cpp 等文件,然后编译即可。

该节代码:GithubGitee


附录

附 1 - 代码运行环境

前十四章:

  • 代码编写:Windows 下 Visual Studio 2022
  • 代码编译及执行:阿里云 ECS,Ubuntu 20.04.6 LTS (GNU/Linux 5.4.0-169-generic x86_64)

在 Visual Studio 2022 中编写代码,接着连接远程服务器,将代码部署到服务器上。

剩下章节:

  • 代码编写:Windows 下 Visual Studio Code
  • 代码编译及执行:WSL2 - Ubuntu 18.04

用 Visual Studio Code 远程连接 WSL 进行编写代码,使用 cmake 进行项目管理。

附 2 - CMake的安装和使用

此处的环境是:Visual Studio 2022 远程连接 Ubuntu 20.04.6 LTS

  • 当然 Windows 也有 CMake,此处主要是在 Linux 下的使用。

附 2.1 检查远程的CMake环境和编译环境

可能需要先 apt-get update 更新一下 apt。

  1. 安装 CMake 工具
1
apt-get install cmake
  1. 可选择安装使用 clang 编译器
1
apt-get install clang

附 2.2 Visual Studio 2022中使用CMake进行远程Linux服务器开发

  1. 创建选择 CMake 项目,我此处构建演示项目 CMakeTestProject

选择CMake项目

  1. 选择远程的 Linux 计算机,管理配置,新建一个配置。

管理配置

新建配置

  1. 编辑配置,具体看下图,主要修改部分已经框选。

编辑配置

  1. 正常编写代码,此处我用 Day12(第十二章)的代码作为演示。编写代码后,可以点进去根目录的 CMakeLists.txt 文件,保存一下(ctrl + s),就会自动复制到远程。

  2. 进行项目生成和编译,项目生成有两种方式:

  • 可以选择右键项目名,选择以 CMake 视图查看。再进行生成或清理,最后使用编译命令进行编译。

选择CMake视图

生成和清理

生成成功1

  • 可以在 Linux 的终端上执行以下命令:
1
mkdir build && cd build && cmake ../src/ && make

生成成功2

可以在 CMakeLists.txt 中的项目信息之前设置编译器为Clang。

  1. 生成成功后就可以在 CMakeLists.txt 中指定的输出文件夹中找到可执行文件或其他。

附 2.2 Visual Studio 2022中使用CMake进行WSL开发

有的人可能没有Linux服务器,但是WSL可以有的。如何安装WSL可以查看这里

基本步骤同上,但在新建配置和编辑时,需要做一些修改:

新建关于WSL的配置

编辑关于WSL的配置

附 2.3 CMake相关资料

附 3 - 可能出现的问题

  • 在 Ubuntu 中安装 make 时,出现 “dpkg: error processing package ***” 的问题,可以参考 CSDN

  • 在线程池部分中,直接使用 g++ 命令会出现 “对‘pthread_create’未定义的引用” 的问题,需要加上 -lpthread 参数,详见 CSDNMakefile 中的做法。