2.2 Reactor的原理和实现

2.2.1 什么是Reactor?

​ Reactor是一种计算模型,常用于处理异步事件驱动的编程架构。它能够有效地管理和调度多个事件源的输入输出操作,尤其是在高并发的环境中。将IO的管理转变成了对事件的管理。

在Reactor模式中,主要有以下几个组件:

  1. 事件源(Event Source):这些是产生事件的实体,比如网络连接、文件IO等。
  2. 事件分发器(Event Demultiplexer):负责监控所有事件源,并将就绪的事件分发给相应的处理程序。
  3. 事件处理程序(Event Handler):定义如何处理特定类型的事件。

Reactor模式的优点在于,它能够有效地使用系统资源,减少上下文切换的开销,从而提高应用程序的性能。

Reactor的核心:不同的IO事件对应不同的回调函数:

  1. register
  2. callback

​ io -> event -> call back

listenfd -> EPOLLIN ->accept_cd

clientfd-> EPOLLIN ->recv_cb

clientfd-> EPOLLOUT ->send_cb

俩类事件,俩类fd(IO)

我们要采用事件的处理方式,所以要更关注事件和回调的匹配。每一个IO与之对应的有哪些匹配的参数。

EPOLLIN这个事件对应俩种情况,分别是客户端连入和客户端发来消息,

EPOLLOUT这个事件对应客户端可写

2.2.2 Reactor 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#include "../base_tool.h"
#include <sys/epoll.h>
#include <signal.h>
#include <string.h>
int epfd = 0;
#define BUFFER_LENGTH 1024
#define CONN_SIZE 1048576

typedef int (*RCALLBACK)(int fd);
struct conn // 什么事件,执行什么回调函数
{
int fd;
char rbuffer[BUFFER_LENGTH];
char rlength;
char wbuffer[BUFFER_LENGTH];
char wlength;

RCALLBACK send_callback;
union
{
RCALLBACK recv_callback;
RCALLBACK accept_callback; // 只有 EPOLLIN 和EPOLLOUT 俩个事件,所以accept和recv是或的关系
} r_action;
};
struct conn conn_list[CONN_SIZE] = {0};

long GetUnixTime()
{
auto now = std::chrono::system_clock::now();
// 转换为时间戳
auto timestamp = std::chrono::system_clock::to_time_t(now);
return timestamp;
}
// fd

void set_event(int fd, int event,int flag) // 设置事件
{
if (flag) // add
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else // modify
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}

int init_server(unsigned short port) // 初始化一个server
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0); // 创建一个socket
struct sockaddr_in servaddr;

servaddr.sin_family = AF_INET; // 指定IP地址地址版本 为IPV4
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定本地0.0.0.0
servaddr.sin_port = htons(port); // 0~1023 系统默认,需要给一个大于1024的地址,小于65535

if (-1 == bind(sockfd, (struct sockaddr *)&servaddr, sizeof(struct sockaddr)))
{
std::cout << "bind failed :\n" ;
return -1;
}
listen(sockfd, 10);
printf("listen finished\n");
return sockfd;
}

int recv_callback(int fd)
{
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
conn_list[fd].rlength = count;
if (count == 0)
{
printf("client disconnect%d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // TODO: unfinished
close(fd);
return -1;
}
printf("RECV :%s\n", conn_list[fd].rbuffer);

//echo:
//默认将客户端发送的内容,回给客户端
memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].rlength);
conn_list[fd].wlength = conn_list[fd].rlength;
set_event(fd, EPOLLOUT,0);
return count;
}

int send_callback(int fd)
{
int count = send(fd, conn_list[fd].wbuffer, BUFFER_LENGTH, 0);

printf("SEND :%s\n", conn_list[fd].wbuffer);
set_event(fd, EPOLLIN,0);
return count;
}

int event_register(int fd, int event) //事件注册函数
{
if(fd <0){
return -1;
}
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_callback;
conn_list[fd].send_callback = send_callback;
// 为新的链接设置回调

memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;

memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;

set_event(fd, event,1);
return 0;
}

int accept_callback(int fd)
{
//
struct sockaddr client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int clientfd = accept(fd, &client_addr, &client_addr_len);

printf("accept clienfd=%d\n",clientfd);
if(clientfd < 0){
printf("accept failed %s\n",strerror(errno));
return -1;
}
event_register(clientfd, EPOLLIN); // 接受到新的客户端fd,注册EPOLLIN事件
return 0;
}

int main(int argc, char **argv)
{
if (argc <= 1) {
printf("Usage: %s port\n", argv[0]);
exit(0);
}
unsigned short port = atoi(argv[1]);
int sockfd = init_server(port);

epfd = epoll_create(1);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_callback;
set_event(sockfd, EPOLLIN,1);
while (1)
{ // mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
for (int i = 0; i < nready; i++)
{
// 当有epoll连入时,根据具体时间,调用对应的函数
int connfd = events[i].data.fd;
// 一次循环中,IO的读写可能并存,一起处理。
if (events[i].events & EPOLLIN)
{
conn_list[connfd].r_action.recv_callback(connfd); //可能是accept_callback,他们共用一个函数空间,所以都能调到
}

if (events[i].events & EPOLLOUT)
{
conn_list[connfd].send_callback(connfd);
}
}

// sleep(1);
}
// conn_list[sockfd].r_action.recv_callback = sockfd;
}
  • 不同的IO事件,做不同的action
  • IO做到独立,每个事件互相独立

2.2.3 测试并发的问题解决

2.2.3.1 文件打开数量过多

image-20241222162936072

  • 解决方案:ulimit -n 1048576
  • 或者:在该文件后添加:
  • image-20241222164239927

2.2.3.2 客户端提示

image-20241222163021052

原因:不能够分配请求地址:五元组不够 (可以开辟的端口数不足导致的)

tcp: 分为五元: (sip,dip,sport,dport,proto) 源ip,目标ip,源端口,目标端口,协议

解决方案:

服务器可以选择启20个端口的服务器

1
2
3
4
5
6
7
8
// main    
epfd = epoll_create(1);
for(int i=0;i<MAX_PORT;i++){ //初始化20个server,每个都注册
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_callback;
set_event(sockfd, EPOLLIN,1);
}

2.2.3.3 未能通过编译

image-20241222164727354

由于一次性开辟1048576个conn_list 数字太大,编译器报错了

编译方式改为:

1
g++ reactor.cpp -mcmodel=medium

2.2.3.4 客户端端口号有限制:

image-20241222165533370

2.2.3.5 为什么客户端Ctrl+c 服务端会崩?

当客户端Ctrl+c时,服务端的cpu占用率会很高。一次性断开多个连接,服务端被系统kill了

解决方案:尝试去捕获系统的kill -9信号,做自处理。