目录
- 1. 项目简介
- 1.1 环境介绍
- 1.2 项目定位
- 1.3 功能模块整体划分
- 2. Reactor简介
- 2.1 Reactor模型分析
- 2.2 多Reactor多线程分析:多I/O多路复用+线程池(业务处理)
- 3. 日志宏的编写
- 4. Server模块
- 4.1 Buffer模块
- 4.1.1 Buffer的功能
- 4.1.2 Buffer的实现思想
- 4.1.3 Buffer的实现
- 4.2 Socket模块
- 4.2.1 Socket的功能
- 4.2.2 Socket的实现
- 4.3 TimeQueue模块
- 4.3.1 TimeQueue的功能
- 4.3.2 时间轮的思想
- 4.3.3 定时器任务TimerTask类
- 4.3.4 TimerWheel 类
- 4.4 Any模块
- 4.4.1 Any的功能
- 4.4.2 Any的实现
- 4.5 Channel模块
- 4.5.1 Channel的功能
- 4.5.2 Channel的实现
- 4.6 Acceptor模块
- 4.6.1 Acceptor的功能
- 4.6.2 Acceptor的实现
- 4.7 Poller模块
- 4.7.1 Poller的功能
- 4.7.2 Poller的实现
- 4.8 EventLoop模块
- 4.8.1 EventLoop的功能
- 4.8.2 EventLoop的实现
- 4.9 Connection模块
- 4.9.1 Connection的功能
- 4.9.2 Connection的实现
- 4.10 LoopThreadPool模块
- 4.10.1 LoopThreadPool的功能
- 4.10.2 LoopThread的功能
- 4.10.3 LoopThread的实现
- 4.10.4 LoopThreadPool的实现
- 4.11 TcpServer模块
- 4.11.1 TcpServer模块的功能
- 4.11.2 TcpServer模块的实现
- 5. Http协议模块
- 5.1 Util模块
- 5.2 HttpRequest模块
- 5.3 HttpResponse模块
- 5.4 HttpContext模块
- 5.5 HttpServer模块
- 6. 对服务器整体测试
- 6.1 长连接测试
- 6.2 超时连接测试
- 6.3 错误请求测试
- 6.4 业务处理超时测试
- 6.5 同时多条请求测试
- 6.6 大文件传输测试
1. 项目简介
1.1 环境介绍
- 服务器部署:Linux-Centos – 2核2G的腾讯云服务器。
- 项目用到的技术栈:C++、多线程、Epoll等。
1.2 项目定位
- 主(从)Reactor模型服务器,主Reactor线程只负责监听描述符,获取新建连接。这样就保证了新连接的获取比较高效,提高了服务器的并发性能。主Reactor获取到新连接后分发给子Reactor进行通信事件监控。
- 子(从)Reactor线程监控各自文件描述符下的读写事件,进行数据读写以及业务处理。
- One Thread One Loop的思想就是把所有的操作都放到线程中进行,一个线程对应一个EventLoop。
1.3 功能模块整体划分
项目实现目标:带有协议支持的Reactor模型高性能服务器。模块划分如下:
- Server模块:实现Reactor模型的TCP服务器。
- 协议模块:对于自主实现的Reactor模型服务器提供应用层协议支持,项目中支持的Http协议。
2. Reactor简介
2.1 Reactor模型分析
在高性能的I/O设计中,Reactor模型用于同步I/O。
优点:
- 响应快,不必为单个同步时间所阻塞(虽然Reactor本身依然是同步的);
- 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销。
- 可扩展性:可以方便地通过增加Reactor实例个数来充分利用CPU资源。
- 可复用性,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性。
- Rector模型基于事件驱动,特别适合处理海量的I/O。
2.2 多Reactor多线程分析:多I/O多路复用+线程池(业务处理)
- 在主Reactor中处理新连接请求事件,有新连接到来则分发到⼦Reactor中监控。
- 在⼦Reactor中进⾏客⼾端通信监控,有事件触发,则接收数据分发给Worker线程池。
- Worker线程池分配独⽴的线程进⾏具体的业务处理。⼯作线程处理完毕后,将响应交给⼦Reactor线程进⾏数据响应。
- 优点:充分的利用了CPU多核资源,主从Reactor各自完成各自的任务。
3. 日志宏的编写
在实现各个模块之前先将日志实现方便打印找到程序问题所在:
#pragma once#include <iostream>#include <time.h>#include <stdarg.h>#include <sys/types.h>#include <sys/stat.h>#include <fcntl.h>#include <unistd.h>#include <stdlib.h>#define SIZE 1024#define Info 0#define Debug 1#define Warning 2#define Error 3#define Fatal 4#define Screen 1#define Onefile 2#define Classfile 3#define LogFile "log.txt"class Log{public: Log() { printMethod = Screen; path = "./log/"; } void Enable(int method) { printMethod = method; } std::string levelToString(int level) { switch (level) { case Info: return "Info"; case Debug: return "Debug"; case Warning: return "Warning"; case Error: return "Error"; case Fatal: return "Fatal"; default: return "None"; } } void printLog(int level, const std::string &logtxt) { switch (printMethod) { case Screen: std::cout << logtxt << std::endl; break; case Onefile: printOneFile(LogFile, logtxt); break; case Classfile: printClassFile(level, logtxt); break; default: break; } } void printOneFile(const std::string &logname, const std::string &logtxt) { std::string _logname = path + logname; int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt" if (fd < 0) return; write(fd, logtxt.c_str(), logtxt.size()); close(fd); } void printClassFile(int level, const std::string &logtxt) { std::string filename = LogFile; filename += "."; filename += levelToString(level); // "log.txt.Debug/Warning/Fatal" printOneFile(filename, logtxt); } ~Log() { } void operator()(int level, const char *format, ...) { time_t t = time(nullptr); struct tm *ctime = localtime(&t); char leftbuffer[SIZE]; snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(), ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec); va_list s; va_start(s, format); char rightbuffer[SIZE]; vsnprintf(rightbuffer, sizeof(rightbuffer), format, s); va_end(s); // 格式:默认部分+自定义部分 char logtxt[SIZE * 2]; snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer); // printf("%s", logtxt); // 暂时打印 printLog(level, logtxt); }private: int printMethod; std::string path;};Log lg;
4. Server模块
该模块主要实现TCP服务器,该模块包含了:Buffer模块、TimeQueue模块、Any模块、Socket模块、Acceptor模块、Poller模块、Channel模块、Connection模块、LoopThreadPool模块、EventLoop模块、TcpServer模块。
以下将上述模块的基本功能实现。
4.1 Buffer模块
4.1.1 Buffer的功能
4.1.2 Buffer的实现思想
Buffer实现思想如下:
- 想要实现缓冲区首先要有一块内存空间,使用vector,vector的底层使用的就是一个线性的内存空间。
- 一个读偏移记录当前读取数据的位置。一个写偏移记录当前的写入位置。
- 写入数据:从写偏移的位置开始写入,如果后续空间足够直接写,反之就扩容:这里的扩容比较特殊,可以从整体空闲空间(当数据被读取,读偏移会向后移动,前面的空间是空闲的状态)和写偏移后的空闲空间两种情况考虑,如果整体空间足够,将现有数据移动到起始位置。如果不够,扩容,从当前写位置开始扩容足够的大小。数据写入成功后,写偏移记得向后偏移。
- 读取数据:从当前读偏移开始读取,前提是有数据可读。可读数据的大小–写偏移和读偏移之间的数据。
4.1.3 Buffer的实现
(1)文字描述Buffer类接口的具体设计:
(2)Buffer类的设计接口函数:
#include <vector>#include <cstdint> #define BUFFER_DEFAULT_SIZE 1024 // Buffer 默认起始大小class Buffer{public: Buffer() :_reader_idx(0) ,_writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {} // 获取当前写入起始地址 void *WirtePosition(); // 获取当前读取起始地址 void *ReadPosition(); // 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间 uint64_t TailIdleSize(); // 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间 uint64_t HeadIdleSize(); // 获取可读数据大小 uint16_t ReadAbleSize(); // 将读偏移向后移动 void MoveReadOffset(uint64_t len); // 将写偏移向后移动 void MoveWriteOffset(uint64_t len); // 确保可写空间足够(整体空闲空间够了就移动数据。否则就扩容) void EnsureWriteSpace(uint64_t len); // 写入数据 void Write(void *data, uint64_t len); // 读取数据 void Read(void *buf, uint64_t len); // 清空缓冲区 void Clear(); private: std::vector<char> _buffer; // 使用vector进行内存空间管理 uint64_t _reader_idx; // 读偏移 uint64_t _writer_idx; // 写偏移};
(3)Buffer类的接口函数实现:
#pragma once#include <iostream>#include <vector>#include <string>#include <cassert>#include <cstring>using std::cout;using std::endl;#define BUFFER_DEFAULT_SIZE 1024class Buffer{private: std::vector<char> _buffer; uint64_t _read_index; uint64_t _write_index;public: Buffer() :_read_index(0) ,_write_index(0) ,_buffer(BUFFER_DEFAULT_SIZE) {} char* Begin() { return &(*_buffer.begin()); } // 获取当前写入起始地址, _buffer的空间起始地址,加上写偏移量 char* WritePosition() { return Begin() + _write_index; } // 获取当前读取起始地址 char* ReadPosition() { return Begin() + _read_index; } // 获取缓冲区末尾空闲空间大小-->写偏移之后的空闲空间, 总体空间大小减去写偏移 uint64_t TailSize() { return _buffer.size() - _write_index; } // 获取缓冲区起始空闲空间大小-->读偏移之前的空闲空间 uint64_t BeginSize() { return _read_index; } // 获取可读数据大小 = 写偏移 - 读偏移 uint64_t ReadAbleSize(){ return _write_index - _read_index; } // 将读偏移向后移动 void MoveRead(uint64_t len) { assert(len <= ReadAbleSize()); _read_index += len; } // 将写偏移向后移动 void MoveWrite(uint64_t len) { assert(len <= TailSize()); _write_index += len; } // 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容) void EnsureWriteSpace(uint64_t len) { if(TailSize() >= len) { return; } else { if(TailSize() + BeginSize() >= len) { uint64_t size = ReadAbleSize(); std::copy(ReadPosition(), ReadPosition() + size, Begin()); //把可读数据拷贝到起始位置 _read_index = 0; _write_index = size; } else { _buffer.resize(_write_index + 3 * len); } } } // 写入数据 void Write(const void* data, uint64_t len) { if(len == 0) { return; } EnsureWriteSpace(len); const char* tmp = static_cast<const char*>(data); std::copy(tmp, tmp + len, WritePosition()); } void WriteAndPush(const void* data, uint64_t len) { Write(data, len); MoveWrite(len); } void WriteString(const std::string& data) { Write(data.c_str(), data.size()); } void WriteStringAndPush(const std::string& data) { WriteString(data); MoveWrite(data.size()); } void WriteBuffer(Buffer& data) { Write(data.ReadPosition(), data.ReadAbleSize()); } void WriteBufferAndPush(Buffer& data) { WriteBuffer(data); MoveWrite(data.ReadAbleSize()); } // 读取数据 void Read(void* buff, uint64_t len) { assert(ReadAbleSize() >= len); std::copy(ReadPosition(), ReadPosition() + len, (char*)buff); } void ReadAndPop(void* buff, uint64_t len) { Read(buff, len); MoveRead(len); } std::string ReadString(uint64_t len) { assert(len <= ReadAbleSize()); std::string str; str.resize(len); Read(&str[0], len); return str; } std::string ReadStringAndPop(uint64_t len) { assert(len <= ReadAbleSize()); std::string str = ReadString(len); MoveRead(len); return str; } //寻找换行字符 char* FindCRLF() { char* res = (char*)memchr(ReadPosition(), '/n', ReadAbleSize()); return res; } //通常获取一行数据,这种情况针对是 std::string GetLine() { char* pos = FindCRLF(); if(pos == nullptr) { return ""; } return ReadString(pos - ReadPosition() + 1); } std::string GetLineAndPop() { std::string str = GetLine(); MoveRead(str.size()); return str; } void Clear() { _read_index = 0; _write_index = 0; }};
4.2 Socket模块
4.2.1 Socket的功能
4.2.2 Socket的实现
(1)Socket类的设计接口函数:
// 套接字类#define MAX_LISTEN 1024class Sock{public: Sock(); Sock(int fd); ~Sock(); // 创建套接字 bool Socket(); // 绑定地址信息 bool Bind(const std::string &ip, uint64_t port); // 开始监听 bool Listen(int backlog = MAX_LISTEN); // 向服务器发起连接 bool Connect(const std::string &ip, uint64_t port); // 获取新连接 int Accept(); // 接收数据 ssize_t Recv(void* buf, size_t len, int flag = 0); // 0 阻塞 // 发送数据 ssize_t Send(void* buf, size_t len, int flag = 0); // 关闭套接字 void Close(); // 创建一个服务器连接 bool CreateServer(uint64_t port, const std::string &ip = "0.0.0.0"); // 接收全部 // 创建一个客户端连接 bool CreateClient(uint64_t port, const std::string &ip); // 设置套接字选项 -- 开启地址端口重用 void ReuseAddress(); // 设置套接字阻塞属性 -- 设置为非阻塞 void NonBlock(); private: int _sockfd;};
(2)Socket类接口函数实现:
#pragma once#include <iostream>#include <string>#include <unistd.h>#include <cstring>#include <sys/types.h>#include <sys/stat.h>#include <sys/socket.h>#include <arpa/inet.h>#include <netinet/in.h>#include "Log.hpp"enum{ SocketErr = 2, BindErr, ListenErr,};const int backlog = 10;class Sock{public: Sock() {} Sock(int sockfd) :_fd(sockfd) {} bool Socket() { _fd = socket(AF_INET, SOCK_STREAM, 0); if(_fd < 0) { std::cerr << "socket error" << std::endl; return false; } int opt = 1; setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); return true; } bool Bind(const std::string& ip, const uint16_t& port) { sockaddr_in local; local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = inet_addr(ip.c_str()); int n = bind(_fd, (const sockaddr*)&local, sizeof(local)); if(n < 0) { std::cerr << "bind error" << std::endl; return false; } return true; } bool Listen() { int n = listen(_fd, backlog); if(n < 0) { std::cerr << "listen error" << std::endl; return false; } return true; } int Accept(std::string& clientip, uint16_t& clientport) { sockaddr_in client; socklen_t len = 0; //std::cout << "sssss" << std::endl; int fd = accept(_fd, (sockaddr*)&client, &len); if(fd < 0) { std::cerr << "accept error" << std::endl; exit(-1); } char buffer[64]; inet_ntop(AF_INET, &client.sin_addr, buffer, sizeof(buffer)); clientip = buffer; clientport = ntohs(client.sin_port); return fd; } int Accept() { sockaddr_in client; socklen_t len = 0; //std::cout << "sssss" << std::endl; int fd = accept(_fd, (sockaddr*)&client, &len); if(fd < 0) { std::cerr << "accept error" << std::endl; exit(-1); } return fd; } bool Connect(const std::string& serverip, const uint16_t serverport) { sockaddr_in server; server.sin_family = AF_INET; server.sin_port = htons(serverport); server.sin_addr.s_addr = inet_addr(serverip.c_str()); int n = connect(_fd, (sockaddr*)&server, sizeof(server)); if(n < 0) { std::cerr << "conncet error" << std::endl; return false; } return true; } ssize_t Recv(void* buf, size_t len, int flag = 0) { ssize_t n = recv(_fd, buf, len, flag); if(n <= 0) { if(errno == EAGAIN || errno == EINTR) { return 0; //表示这次接收没有接收到数据 } lg(Fatal, "SOCKET RECV FAILED!!"); return -1; } return n; } ssize_t NonBlockRecv(void* buf, size_t len) { return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。 } //发送数据 ssize_t Send(const void* buf, size_t len, int flag = 0) { ssize_t n = send(_fd, buf, len, flag); if(n <= 0) { if(errno == EAGAIN || errno == EINTR) { return 0; //表示这次接收没有接收到数据 } lg(Fatal, "SOCKET SEND FAILED!!"); } return n; } ssize_t NonBlockSend(void* buf, size_t len) { if (len == 0) { return 0; } return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞。 } //创建一个服务端连接 bool CreateServer(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false) { if (Socket() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; return true; } //创建一个客户端连接 bool CreateClient(uint16_t port, const std::string &ip) { //1. 创建套接字,2.指向连接服务器 if (Socket() == false) return false; if (Connect(ip, port) == false) return false; return true; } void Close() { close(_fd); } int Getfd() { return _fd; } //设置套接字阻塞属性-- 设置为非阻塞 void NonBlock() { //int fcntl(int fd, int cmd, ... /* arg */ ); int flag = fcntl(_fd, F_GETFL, 0); fcntl(_fd, F_SETFL, flag | O_NONBLOCK); } ~Sock() { Close(); }private: int _fd;};
4.3 TimeQueue模块
4.3.1 TimeQueue的功能
模块介绍:实现固定时间,执行定时任务的模块 — 定时任务管理器。向该模块添加一个任务,任务将在固定时间后被执行,同时也可以对定时任务进行刷新,延迟该任务执行,当然也可以通过接口取消定时任务。
时间轮是一种 实现延迟功能(定时器)的巧妙算法。如果一个系统存在大量的任务调度,时间轮可以高效的利用线程资源来进行批量化调度。把大批量的调度任务全部都绑定时间轮上,通过时间轮进行所有任务的管理,触发以及运行。能够高效地管理各种延时任务,周期任务,通知任务等。
4.3.2 时间轮的思想
- 如上图所示,时间轮的实现通过定义数组模拟,并且有一个秒针指向数组的起始位置,这个指针向后走,走到哪里代表哪里的任务要被执行,假设我们要一个任务5秒后执行,只需要将任务添加到_second_hand + 5的位置,秒针每秒向后走一步,5秒后秒针指向对应的位置,定时任务执行。
- 需要注意的是,在同一时间,可能会有大批量的定时任务。因此我们只需要在数组对应的位置下拉一个数组即可。这样就可以在同一时刻添加多个任务了。
4.3.3 定时器任务TimerTask类
(1)类的具体功能:
(2)具体实现如下:
using TaskFunc = std::function<void()>;using ReleaseFunc = std::function<void()>;class TimerTask{private: uint64_t _id; // 定时器任务对象ID uint32_t _timeout; //定时任务的超时时间 bool _canceled; // false-表示没有被取消, true-表示被取消 TaskFunc _task_cb; //定时器对象要执行的定时任务 ReleaseFunc _release; //用于删除TimerWheel中保存的定时器对象信息public: TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) :_id(id) ,_timeout(delay) ,_task_cb(cb) ,_canceled(false) {} ~TimerTask() { if(_canceled == false) { _task_cb(); } _release(); } void Cancel() { _canceled = true; } void SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t DelayTime() { return _timeout; }};
4.3.4 TimerWheel 类
(1)类的具体功能:
- 该模块主要是对Connection对象的生命周期进行管理,对非活跃连接进行超时后的释放。
- 该模块内部包含有一个timerfd。以下是timefd相关函数的认识:
- 该模块内部含有一个Channel对象:实现对timerfd的事件就绪回调处理。
(2)具体实现如下:
class TimerWheel{private: using WeakTask = std::weak_ptr<TimerTask>; using PtrTask = std::shared_ptr<TimerTask>; int _tick; // 当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务 int _capacity; // 表盘最大数量---其实就是最大延迟时间 std::vector<std::vector<PtrTask>> _wheel; std::unordered_map<uint64_t, WeakTask> _timers; EventLoop *_loop; int _timerfd; // 定时器描述符--可读事件回调就是读取计数器,执行定时任务 std::unique_ptr<Channel> _timer_channel;private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it != _timers.end()) { _timers.erase(it); } } static int CreateTimerfd() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if (timerfd < 0) { lg(Fatal, "TIMERFD CREATE FAILED!"); abort(); } // int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); struct itimerspec itime; itime.it_value.tv_sec = 1; itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后 itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; // 第一次超时后,每次超时的间隔时 timerfd_settime(timerfd, 0, &itime, NULL); return timerfd; } int ReadTimefd() { uint64_t times; // 有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次 // read读取到的数据times就是从上一次read之后超时的次数 int ret = read(_timerfd, ×, 8); if (ret < 0) { lg(Fatal, "READ TIMEFD FAILED!"); abort(); } return times; } // 这个函数应该每秒钟被执行一次,相当于秒针向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉 } void OnTime() { // 根据实际超时的次数,执行对应的超时任务 int times = ReadTimefd(); for (int i = 0; i < times; i++) { RunTimerTask(); } } void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) { PtrTask pt(new TimerTask(id, delay, cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); } void TimerRefreshInLoop(uint64_t id) { // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中 auto it = _timers.find(id); if (it == _timers.end()) { return; // 没找着定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr int delay = pt->DelayTime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } void TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return; // 没找着定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock(); if (pt) pt->Cancel(); }public: TimerWheel(EventLoop *loop) :_capacity(60) ,_tick(0) ,_wheel(_capacity) ,_loop(loop) ,_timerfd(CreateTimerfd()) ,_timer_channel(new Channel(_loop, _timerfd)) { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _timer_channel->EnableRead(); // 启动读事件监控 } /*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*/ /*如果不想加锁,那就把对定期的所有操作,都放到一个线程中进行*/ void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb); // 刷新/延迟定时任务 void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); /*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行*/ bool HasTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; }};void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));}//刷新/延迟定时任务void TimerWheel::TimerRefresh(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));}void TimerWheel::TimerCancel(uint64_t id){ _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));}
4.4 Any模块
4.4.1 Any的功能
- Connection模块中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议有很多,这个协议接收解析上下⽂就不能有明显的协议倾向,它可以是任意协议的上下⽂信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构。
- Any内部设计⼀个模板容器holder类,可以保存各种类型数据。因为在Any类中⽆法定义这个holder对象或指针,因为Any也不知道这个类要保存什么类型的数据,因此⽆法传递类型参数。所以,定义⼀个基类placehoder,让holder继承于placeholde,⽽Any类保存⽗类指针即可。当需要保存数据时,则new⼀个带有模板参数的⼦类holder对象出来保存数据,然后让Any类中的⽗类指针,指向这个⼦类对象就搞定了。
4.4.2 Any的实现
具体实现如下:
class Any{private: class holder { public: virtual ~holder() {} virtual const std::type_info &type() = 0; virtual holder *clone() = 0; }; template <class T> class placeholder : public holder { public: placeholder(const T &val) : _val(val) {} // 获取子类对象保存的数据类型 virtual const std::type_info &type() { return typeid(T); } // 针对当前的对象自身,克隆出一个新的子类对象 virtual holder *clone() { return new placeholder(_val); } public: T _val; }; holder *_content;public: Any() : _content(NULL) {} template <class T> Any(const T &val) : _content(new placeholder<T>(val)) {} Any(const Any &other) : _content(other._content ? other._content->clone() : NULL) {} ~Any() { delete _content; } Any &swap(Any &other) { std::swap(_content, other._content); return *this; } // 返回子类对象保存的数据的指针 template <class T> T *get() { // 想要获取的数据类型,必须和保存的数据类型一致 assert(typeid(T) == _content->type()); return &((placeholder<T> *)_content)->_val; } // 赋值运算符的重载函数 template <class T> Any &operator=(const T &val) { // 为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放 Any(val).swap(*this); return *this; } Any &operator=(const Any &other) { Any(other).swap(*this); return *this; }};
4.5 Channel模块
4.5.1 Channel的功能
模块介绍:该模块的主要功能是对每一个描述符上的IO事件进行管理,实现对描述符可读,可写,错误等事件的管理操作。以及,Poller模块对描述符进行IO事件监控 的事件就绪后,根据事件,回调不同的函数。
4.5.2 Channel的实现
(1)Channel类的设计接口函数:
class Channel{public: Channel(); void SetReadCallback(const EventCallback &cb); void SetWriteCallback(const EventCallback &cb); void SetErrorCallback(const EventCallback &cb); void SetCloseCallback(const EventCallback &cb); void SetEventCallback(const EventCallback &cb); bool ReadAble(); // 当前是否监控了可读 bool WriteAble(); // 当前是否监控了可写 void EnableRead(); // 启动读事件监控 void EnableWrite(); // 启动写事件监控 void DisableRead(); // 关闭读事件监控 void DisableWrite(); // 关闭写事件监控 void Remove(); // 移除监控 void HandleEvent(); // 事件处理,一旦触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定 private: int _fd; EventLoop* _loop; uint32_t _events; // 当前需要监控的事件 uint32_t _revents; // 当前连接触发的事件 using EventCallback = std::function<void()>; EventCallback _read_callback; // 可读事件被触发的回调函数 EventCallback _write_callback; // 可写事件被触发的回调函数 EventCallback _error_callback; // 错误事件被触发的回调函数 EventCallback _close_callback; // 连接断开事件被触发的回调函数 EventCallback _event_callback; // 任意事件被触发的回调函数};
(2)Channel类接口函数实现:
class Poller;class EventLoop;class Channel{private: using EventCallback = std::function<void()>; EventCallback _read_callback; //读事件 EventCallback _write_callback; //写事件 EventCallback _error_callback; //错误事件 EventCallback _close_callback; //连接断开事件 EventCallback _event_callback; //任意事件 int _sockfd; uint32_t _events; //监控事件 uint32_t _revents; //连接触发事件 EventLoop* _loop;public: Channel(EventLoop* loop, int sockfd) :_sockfd(sockfd) ,_events(0) ,_revents(0) ,_loop(loop) {} int Getfd(){ return _sockfd; } //获取想要监控的事件 uint32_t GetEvents() { return _events; } //设置实际就绪的事件 void SetEvents(uint32_t events) { _revents = events; } void SetReadCallback(const EventCallback& cb) { _read_callback = cb; } void SetWriteCallback(const EventCallback& cb) { _write_callback = cb; } void SetErrorCallback(const EventCallback& cb) { _error_callback = cb; } void SetCloseCallback(const EventCallback& cb) { _close_callback = cb; } void SetEventCallback(const EventCallback& cb) { _event_callback = cb; } //当前是否监控了可读 bool Readable() { return (_events & EPOLLIN); } //当前是否监控了可写 bool Writeable() { return (_events & EPOLLOUT); } void EnableRead() { _events |= EPOLLIN; UpData(); } void EnableWrite() { _events |= EPOLLOUT; UpData(); } void DisableRead() { _events &= ~EPOLLIN; UpData(); } void DisableWrite() { _events &= ~EPOLLOUT; UpData(); } void DisableAll() { _events = 0; UpData(); } void Remove(); void UpData(); void HanderEvent() { if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if(_read_callback) { _read_callback(); } } if(_revents & EPOLLOUT) { if(_write_callback) { _write_callback(); } } else if(_revents & EPOLLERR) { if(_error_callback) { _error_callback(); } } else if(_revents & EPOLLHUP) { if(_close_callback) { _close_callback(); } } if(_event_callback) { _event_callback(); } }};void Channel::UpData() { _loop->UpdateEvent(this); }void Channel::Remove() { _loop->RemoveEvent(this); }
Channel模块需要和EventLoop模块一起才可以进行测试,因此Channel类的代码只进行了编译测试,测试发现并没有语法上的错误,后面有什么逻辑的错误只能等后续完善该类的时候再来进行测试了。
4.6 Acceptor模块
4.6.1 Acceptor的功能
(1)模块介绍:对Socket和Channel模块的整体封装,实现对一个监听套接字的整体管理。
- 该模块中包含一个Socket对象,实现监听套接字的操作。
- 该模块中包含一个Channel对象,实现监听套接字IO事件就绪的处理。
(2)Accept模块处理流程:
- 向Channel提供可读事件的IO事件处理回调函数 — 获取新连接。
- 为新连接构建一个Connection对象,通过该对象设置各种回调。
4.6.2 Acceptor的实现
(1)Acceptor类的设计接口函数:
// 监听套接字管理类class Acceptor{public: Acceptor(); void SetAcceptCallback(); private: /*监听套接字的读事件回调处理函数 -- 获取新连接,调用_accept_callback函数进行新连接管理*/ void HandleRead(); //启动服务器 int CreateServer(int port) Sock _socket; // 用于创建监听套接字 EventLoop *_loop; // 用于对监听套接字进行事件监控 Channel _channel; // 用于对监控套接字进行事件管理 using AcceptCallback = std::function<void(int)>; AcceptCallback _accept_callback;};
(2)Acceptor类接口函数实现:
class Acceptor{private: Sock _socket; EventLoop* _loop; Channel _channel; using AcceptCallback = std::function<void(int)>; AcceptCallback _accept_callback;private: /*监听套接字的读事件回调处理函数---获取新连接,调用_accept_callback函数进行新连接处理*/ void HandleRead() { int newfd = _socket.Accept(); if (newfd < 0) { return ; } if (_accept_callback) { _accept_callback(newfd); } } int Createsockfd(int port) { bool ret = _socket.CreateServer(port); assert(ret == true); return _socket.Getfd(); }public: Acceptor(EventLoop* loop, int port) :_loop(loop) ,_socket(Createsockfd(port)) ,_channel(loop, _socket.Getfd()) { _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this)); } void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; } void Listen() { _channel.EnableRead(); }};
4.7 Poller模块
4.7.1 Poller的功能
模块介绍:对epoll进行封装,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。
4.7.2 Poller的实现
(1)文字描述Poller类接口的具体设计:
(2)Poller类的设计接口函数:
//Poller描述符监控类#define MAX_EPOLLEVENTSclass Poller{public: Poller(); // 添加或修改监控事件 void UpdateEvent(Channel *channel); // 移除监控 void RemoveEvent(Channel *channel); // 开始监控, 返回活跃连接 void Poll(std::vector<Channel*> *active); private: // 对epoll的直接操作 void Update(Channel *channel, int op); bool HasChannel(Channel *Channel); private: int _epfd; struct epoll_event _evs[MAX_EPOLLEVENTS]; std::unordered_map<int, Channel *> _channels;};
(3)Poller类接口函数实现:
#define MAX_EPOLLEVENTS 1024class Poller{private: int _epfd; epoll_event _revs[MAX_EPOLLEVENTS]; std::unordered_map<int, Channel*> _channels; bool HasChannel(Channel* channel) { auto iter = _channels.find(channel->Getfd()); if(iter == _channels.end()) { return false; } return true; } void UpData(Channel* channel, int op) { epoll_event ev; ev.events = channel->GetEvents(); ev.data.fd = channel->Getfd(); int n = epoll_ctl(_epfd, op, channel->Getfd(), &ev); if(n < 0) { lg(Fatal, "epoll_ctl error!"); } }public: Poller() { _epfd = epoll_create(MAX_EPOLLEVENTS); if(_epfd < 0) { lg(Fatal, "epoll_create1 error!"); abort(); } } void UpDataEvent(Channel* channel) { bool ret = HasChannel(channel); if(ret == false) { _channels.insert(std::make_pair(channel->Getfd(), channel)); return UpData(channel, EPOLL_CTL_ADD); } UpData(channel, EPOLL_CTL_MOD); } void RemoveEvent(Channel* channel) { auto iter = _channels.find(channel->Getfd()); if (iter != _channels.end()) { _channels.erase(iter); } UpData(channel, EPOLL_CTL_DEL); } void Poll(std::vector<Channel*>& active) { int n = epoll_wait(_epfd, _revs, MAX_EPOLLEVENTS, -1); if(n < 0) { if(errno == EINTR) { return; } lg(Fatal, "epoll_wait error!"); abort(); } else { for(int i = 0; i < n; i++) { auto iter = _channels.find(_revs[i].data.fd); assert(iter != _channels.end()); iter->second->SetEvents(_revs[i].events); active.push_back(iter->second); } } }};
4.8 EventLoop模块
4.8.1 EventLoop的功能
模块介绍:EventLoop模块对Poller,TimerQueue,Socket模块进行了封装。也是Reactor模型模块。
4.8.2 EventLoop的实现
(1)文字描述EventLoop类接口的具体设计:
- EventLoop模块必须是一个对象对应一个线程,线程内部运行EventLoop的启动函数。
- EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一定要在其对应的EventLoop线程内完成。
- EventLoop模块保证自己内部所监控的所有描述符都要是活跃连接,非活跃连接就要及时的释放避免资源浪费。
- EventLoop模块内部包含一个eventfd:内核提供的事件fd,专门用于事件通知。
- EventLoop模块内部含有一个Poller对象,用于进行描述符的IO事件管理。
- EventLoop模块内部包含有一个TimeQueue对象,用于进行定时任务的管理。
- EventLoop模块中包含一个任务队列,组件使用者要对Connection进行的所有操作,都加入到任务队列中并由EventLoop模块进行管理,并在EventLoop对应的线程中进行执行。
- 每一个Connection对象都会绑定到一个EventLoop上,这样一来对连接的所有操作就能保证在一个线程中进行。
- 通过Poller模块对当前模块管理内的所有描述符进行IO事件监控,当有描述符事件就绪后,通过描述符对应的Channel进行事件的处理。
- 所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作进行顺序执行。
- epoll的事件监控,有可能会因为没有事件到来而持续阻塞。导致任务队列中的任务不能得到及时的处理。对此的处理方式是创建一个eventfd,添加到Poller的事件监控中,每当向任务队列添加任务的时候,通过向eventdf写入数据来唤醒epoll的阻塞。
(2)EventLoop类的设计接口函数:
// EventLoop事件监控处理类class EventLoop{public: using Functor = std::function<void()>; EventLoop(); void RunInLoop(const Functor &cb); // 判断将要执行的任务是否处于当前线程中,如果是则执行,否则压入队列 void QueueInLoop(const Functor &cb); // 将操作压入任务池 bool IsInLoop(); // 用于判断当前线程是否是EventLoop对应的线程 void UpdateEvent(Channel *channel); // 添加/修改描述符的事件监控 void RemoveEvent(Channel *channel); // 移除描述符的监控 void Start(); // 三步走--事件监控-》就绪事件处理-》执行任务 void RunAllTask(); private: std::thread::id _thread_id; //线程ID int _event_fd; //eventfd唤醒IO事件监控有可能导致的阻塞 std::unique_ptr<Channel> _event_channel; Poller _poller; std::vector<Functor> _tasks; std::mutex _mutex; //实现任务池操作的线程安全 TimerWheel _timer_wheel; //定时器模块};
(3)EventLoop类接口函数实现:
class EventLoop{private: using Functor = std::function<void()>; std::thread::id _thread_id; int _event_fd; std::unique_ptr<Channel> _event_channel; Poller _poller; std::vector<Functor> _tasks; std::mutex _mutex; //实现任务池操作的线程安全 TimerWheel _timer_wheel;//定时器模块public: EventLoop() :_event_fd(CreateEventfd()) ,_event_channel(new Channel(this, _event_fd)) ,_thread_id(std::this_thread::get_id()) ,_timer_wheel(this) { _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this)); _event_channel->EnableRead(); } void Start() { while(1) { std::vector<Channel*> actives; _poller.Poll(actives); for(auto& channel : actives) { channel->HanderEvent(); } RunAllTask(); } } //用于判断当前线程是否是EventLoop对应的线程; bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); } void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); } void RunInLoop(const Functor& cb) { if(IsInLoop()) { cb(); } else { QueueInLoop(cb); } } void QueueInLoop(const Functor& cb) { { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(cb); } WeakEventfd(); } //添加/修改描述符的事件监控 void UpdateEvent(Channel* channel) { _poller.UpDataEvent(channel); } void RemoveEvent(Channel* channel) { _poller.RemoveEvent(channel); } void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); } void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); } void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); } bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }public: void RunAllTask() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor); } for (auto& f : functor) { f(); } } static int CreateEventfd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if(efd < 0) { lg(Fatal, "eventfd error!"); abort(); } return efd; } void ReadEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if(ret < 0) { //EINTR -- 被信号打断; EAGAIN -- 表示无数据可读 if (errno == EINTR || errno == EAGAIN) { return; } lg(Fatal, "READ EVENTFD FAILED!"); abort(); } } void WeakEventfd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if(ret < 0) { //EINTR -- 被信号打断; EAGAIN -- 表示无数据可读 if (errno == EINTR || errno == EAGAIN) { return; } lg(Fatal, "write EVENTFD FAILED!"); abort(); } }};
4.9 Connection模块
4.9.1 Connection的功能
模块介绍:该模块是一个对Buffer/Socket/Channel模块的整体封装,实现了对套接字的整体管理。每一个进行数据通信的套接字(accept获取到的新连接)都会构造一个Connetction对象进行管理。
4.9.2 Connection的实现
(1)模块分析:
- 该模块内部包含由组件使用者提供的回调函数:连接建立完成回调,事件回调,新数据回调,关闭回调。
- 该模块包含两个组件使用者提供的接口,数据发送接口和连接关闭接口。
- 该模块中包含两个用户态缓冲区:用户态接收缓冲区和用户态发送缓冲区。
- 该模块中包含一个Socket对象,完成描述符面向系统的IO操作。
- 该模块中包含一个Channel对象,完成描述符IO事件就绪的处理。
(2)该模块的处理流程:
- 向Channel提供可读,可写,错误等不同事件的IO事件回调函数,将Channel和对应的描述符添加到Poller事件监控中。
- 当描述符在Poller模块中就绪了IO可读事件后,调用该描述符对应Channel中保存的读事件处理函数,进行数据读取,读取的过程本质上是将socket接收缓冲区中的数据 读到Connection管理的用户态接收数据中。
- 业务处理完毕后,通过Connection提供的数据发送接口,将数据写入到Connection的发送缓冲区中。
- 启动描述符在Poll模块中的IO事件监控,事件就绪后,调用Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Sockert进行数据的真正发送。
(3)Connection类的设计接口函数:
// DISCONECTED -- 连接关闭状态 CONNECTING -- 连接建立成功-待处理状态// CONNECTED -- 连接建立完成,各种设置已完成,可以通信状态 DISCONNECTING -- 待关闭状态typedef enum{ DISCONECTED, CONNECTING, CONNECTED, DISCONNECTING} ConnStatu;using PtrConnection = std::shared_ptr<Connection>;class Connection{public: Connection(EventLoop *loop, uint64_t conn_id, int sockfd); ~Connection(); int Fd(); // 获取管理的文件描述符 int Id(); // 获取连接ID bool Connected(); // 是否处于CONNECTED状态 void SetContext(const Any &context); // 设置上下文--连接建立完成时进行调用 Any *GetContext(); // 获取上下文,返回的是指针 void SetConnectedCallback(const ConnectedCallback &cb); void SetMessageCallback(const MessageCallback &cb); void SetClosedCallback(const ClosedCallback &cb); void SetAnyEventCallback(const AnyEventCallback &cb); void Established(); // 连接建立就绪后,进行channel回调设置,启动读监控,调用_connect_callback void Send(char *data, size_t len); // 发送数据,将数据发送到发送缓冲区,启动写事件监控 void Shutdown(); // 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理 void EnableInactiveRelease(int sec); // 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务 void CancelInactiveRelease(); // 取消非活跃销毁 // 切换协议--重置上下文以及阶段性处理函数 void Upgrade(const Context, const ConnectedCallback &conn, const ClosedCallback &closed, const AnyEventCallback &event);private: /*五个channel的事件回调函数*/ void HandleRead(); // 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback void HandleWrite(); // 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送 void HandleClose(); // 描述符触发挂断事件 void HandleError(); // 描述符触发出错事件 void HandleEvent(); // 描述符触发任意事件 void EstablishedInLoop(); // 连接获取之后,所处的状态要进行各种设置(给channel设置事件回调,启动读监控) void ReleaseInLoop(); // 这个接口才是实际的释放接口 void SendInLoop(char *data, size_t len); void ShutdownInLoop(); void EnableInactiveReleaseInLoop(int sec); void CancelInactiveReleaseInLoop(); void UpgradeInLoop(const Context &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event);private: uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找 // uint64_t _timer_id; // 定时器ID,必须是唯一的,这块是为了简化操作使用conn_id作为定时器 int _sockfd; // 连接关联的文件描述符 bool _enable_inactive_release; // 连接是否启动非活跃的判断标志,默认为false EventLoop *_loop; // 连接所关联的一个EventLoop ConnStatu _statu; // 连接状态 Socket _socket; // 套接字操作管理 Channel _channel; // 连接的事件管理 Buffer _in_buffer; // 输入缓冲区--存放从socket中读取到的数据 Buffer _out_buffer; // 输出缓冲区--存放要发送给对端的数据 Any _context; // 请求的接收处理上下文 /*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*/ /*换句话来说,这几个回调都是组件使用者使用的*/ using ConnectedCallback = std::function<void(const PtrConnection &)>; using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection &)>; using AnyEventCallback = std::function<void(const PtrConnection &)>; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closef_callback; AnyEventCallback _event_callback; /*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*/ /*就应该从管理的地方移除掉自己的信息*/ ClosedCallback _server_closed_callback;};
(4)Connection类接口函数实现:
class Connection;//DISCONECTED -- 连接关闭状态; CONNECTING -- 连接建立成功-待处理状态//CONNECTED -- 连接建立完成,各种设置已完成,可以通信的状态; DISCONNECTING -- 待关闭状态typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING}ConnStatu;using PtrConnection = std::shared_ptr<Connection>;class Connection : public std::enable_shared_from_this<Connection>{private: uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找 int _sockfd; bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为false EventLoop* _loop; // 连接所关联的一个EventLoop ConnStatu _statu; Sock _socket; Channel _channel; Buffer _in_buffer; Buffer _out_buffer; Any _context; //这几个回调都是组件使用者使用的 using ConnectedCallback = std::function<void(const PtrConnection &)>; using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection &)>; using AnyEventCallback = std::function<void(const PtrConnection &)>; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; //组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭 //就应该从管理的地方移除掉自己的信息 ClosedCallback _server_closed_callback;private: void HanderRead() { char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, sizeof(buf)); if(ret < 0) { return ShutdownInLoop(); } _in_buffer.WriteAndPush(buf, ret); if(_in_buffer.ReadAbleSize() > 0) { return _message_callback(shared_from_this(), &_in_buffer); } } void HanderWrite() { ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize()); if(ret < 0) { // 发送错误就该关闭连接了, if (_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); // 这时候就是实际的关闭释放操作了。 } _out_buffer.MoveRead(ret); if (_out_buffer.ReadAbleSize() == 0) { _channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控 // 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放 if (_statu == DISCONNECTING) { return Release(); } } } void HanderClose() { //一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接 if (_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } void HanderError() { HanderClose(); } //描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务; 2. 调用组件使用者的任意事件回调 void HandleEvent() { if(_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); } if(_event_callback) { _event_callback(shared_from_this()); } } //连接获取之后,所处的状态下要进行各种设置(启动读监控,调用回调函数) void EstablishedInLoop() { assert(_statu == CONNECTING);//当前的状态必须一定是上层的半连接状态 _statu = CONNECTED; _channel.EnableRead(); if(_connected_callback) { _connected_callback(shared_from_this()); } } //这个接口才是实际的释放接口 void ReleaseInLoop() { _statu = DISCONNECTED; _channel.Remove(); _socket.Close(); // 如果当前定时器队列中还有定时销毁任务,则取消任务 if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop(); // 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数 if (_closed_callback) _closed_callback(shared_from_this()); // 移除服务器内部管理的连接信息 if (_server_closed_callback) _server_closed_callback(shared_from_this()); } //这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控 void SendInLoop(Buffer &buf) { if(_statu == DISCONNECTED) { return; } _out_buffer.WriteBufferAndPush(buf); if (_channel.Writeable() == false) { _channel.EnableWrite(); } } //这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送 void ShutdownInLoop() { _statu = DISCONNECTING;// 设置连接为半关闭状态 if(_in_buffer.ReadAbleSize() > 0) { if(_message_callback) { _message_callback(shared_from_this(), &_in_buffer); } } //要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭 if(_out_buffer.ReadAbleSize() > 0) { if(_channel.Writeable() == false) { _channel.EnableWrite(); } } if(_out_buffer.ReadAbleSize() == 0) { Release(); } } //启动非活跃连接超时释放规则 void EnableInactiveReleaseInLoop(int sec) { // 1. 将判断标志 _enable_inactive_release 置为true _enable_inactive_release = true; // 2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可 if(_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } // 3. 如果不存在定时销毁任务,则新增 _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this)); } void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if(_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); } } void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _context = context; _connected_callback = conn; _message_callback = msg; _closed_callback = closed; _event_callback = event; }public: Connection(EventLoop* loop, uint64_t conn_id, int sockfd) :_conn_id(conn_id) ,_sockfd(sockfd) ,_enable_inactive_release(false) ,_loop(loop) ,_statu(CONNECTING) ,_socket(sockfd) ,_channel(loop, _sockfd) { _channel.SetReadCallback(std::bind(&Connection::HanderRead, this)); _channel.SetWriteCallback(std::bind(&Connection::HanderWrite, this)); _channel.SetCloseCallback(std::bind(&Connection::HanderClose, this)); _channel.SetErrorCallback(std::bind(&Connection::HanderError, this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this)); } int Getfd() { return _sockfd; } int Getid() { return _conn_id; } //是否处于CONNECTED状态 bool Connected() { return (_statu == CONNECTED); } //设置上下文--连接建立完成时进行调用 void SetContext(const Any &context) { _context = context; } //获取上下文,返回的是指针 Any* GetContext() { return &_context; } void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; } void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); } void Send(const char* data, size_t len) { Buffer buf; buf.WriteAndPush(data, len); _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf))); } void Shutdown() { _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); } void Release() { _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this)); } //启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务 void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); } //取消非活跃销毁 void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this)); } void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _loop->AssertInLoop(); _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event)); }};
4.10 LoopThreadPool模块
4.10.1 LoopThreadPool的功能
- LoopThread模块的功能就是将EventLoop模块与thread整合到一起。
- EventLoop模块实例化的对象,在构造的时候会初始化_thread_id。在后续的操作中,通过当前线程ID和EventLoop模块中的_thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行的线程并不是EventLoop线程。因此,我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象。
- LoopThreadPool模块的功能主要是对所有的LoopThread进行管理及分配。
- 在服务器中,主Reactor负责新连接的获取,从属线程负责新连接的事件监控及处理,因此当前的线程池,游有可能从属线程数量为0。也就是实现单Reactor服务器,一个线程既负责获取连接,也负责连接的处理。该模块就是对0个或者多个LoopThread对象进行管理。
关于线程分配,当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理。假设有0个从属线程,则直接分配给主线程的EventLoop进行处理。假设有多个从属线程,采用轮转的思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)。
在实现LoopThreadPool类之前需要实现LoopThread类。
4.10.2 LoopThread的功能
EventLoop模块在实例化对象的时候,必须在线程内部,EventLoop在实例化对象时会设置自己的thread_id,如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置存在问题:那就是在构造EventLoop对象,到设置新的thread_id期间将是不可控的。
因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象构造一个新的模块:LoopThread。该模块的功能是将EventLoop与thread整合到一起:
- 创建线程
- 在线程中实例化EventLoop对象
LoopThread功能:可以向外部返回所实例化的EventLoop。
4.10.3 LoopThread的实现
具体实现如下:
class LoopThread{private: std::mutex _mutex; std::condition_variable _cond; EventLoop* _loop; // EventLoop指针变量,这个对象需要在线程内实例化 std::thread _thread; // EventLoop对应的线程 /*实例化 EventLoop 对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/ void ThreadEntry() { EventLoop loop; { std::unique_lock<std::mutex> lock(_mutex);//加锁 _loop = &loop; _cond.notify_all(); } loop.Start(); }public: LoopThread() :_loop(nullptr) ,_thread(std::thread(&LoopThread::ThreadEntry, this)) {} EventLoop* GetLoop() { EventLoop* loop; { std::unique_lock<std::mutex> lock(_mutex); _cond.wait(lock, [&](){ return _loop != NULL; }); loop = _loop; } return loop; }};
4.10.4 LoopThreadPool的实现
LoopThreadPool类接口函数实现:
class LoopThreadPool{private: int _thread_count; int _next_idx; EventLoop* _baseloop; std::vector<LoopThread*> _threads; std::vector<EventLoop*> _loops;public: LoopThreadPool(EventLoop* baseloop) :_baseloop(baseloop) ,_thread_count(0) ,_next_idx(0) {} void SetThreadCount(int count) { _thread_count = count; } void Create() { if(_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for(int i = 0; i < _thread_count; i++) { _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); } } } EventLoop* NextLoop() { if(_thread_count == 0) { return _baseloop; } _next_idx = (_next_idx + 1) % _thread_count; return _loops[_next_idx]; }};
4.11 TcpServer模块
4.11.1 TcpServer模块的功能
TcpServer模块功能:对所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器的搭建。
(1)TcpServer模块的管理:
- Acceptor对象,创建一个监听套接字。
- EventLoop对象, baseloop对象, 实现对监听套接字的事件监控。
- std::unordered_ map <uint64_ t, PtrConnection> conns, 实现对所有新建连接的管理。
- LoopThreadPool对象,创建loop线程池, 对新建连接进行事件监控及处理。
(2)TcpServer模块的主要功能:
- 设置从属线程池数量。
- 启动服务器。
- 设置各种回调函数(连接建立完成,消息,关闭,任意), 用户设置给TcpServer, TcpServer设置给获取的新连接。
- 是否启动非活跃连接超时销毁功能。
- 添加定时任务功能。
(3)TcpServer模块的工作流程:
- 在TcpServer中实例化一个Acceptor对象, 以及一个EventLoop对象(baseloop)。
- 将Acceptor挂到baseloop上进行事件监控。
- 一旦Acceptor对象就绪了可读事件,则执行读事件回调函数获取新建连接。
- 对新连接创建一个Connection进行管理。
- 对连接对应的Connection设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)。
- 启动Connection的非活跃连接的超时销毁规则。
- 将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的Eventloop中进行事件监控。
- 一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调。
4.11.2 TcpServer模块的实现
(1)TcpServer类的设计接口函数:
// TcpServer服务器管理模块(即全部模块的整合)class TcpServer{private: uint64_t _next_id; // 这是一个自动增长的连接ID int _port; int _timeout; // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接 bool _enable_inactive_release; // 是否启动非活跃连接超时销毁的判断标志 EventLoop _baseloop; // 这是主线程的EventLoop对象,负责监听事件的处理 Acceptor _acceptor; // 这是监听套接字的管理对象 LoopThreadPool _pool; // 这是从属EventLoop线程池 std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除 using ConnectedCallback = std::function<void(const PtrConnection &)>; using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection &)>; using AnyEventCallback = std::function<void(const PtrConnection &)>; using Functor = std::function<void()>; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback;private: void NewConnection(int fd); // 为新连接构造一个Connection进行管理 void RemoveConnection(); // 从管理Connection的_conns移除连接信息public: TcpServer(); void SetThreadCount(int count); void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void EnableInactiveRelease(int timeout); void RunAfter(const Functor &task, int delay); // 用于添加一个定时任务 void Start();};
(2)TcpServer类的设计接口函数实现:
class TcpServer{private: uint64_t _next_id; //这是一个自动增长的连接ID, int _port; int _timeout; bool _enable_inactive_release; EventLoop _base_loop; Acceptor _acceptor; LoopThreadPool _pool; //这是从属EventLoop线程池 std::unordered_map<uint64_t, PtrConnection> _conns; //保存管理所有连接对应的shared_ptr对象 using ConnectedCallback = std::function<void(const PtrConnection&)>; using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection&)>; using AnyEventCallback = std::function<void(const PtrConnection&)>; using Functor = std::function<void()>; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback;private: void NewConnection(int fd) { _next_id++; PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)); conn->SetMessageCallback(_message_callback); conn->SetClosedCallback(_closed_callback); conn->SetConnectedCallback(_connected_callback); conn->SetAnyEventCallback(_event_callback); conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1)); if(_enable_inactive_release) { conn->EnableInactiveRelease(_timeout); //启动非活跃超时销毁 } conn->Established();//就绪初始化 _conns.insert(std::make_pair(_next_id, conn)); } void RunAfterInLoop(const Functor &task, int delay) { _next_id++; _base_loop.TimerAdd(_next_id, delay, task); } void RemoveConnectionInLoop(const PtrConnection& conn) { int id = conn->Getid(); auto iter = _conns.find(id); if(iter != _conns.end()) { _conns.erase(id); } } void RemoveConnection(const PtrConnection& conn) { _base_loop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn)); }public: TcpServer(int port) :_port(port) ,_next_id(0) ,_enable_inactive_release(false) ,_acceptor(&_base_loop, port) ,_pool(&_base_loop) { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen(); //将监听套接字挂到baseloop上 } void SetThreadCount(int count) { return _pool.SetThreadCount(count); } void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; } void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; } //用于添加一个定时任务 void RunAfter(const Functor &task, int delay) { _base_loop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay)); } void Start() { _pool.Create(); _base_loop.Start(); }};class NetWork{public: NetWork() { lg(Info, "SIGPIPE INIT"); signal(SIGPIPE, SIG_IGN); }};static NetWork nw;
5. Http协议模块
5.1 Util模块
(1)该模块为工具模块,主要提供Http协议模块所用到的一些工具函数,比如Url编码解码,文件读写。
(2)具体功能实现如下:
#pragma once#include "../server.hpp"#include <fstream>#include <ostream>#include <sys/stat.h>std::unordered_map<int, std::string> _statu_msg = { {100, "Continue"}, {101, "Switching Protocol"}, {102, "Processing"}, {103, "Early Hints"}, {200, "OK"}, {201, "Created"}, {202, "Accepted"}, {203, "Non-Authoritative Information"}, {204, "No Content"}, {205, "Reset Content"}, {206, "Partial Content"}, {207, "Multi-Status"}, {208, "Already Reported"}, {226, "IM Used"}, {300, "Multiple Choice"}, {301, "Moved Permanently"}, {302, "Found"}, {303, "See Other"}, {304, "Not Modified"}, {305, "Use Proxy"}, {306, "unused"}, {307, "Temporary Redirect"}, {308, "Permanent Redirect"}, {400, "Bad Request"}, {401, "Unauthorized"}, {402, "Payment Required"}, {403, "Forbidden"}, {404, "Not Found"}, {405, "Method Not Allowed"}, {406, "Not Acceptable"}, {407, "Proxy Authentication Required"}, {408, "Request Timeout"}, {409, "Conflict"}, {410, "Gone"}, {411, "Length Required"}, {412, "Precondition Failed"}, {413, "Payload Too Large"}, {414, "URI Too Long"}, {415, "Unsupported Media Type"}, {416, "Range Not Satisfiable"}, {417, "Expectation Failed"}, {418, "I'm a teapot"}, {421, "Misdirected Request"}, {422, "Unprocessable Entity"}, {423, "Locked"}, {424, "Failed Dependency"}, {425, "Too Early"}, {426, "Upgrade Required"}, {428, "Precondition Required"}, {429, "Too Many Requests"}, {431, "Request Header Fields Too Large"}, {451, "Unavailable For Legal Reasons"}, {501, "Not Implemented"}, {502, "Bad Gateway"}, {503, "Service Unavailable"}, {504, "Gateway Timeout"}, {505, "HTTP Version Not Supported"}, {506, "Variant Also Negotiates"}, {507, "Insufficient Storage"}, {508, "Loop Detected"}, {510, "Not Extended"}, {511, "Network Authentication Required"}};std::unordered_map<std::string, std::string> _mime_msg = { {".aac", "audio/aac"}, {".abw", "application/x-abiword"}, {".arc", "application/x-freearc"}, {".avi", "video/x-msvideo"}, {".azw", "application/vnd.amazon.ebook"}, {".bin", "application/octet-stream"}, {".bmp", "image/bmp"}, {".bz", "application/x-bzip"}, {".bz2", "application/x-bzip2"}, {".csh", "application/x-csh"}, {".css", "text/css"}, {".csv", "text/csv"}, {".doc", "application/msword"}, {".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}, {".eot", "application/vnd.ms-fontobject"}, {".epub", "application/epub+zip"}, {".gif", "image/gif"}, {".htm", "text/html"}, {".html", "text/html"}, {".ico", "image/vnd.microsoft.icon"}, {".ics", "text/calendar"}, {".jar", "application/java-archive"}, {".jpeg", "image/jpeg"}, {".jpg", "image/jpeg"}, {".js", "text/javascript"}, {".json", "application/json"}, {".jsonld", "application/ld+json"}, {".mid", "audio/midi"}, {".midi", "audio/x-midi"}, {".mjs", "text/javascript"}, {".mp3", "audio/mpeg"}, {".mpeg", "video/mpeg"}, {".mpkg", "application/vnd.apple.installer+xml"}, {".odp", "application/vnd.oasis.opendocument.presentation"}, {".ods", "application/vnd.oasis.opendocument.spreadsheet"}, {".odt", "application/vnd.oasis.opendocument.text"}, {".oga", "audio/ogg"}, {".ogv", "video/ogg"}, {".ogx", "application/ogg"}, {".otf", "font/otf"}, {".png", "image/png"}, {".pdf", "application/pdf"}, {".ppt", "application/vnd.ms-powerpoint"}, {".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"}, {".rar", "application/x-rar-compressed"}, {".rtf", "application/rtf"}, {".sh", "application/x-sh"}, {".svg", "image/svg+xml"}, {".swf", "application/x-shockwave-flash"}, {".tar", "application/x-tar"}, {".tif", "image/tiff"}, {".tiff", "image/tiff"}, {".ttf", "font/ttf"}, {".txt", "text/plain"}, {".vsd", "application/vnd.visio"}, {".wav", "audio/wav"}, {".weba", "audio/webm"}, {".webm", "video/webm"}, {".webp", "image/webp"}, {".woff", "font/woff"}, {".woff2", "font/woff2"}, {".xhtml", "application/xhtml+xml"}, {".xls", "application/vnd.ms-excel"}, {".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}, {".xml", "application/xml"}, {".xul", "application/vnd.mozilla.xul+xml"}, {".zip", "application/zip"}, {".3gp", "video/3gpp"}, {".3g2", "video/3gpp2"}, {".7z", "application/x-7z-compressed"}};class Util{public: // 字符串分割函数,将src字符串按照sep字符进行分割,得到的各个字串放到arry中,最终返回字串的数量 static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arry) { size_t offset = 0; // 有10个字符,offset是查找的起始位置,范围应该是0~9,offset==10就代表已经越界了 while (offset < src.size()) { // 在src字符串偏移量offset处,开始向后查找sep字符/字串,返回查找到的位置 size_t pos = src.find(sep, offset); if (pos == std::string::npos) // 没有找到特定的字符 { // 将剩余的部分当作一个字串,放入arry中 if (pos == src.size()) { break; } arry->push_back(src.substr(offset)); return arry->size(); } if (pos == offset) { offset = pos + sep.size(); continue; // 当前字串是一个空的,没有内容 } arry->push_back(src.substr(offset, pos - offset)); offset = pos + sep.size(); } return arry->size(); } // 读取文件的所有内容,将读取的内容放到一个Buf中 static bool ReadFile(const std::string &filename, std::string *buf) { std::ifstream out(filename, std::ios::binary); if (out.is_open() == false) { LOG(FATAL, "Open ReadFile error"); return false; } size_t out_size = 0; out.seekg(0, out.end); // 跳转读写位置到末尾 out_size = out.tellg(); // 获取当前读写位置相对于起始位置的偏移量,从末尾偏移刚好就是文件大小 out.seekg(0, out.beg); // 跳转到起始位置 buf->resize(out_size); // 开辟文件大小的空间 out.read(&(*buf)[0], out_size); if (out.good() == false) { printf("Read %s File error!!", filename.c_str()); out.close(); return false; } out.close(); return true; } // 向文件写入数据 static bool WriteFile(const std::string& filename, std::string& buf) { std::ofstream in(filename, std::ios::binary); if (in.is_open() == false) { LOG(FATAL, "Open WriteFile error"); return false; } in.write(buf.c_str(), buf.size()); if (in.good() == false) { LOG(FATAL, "Write File error"); in.close(); return false; } in.close(); return true; } // URL编码,避免URL中资源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义 // 编码格式:将特殊字符的ascii值,转换为两个16进制字符,前缀% C++ -> C%2B%2B // 不编码的特殊字符: RFC3986文档规定 . - _ ~ 字母,数字属于绝对不编码字符 // RFC3986文档规定,编码格式 %HH // W3C标准中规定,查询字符串中的空格,需要编码为+, 解码则是+转空格 static std::string UrlEncode(const std::string &url, bool convert_space_to_plus) { std::string res; for (auto &ch : url) { if (ch == '.' || ch == '-' || ch == '_' || ch == '~' || isalnum(ch)) { res += ch; continue; } if (ch == ' ' && convert_space_to_plus == true) { res += '+'; continue; } // 剩下的字符都是需要编码成为 %HH 格式 char tmp[4] = {0}; // snprintf 与 printf比较类似,都是格式化字符串,只不过一个是打印,一个是放到一块空间中 snprintf(tmp, 4, "%%%02X", ch); res += tmp; } return res; } static char HEXTOI(char c) { if (c >= '0' && c <= '9') { return c - '0'; } else if (c >= 'a' && c <= 'z') { return c - 'a' + 10; } else if (c >= 'A' && c <= 'Z') { return c - 'A' + 10; } return -1; } // Url解码 static std::string UrlDecode(const std::string &url, bool convert_plus_to_space) { std::string res; for (int i = 0; i < url.size(); i++) { if (url[i] == '+' && convert_plus_to_space == true) { res += ' '; continue; } if (url[i] == '%' && (i + 2) < url.size()) { char v1 = HEXTOI(url[i + 1]); char v2 = HEXTOI(url[i + 2]); char v = v1 * 16 + v2; res += v; i += 2; continue; } res += url[i]; } return res; } // 响应状态码的描述信息获取 static std::string StatuDesc(int statu) { auto it = _statu_msg.find(statu); if (it != _statu_msg.end()) { return it->second; } return "Unknow"; } // 根据文件后缀名获取文件mime static std::string ExtMime(const std::string& filename) { // a.b.txt 先获取文件扩展名 size_t pos = filename.find_last_of('.'); if(pos == std::string::npos) { return "application/octet-stream"; } // 根据扩展名,获取mime std::string ext = filename.substr(pos); auto it = _mime_msg.find(ext); if(it == _mime_msg.end()) { return "application/octet-stream"; } return it->second; } // 判断一个文件是否是一个目录 static bool IsDirectory(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if(ret < 0) { return false; } return S_ISDIR(st.st_mode); } // 判断一个文件是否是一个普通文件 static bool IsRegular(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if(ret < 0) { return false; } return S_ISREG(st.st_mode); } //http请求的资源路径有效性判断 // /index.html --- 前边的/叫做相对根目录 映射的是某个服务器上的子目录 // 想表达的意思就是,客户端只能请求相对根目录中的资源,其他地方的资源都不予理会 // /../login, 这个路径中的..会让路径的查找跑到相对根目录之外,这是不合理的,不安全的 static bool ValidPath(const std::string &path) { //思想:按照/进行路径分割,根据有多少子目录,计算目录深度,有多少层,深度不能小于0 std::vector<std::string> subdir; Split(path, "/", &subdir); int level = 0; for(auto& dir : subdir) { if(dir == "..") { level--; // 任意一层走出相对根目录,就认为有问题 if(level < 0) { return false; } continue; } level++; } return true; }};
5.2 HttpRequest模块
(1)这个模块是Http请求数据模块,用于保存Http请求数据被解析后的各项请求元素信息。具体功能如下图:
(2)具体实现如下:
#pragma once#include <iostream>#include <regex>#include <unordered_map>class HttpRequest{public: std::string _method; //请求方法 std::string _path; //资源路径 std::string _version; //协议版本 std::string _body; //请求正文 std::smatch _matches; //资源路径的正则提取数据 std::unordered_map<std::string, std::string> _headers; //头部字段 std::unordered_map<std::string, std::string> _params; //查询字符串public: HttpRequest() :_version("HTTP/1.1") {} void ReSet() { _method.clear(); _path.clear(); _version = "HTTP/1.1"; _body.clear(); std::smatch match; _matches.swap(match); _headers.clear(); _params.clear(); } //插入头部字段 void SetHeader(const std::string& key, const std::string& val) { _headers.insert(std::make_pair(key, val)); } bool HasHeader(const std::string& key) const { auto iter = _headers.find(key); if(iter == _headers.end()) { return false; } return true; } std::string GetHeader(const std::string& key) const { auto iter = _headers.find(key); if(iter == _headers.end()) { return ""; } return iter->second; } //插入查询字符串 void SetParam(const std::string& key, const std::string& val) { _params.insert(std::make_pair(key, val)); } bool HasParam(const std::string& key) const { auto it = _params.find(key); if (it == _params.end()) { return false; } return true; } // 获取指定的查询字符串 std::string GetParam(const std::string& key) const { auto it = _params.find(key); if (it == _params.end()) { return ""; } return it->second; } //获取正文长度 size_t ContentLength() const { bool ret = HasHeader("Content-Length"); if(ret == false) { return 0; } std::string clen = GetHeader("Content-Length"); return std::stol(clen); } // 判断是否是短链接 bool Close() const { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; }};
5.3 HttpResponse模块
(1)这个模块是Http响应数据块,用于业务处理后设置并保存Http响应数据的各项元素信息,最终会被按照Http协议响应格式组织成为响应信息发送给客户端。具体功能如下图:
(2)具体实现如下:
#pragma once#include <iostream>#include <unordered_map>class HttpResponse{public: int _statu; bool _redirect_flag; std::string _body; std::string _redirect_url; std::unordered_map<std::string, std::string> _headers;public: HttpResponse() :_redirect_flag(false) ,_statu(200) {} HttpResponse(int statu) :_redirect_flag(false) ,_statu(statu) {} void Reset() { _statu = 200; _redirect_flag = false; _body.clear(); _redirect_url.clear(); _headers.clear(); } // 插入头部字段 void SetHeader(const std::string& key, const std::string& val) { _headers.insert(std::make_pair(key, val)); } // 判断是否存在指定头部字段 bool HasHeader(const std::string& key) { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 获取指定头部字段的值 std::string GetHeader(const std::string& key) { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } void SetContent(const std::string& body, const std::string& type = "text.html") { _body = body; SetHeader("Content-Type", type); } void SetRedirect(const std::string& url, int statu = 302) { _statu = statu; _redirect_flag = true; _redirect_url = url; } // 判断是否是短链接 bool Close() { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; }};
5.4 HttpContext模块
(1)该模块是一个Http请求接收的上下文模块,主要是为了防止在一次接收的数据中,不是一个完整的Http请求,需要在下次接收到新数据后继续根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。
(2)正则库的简单概述:
注意在正则表达式的相关函数要在gcc版本较高的版本下才能正常运行,否则会出现编译成功,但是运行失败的情况。
(3)具体实现如下:
#pragma once#include <iostream>#include "HttpRequest.hpp"#include "Util.hpp"#include "../Buffer.hpp"typedef enum { RECV_HTTP_ERROR, RECV_HTTP_LINE, RECV_HTTP_HEAD, RECV_HTTP_BODY, RECV_HTTP_OVER}HttpRecvStatu;#define MAX_LINE 8192class HttpContext{private: int _resp_statu; //响应状态码 HttpRecvStatu _recv_statu; //当前接收及解析的阶段状态 HttpRequest _request; //已经解析得到的请求信息private: bool ParseHttpLine(const std::string& line) { std::smatch matches; std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?://?(.*))? (HTTP/1//.[01])(?:/n|/r/n)?", std::regex::icase); bool ret = std::regex_match(line, matches, e); if(ret == false) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } //0 : GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1 //1 : GET //2 : /bitejiuyeke/login //3 : user=xiaoming&pass=123123 //4 : HTTP/1.1 //请求方法的获取 _request._method = matches[1]; std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper); //资源路径的获取,需要进行URL解码操作,但是不需要+转空格 _request._path = Util::UrlDecode(matches[2], false); //协议版本的获取 _request._version = matches[4]; //查询字符串的获取与处理 std::vector<std::string> query_string_arry; std::string query_string = matches[3]; Util::Split(query_string, "&", &query_string_arry); //针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码 for(auto& ch : query_string_arry) { size_t pos = ch.find("="); if(pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; //BAD REQUEST return false; } std::string key = Util::UrlDecode(ch.substr(0, pos), true); std::string val = Util::UrlDecode(ch.substr(pos + 1), true); _request.SetParam(key, val); } return true; } bool RecvHttpLine(Buffer* buf) { if (_recv_statu != RECV_HTTP_LINE) { return false; } //1. 获取一行数据,带有末尾的换行 std::string line = buf->GetLineAndPop(); //2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大 if(line.size() == 0) { // 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if(buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if(line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414;//URI TOO LONG return false; } bool ret = ParseHttpLine(line); if(ret == false) { return false; } //首行处理完毕,进入头部获取阶段 _recv_statu = RECV_HTTP_HEAD; return true; } bool RecvHttpHead(Buffer* buf) { if(_recv_statu != RECV_HTTP_HEAD) { return false; } while(1) { std::string line = buf->GetLineAndPop(); if(line.size() == 0) { // 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if(buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if(line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414;//URI TOO LONG return false; } if(line == "/n" || line == "/r/n") { break; } bool ret = ParseHttpHead(line); if(ret == false) { return false; } } //头部处理完毕,进入正文获取阶段 _recv_statu = RECV_HTTP_BODY; return true; } bool ParseHttpHead(std::string& line) { if(line.back() == '/n') line.pop_back(); if(line.back() == '/r') line.pop_back(); auto pos = line.find(": "); if(pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // return false; } std::string key = line.substr(0, pos); std::string val = line.substr(pos + 2); _request.SetHeader(key, val); return true; } bool RecvHttpBody(Buffer* buf) { if(_recv_statu != RECV_HTTP_BODY) { return false; } //1. 获取正文长度 size_t content_length = _request.ContentLength(); if(content_length == 0) { //没有正文,则请求接收解析完毕 _recv_statu = RECV_HTTP_OVER; return true; } //2. 当前已经接收了多少正文,其实就是往 _request._body 中放了多少数据了 size_t real_len = content_length - _request._body.size();//实际还需要接收的正文长度 //3. 接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文 // 3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据 if(buf->ReadAbleSize() >= real_len) { _request._body.append(buf->ReadPosition(), real_len); buf->MoveRead(real_len); _recv_statu = RECV_HTTP_OVER; return true; } // 3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来 _request._body.append(buf->ReadPosition(), buf->ReadAbleSize()); buf->MoveRead(buf->ReadAbleSize()); return true; }public: HttpContext() :_resp_statu(200) ,_recv_statu(RECV_HTTP_LINE) {} void ReSet() { _resp_statu = 200; _recv_statu = RECV_HTTP_LINE; _request.ReSet(); } int RespStatu() { return _resp_statu; } HttpRecvStatu RecvStatu() { return _recv_statu; } HttpRequest& Request() { return _request; } //接收并解析HTTP请求 void RecvHttpRequest(Buffer* buf) { //不同的状态,做不同的事情,但是这里不要break //因为处理完请求行后,应该立即处理头部,而不是退出等新数据 switch (_recv_statu) { case RECV_HTTP_LINE: RecvHttpLine(buf); case RECV_HTTP_HEAD: RecvHttpHead(buf); case RECV_HTTP_BODY: RecvHttpBody(buf); } }};
5.5 HttpServer模块
(1)主要目标:最终给组件使用者提供的Http服务器模块,用于以简单的接口实现Http服务器的搭建。
- HttpServer模块内部包含有⼀个TcpServer对象:TcpServer对象实现服务器的搭建 。
- HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建⽴成功设置上下⽂接口,数据处理接口。
- HttpServer模块内部包含有⼀个hash-map表存储请求与处理函数的映射表:组件使⽤者向HttpServer设置哪些请求应该使⽤哪些函数进⾏处理,等TcpServer收到对应的请求就会使⽤对应的函数进⾏处理。
(2)具体实现如下:
#pragma once#include "HttpRequest.hpp"#include "HttpResponse.hpp"#include "../server.hpp"#include "HttpContext.hpp"class HttpServer{private: using Handler = std::function<void(const HttpRequest &, HttpResponse *)>; using Handlers = std::vector<std::pair<std::regex, Handler>>; Handlers _get_route; Handlers _post_route; Handlers _put_route; Handlers _delete_route; std::string _basedir; // 静态资源根目录 TcpServer _server;private: void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) { // 1. 组织一个错误展示页面 std::string body; body += "<html>"; body += "<head>"; body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>"; body += "</head>"; body += "<body>"; body += "<h1>"; body += std::to_string(rsp->_statu); body += " "; body += Util::StatuDesc(rsp->_statu); body += "</h1>"; body += "</body>"; body += "</html>"; // 2. 将页面数据,当作响应正文,放入rsp中 rsp->SetContent(body, "text/html"); } // 将HttpResponse中的要素按照http协议格式进行组织,发送 void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) { // 1. 先完善头部字段 if (req.Close() == true) { rsp.SetHeader("Connection", "close"); } else { rsp.SetHeader("Connection", "keep-alive"); } if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) { rsp.SetHeader("Content-Length", std::to_string(rsp._body.size())); } if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) { rsp.SetHeader("Content-Type", "application/octet-stream"); } if (rsp._redirect_flag == true) { rsp.SetHeader("Location", rsp._redirect_url); } // 2. 将rsp中的要素,按照http协议格式进行组织 std::stringstream rsp_str; rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "/r/n"; for (auto &head : rsp._headers) { rsp_str << head.first << ": " << head.second << "/r/n"; } rsp_str << "/r/n"; rsp_str << rsp._body; // 3. 发送数据 conn->Send(rsp_str.str().c_str(), rsp_str.str().size()); } bool IsFileHandler(const HttpRequest &req) { // 1. 必须设置了静态资源根目录 if (_basedir.empty()) { return false; } // 2. 请求方法,必须是GET / HEAD请求方法 if (req._method != "GET" && req._method != "HEAD") { return false; } // 3. 请求的资源路径必须是一个合法路径 if (Util::ValidPath(req._path) == false) { return false; } std::string req_path = _basedir + req._path; // 为了避免直接修改请求的资源路径,因此定义一个临时对象 if (req._path.back() == '/') { req_path += "index.html"; } if (Util::IsRegular(req_path) == false) { return false; } return true; } //静态资源的请求处理 --- 将静态资源文件的数据读取出来,放到rsp的_body中, 并设置mime void FileHandler(const HttpRequest &req, HttpResponse *rsp) { std::string req_path = _basedir + req._path; if (req._path.back() == '/') { req_path += "index.html"; } bool ret = Util::ReadFile(req_path, &rsp->_body); if (ret == false) { return; } std::string mime = Util::ExtMime(req_path); rsp->SetHeader("Content-Type", mime); } //功能性请求的分类处理 void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) { //在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404 //思想:路由表存储的时键值对 -- 正则表达式 & 处理函数 //使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理 // /numbers/(/d+) /numbers/12345 for(auto& handler : handlers) { const std::regex &re = handler.first; const Handler &functor = handler.second; bool ret = std::regex_match(req._path, req._matches, re); if (ret == false) { continue; } return functor(req, rsp); // 传入请求信息,和空的rsp,执行处理函数 } rsp->_statu = 404; } void Route(HttpRequest &req, HttpResponse *rsp) { //1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求 // 静态资源请求,则进行静态资源的处理 // 功能性请求,则需要通过几个请求路由表来确定是否有处理函数 // 既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405 if (IsFileHandler(req) == true) { //是一个静态资源请求, 则进行静态资源请求的处理 return FileHandler(req, rsp); } if(req._method == "GET" || req._method == "HEAD") { return Dispatcher(req, rsp, _get_route); } else if(req._method == "POST") { return Dispatcher(req, rsp, _post_route); } else if(req._method == "PUT") { return Dispatcher(req, rsp, _put_route); } else if(req._method == "DELETE") { return Dispatcher(req, rsp, _delete_route); } rsp->_statu = 405;// Method Not Allowed } // 设置上下文 void OnConnected(const PtrConnection &conn) { conn->SetContext(HttpContext()); lg(Info, "NEW CONNECTION %p", conn.get()); } //缓冲区数据解析+处理 void OnMessage(const PtrConnection &conn, Buffer *buffer) { while(buffer->ReadAbleSize() > 0) { // 1. 获取上下文 HttpContext *context = conn->GetContext()->get<HttpContext>(); // 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象 // 1. 如果缓冲区的数据解析出错,就直接回复出错响应 // 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理 context->RecvHttpRequest(buffer); HttpRequest &req = context->Request(); HttpResponse rsp(context->RespStatu()); if(context->RespStatu() >= 400) { //进行错误响应,关闭连接 ErrorHandler(req, &rsp);//填充一个错误显示页面数据到rsp中 WriteReponse(conn, req, rsp);//组织响应发送给客户端 context->ReSet(); buffer->MoveRead(buffer->ReadAbleSize());//出错了就把缓冲区数据清空 conn->Shutdown();//关闭连接 return; } if (context->RecvStatu() != RECV_HTTP_OVER) { // 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理 return; } // 3. 请求路由 + 业务处理 Route(req, &rsp); // 4. 对HttpResponse进行组织发送 WriteReponse(conn, req, rsp); // 5. 重置上下文 context->ReSet(); // 6. 根据长短连接判断是否关闭连接或者继续处理 if (rsp.Close() == true) { conn->Shutdown(); // 短链接则直接关闭v } } }public: HttpServer(int port, int timeout = DEFALT_TIMEOUT) :_server(port) { _server.EnableInactiveRelease(timeout); _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void SetBaseDir(const std::string &path) { assert(Util::IsDirectory(path) == true); _basedir = path; } //设置/添加,请求(请求的正则表达)与处理函数的映射关系 void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Post(const std::string &pattern, const Handler &handler) { _post_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Put(const std::string &pattern, const Handler &handler) { _put_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Delete(const std::string &pattern, const Handler &handler) { _delete_route.push_back(std::make_pair(std::regex(pattern), handler)); } void SetThreadCount(int count) { _server.SetThreadCount(count); } void Listen() { _server.Start(); }};
6. 对服务器整体测试
(1)服务器运行代码:
#include "HttpServer.hpp"#include "HttpResponse.hpp"#include "HttpRequest.hpp"#define WWWROOT "./wwwroot/"std::string RequestStr(const HttpRequest &req) { std::stringstream ss; ss << req._method << " " << req._path << " " << req._version << "/r/n"; for (auto &it : req._params) { ss << it.first << ": " << it.second << "/r/n"; } for (auto &it : req._headers) { ss << it.first << ": " << it.second << "/r/n"; } ss << "/r/n"; ss << req._body; return ss.str();}void Hello(const HttpRequest &req, HttpResponse *rsp) { rsp->SetContent(RequestStr(req), "text/plain");}void Login(const HttpRequest &req, HttpResponse *rsp) { rsp->SetContent(RequestStr(req), "text/plain");}void PutFile(const HttpRequest &req, HttpResponse *rsp) { std::string pathname = WWWROOT + req._path; Util::WriteFile(pathname, req._body);}void DelFile(const HttpRequest &req, HttpResponse *rsp) { rsp->SetContent(RequestStr(req), "text/plain");}int main(){ HttpServer server(8080); server.SetThreadCount(3); server.SetBaseDir(WWWROOT);//设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件 server.Get("/hello", Hello); server.Post("/login", Login); server.Put("/1234.txt", PutFile); server.Delete("/1234.txt", DelFile); server.Listen(); return 0;}
6.1 长连接测试
(1)客户端代码:
/*长连接测试1: 创建一个客户端持续给服务器发送数据,直到超过时间看是否正常*/#include "../../server.hpp" int main(){ Sock cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1/r/nConnection: keep-alive/r/nContent-Length: 0/r/n/r/n"; while (1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); lg(Info, "[%s]", buf); sleep(3); } cli_sock.Close(); return 0;}
(2)运行结果:
6.2 超时连接测试
(1)客户端代码:
/*超时连接测试1:创建一个客户端,给服务器发送一次数据后,不动了,查看服务器是否会正常的超时关闭连接*/#include "../../server.hpp"int main(){ Sock cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1/r/nConnection: keep-alive/r/nContent-Length: 0/r/n/r/n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); lg(Info, "[%s]", buf); sleep(15); } cli_sock.Close(); return 0;}
(2)运行结果:
6.3 错误请求测试
(1)客户端代码:
/*给服务器发送一个数据,告诉服务器要发送1024字节的数据,但是实际发送的数据不足1024,查看服务器处理结果*//* 1. 如果数据只发送一次,服务器将得不到完整请求,就不会进行业务处理,客户端也就得不到响应,最终超时关闭连接 2. 连着给服务器发送了多次 小的请求, 服务器会将后边的请求当作前边请求的正文进行处理,而后便处理的时候有可能就会因为处理错误而关闭连接*/#include "../../server.hpp"int main(){ Sock cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1/r/nConnection: keep-alive/r/nContent-Length: 100/r/n/r/nbitejiuyeke"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); lg(Info, "[%s]", buf); sleep(3); } cli_sock.Close(); return 0;}
(2)运行结果:
6.4 业务处理超时测试
(1)客户端代码:
/* 业务处理超时,查看服务器的处理情况 当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间) 1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放 假设现在 12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度 1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度 2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉 这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误) 因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中, 等到事件处理完了执行任务池中的任务的时候,再去释放*/#include "../../server.hpp"int main(){ signal(SIGCHLD, SIG_IGN); for (int i = 0; i < 10; i++) { pid_t pid = fork(); if (pid < 0) { lg(Fatal, "FORK ERROR"); return -1; }else if (pid == 0) { Sock cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1/r/nConnection: keep-alive/r/nContent-Length: 0/r/n/r/n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); lg(Info, "[%s]", buf); } cli_sock.Close(); exit(0); } } while(1) sleep(1); return 0;}
(2)服务端修改代码:
(3)运行结果:
6.5 同时多条请求测试
(1)客户端代码:
/*一次性给服务器发送多条数据,然后查看服务器的处理结果*//*每一条请求都应该得到正常处理*/#include "../../server.hpp"int main(){ Sock cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1/r/nConnection: keep-alive/r/nContent-Length: 0/r/n/r/n"; req += "GET /hello HTTP/1.1/r/nConnection: keep-alive/r/nContent-Length: 0/r/n/r/n"; req += "GET /hello HTTP/1.1/r/nConnection: keep-alive/r/nContent-Length: 0/r/n/r/n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); lg(Info, "[%s]", buf); sleep(3); } cli_sock.Close(); return 0;}
(2)运行结果:
6.6 大文件传输测试
(1)客户端代码:
/*大文件传输测试,给服务器上传一个大文件,服务器将文件保存下来,观察处理结果*//* 上传的文件,和服务器保存的文件一致*/#include "../HttpServer.hpp"int main(){ Sock cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "PUT /1234.txt HTTP/1.1/r/nConnection: keep-alive/r/n"; std::string body; Util::ReadFile("./hello.txt", &body); req += "Content-Length: " + std::to_string(body.size()) + "/r/n/r/n"; assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(body.c_str(), body.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); lg(Info, "[%s]", buf); sleep(3); cli_sock.Close(); return 0;}
(2)运行结果: