一 多进程版TCP服务器
1.1 核心功能
对于之前编写的 字符串回响程序 来说,如果只有一个客户端进行连接并通信,是没有问题的,但如果有多个客户端发起连接请求,并尝试进行通信,服务器是无法应对的
原因在于 服务器是一个单进程版本,处理连接请求 和 业务处理 是串行化执行的,如果想处理下一个连接请求,需要把当前的业务处理完成。
具体表现为下面这种情况:
为什么客户端B会显示当前已经连接成功?
这是因为是客户端是主动发起连接请求的一方,在请求发出后,如果出现连接错误,客户端就认为已经连接成功了,但实际上服务器还没有处理这个连接请求.
这显然是服务器的问题,处理连接请求 与 业务处理 应该交给两个不同的执行流完成,可以使用多进程或者多线程解决,这里先采用多进程的方案
所以当前需要实现的网络程序核心功能为:当服务器成功处理连接请求后,fork
新建一个子进程,用于进行业务处理,原来的进程专注于处理连接请求。
1.2 创建子进程
注:当前的版本的修改只涉及 StartServer()
函数
创建子进程使用 fork()
函数,它的返回值含义如下
ret == 0
表示创建子进程成功,接下来执行子进程的代码ret > 0
表示创建子进程成功,接下来执行父进程的代码ret < 0
表示创建子进程失败
子进程创建成功后,会继承父进程的文件描述符表,能轻而易举的获取客户端的 socket
套接字,从而进行网络通信
当然不止文件描述符表,得益于 写时拷贝 机制,子进程还会共享父进程的变量,当发生修改行为时,才会自己创建。
注意: 当子进程取走客户端的 socket
套接字进行通信后,父进程需要将其关闭(因为它不需要了),避免文件描述符泄漏
StartServer()
服务器启动函数 — 位于server.hpp
的TcpServer
类
// 进程创建、等待所需要的头文件#include <unistd.h>#include <sys/wait.h>#include <sys/types.h> //启动服务器 void StartServer(){ // 忽略 SIGCHLD 信号 //signal(SIGCHLD, SIG_IGN); while(!_quit){ //1 处理连接请求 struct sockaddr_in client; socklen_t len = sizeof(client); int sock = accept(_listensock,(struct sockaddr*)&client,&len); //2 如果连接失败 继续尝试连接 if(sock == -1){ std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl; continue; } // 连接成功,获取客户端信息 std::string clientip = inet_ntoa(client.sin_addr); uint16_t clientport = ntohs(client.sin_port); std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl; //3 创建子进程 pid_t id=fork(); if(id<0){ // 创建子进程失败,暂时不与当前客户端建立通信会话 close(sock); std::cerr<<"Fork Fail!"<<std::endl; } else if( 0 == id){ //进入子进程 // 子进程拥有父进程相同的文件描述符,建议把不用的关闭 close(_listensock); // 执行业务处理函数 //4 这里因为是字节流传递,一般而言我们会自己写一个函数 Service(sock,clientip,clientport); exit(0); } else { // 父进程需要等待子进程 pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待 //更改为非阻塞 // pid_t ret = waitpid(id,nullptr,WNOHANG); if(ret == id){ std::cout << "Wait " << id << " success!"; } } } }
虽然此时成功创建了子进程,但父进程(处理连接请求)仍然需要等待子进程退出后,才能继续运行,而不能和我们想象中一样单独进行处理连接请求函数,说白了就是 父进程现在处于阻塞等待状态,需要设置为 非阻塞等待.
1.3 设置非阻塞状态
设置父进程为非阻塞的方式有很多,这里来一一列举
方式一:通过参数设置为非阻塞等待(不推荐)
可以直接给 waitpid()
函数的参数3传递 WNOHANG
,表示当前为 非阻塞等待.
pid_t ret = waitpid(id, nullptr, WNOHANG); // 设置为非阻塞式等待
这种方法可行,但不推荐,原因如下:虽然设置成了非阻塞式等待,但父进程终究是需要通过 waitpid()
函数来尝试等待子进程,倘若父进程一直卡在 accept()
函数处,会导致子进程退出后暂时无人收尸,进而导致资源泄漏。
方式二:忽略 SIGCHLD
信号(推荐使用)
这是一个子进程在结束后发出的信号,默认动作是什么都不做;父进程需要检测并回收子进程,我们可以直接忽略该信号,这里的忽略是个特例,只是父进程不对其进行处理,转而由 操作系统 对其负责,自动清理资源并进行回收,不会产生 僵尸进程。
//启动服务器 void StartServer(){ // 忽略 SIGCHLD 信号 signal(SIGCHLD, SIG_IGN); while(!_quit){ //1 处理连接请求 struct sockaddr_in client; socklen_t len = sizeof(client); int sock = accept(_listensock,(struct sockaddr*)&client,&len); //2 如果连接失败 继续尝试连接 if(sock == -1){ std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl; continue; } // 连接成功,获取客户端信息 std::string clientip = inet_ntoa(client.sin_addr); uint16_t clientport = ntohs(client.sin_port); std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl; //3 创建子进程 pid_t id=fork(); if(id<0){ // 创建子进程失败,暂时不与当前客户端建立通信会话 close(sock); std::cerr<<"Fork Fail!"<<std::endl; } else if( 0 == id){ //进入子进程 // 子进程拥有父进程相同的文件描述符,建议把不用的关闭 close(_listensock); // 执行业务处理函数 //4 这里因为是字节流传递,一般而言我们会自己写一个函数 Service(sock,clientip,clientport); exit(0); } // else { // // 父进程需要等待子进程 // //pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待 // //更改为非阻塞 // pid_t ret = waitpid(id,nullptr,WNOHANG); // if(ret == id){ // std::cout << "Wait " << id << " success!"; // } // } } }
强烈推荐使用该方案,因为操作简单,并且没有后患之忧。
方式三:设置 SIGCHLD
信号的处理动作为子进程回收(不是很推荐)
当子进程退出并发送该信号时,执行父进程回收子进程的操作。
设置 SIGCHLD
信号的处理动作为 回收子进程后,父进程同样不必再考虑回收子进程的问题
注意: 因为现在处于 TcpServer
类中,handler()
函数需要设置为静态(避免隐含的 this
指针),避免不符合 signal()
函数中信号处理函数的参数要求。
// 需要设置为静态 static void handler(int signo){ printf("进程 %d 捕捉到了 %d 号信号/n", getpid(), signo); // 这里的 -1 表示父进程等待时,只要是已经退出了的子进程,都可以进行回收 while (1){ pid_t ret = waitpid(-1, NULL, WNOHANG); if (ret > 0) printf("父进程: %d 已经成功回收了 %d 号进程/n", getpid(), ret); else break; } printf("子进程回收成功/n"); } //启动服务器 void StartServer(){ // 设置 SIGCHLD 信号的处理动作 signal(SIGCHLD, handler); // 忽略 SIGCHLD 信号 // signal(SIGCHLD, SIG_IGN); while(!_quit){ //1 处理连接请求 struct sockaddr_in client; socklen_t len = sizeof(client); int sock = accept(_listensock,(struct sockaddr*)&client,&len); //2 如果连接失败 继续尝试连接 if(sock == -1){ std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl; continue; } // 连接成功,获取客户端信息 std::string clientip = inet_ntoa(client.sin_addr); uint16_t clientport = ntohs(client.sin_port); std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl; //3 创建子进程 pid_t id=fork(); if(id<0){ // 创建子进程失败,暂时不与当前客户端建立通信会话 close(sock); std::cerr<<"Fork Fail!"<<std::endl; } else if( 0 == id){ //进入子进程 // 子进程拥有父进程相同的文件描述符,建议把不用的关闭 close(_listensock); // 执行业务处理函数 //4 这里因为是字节流传递,一般而言我们会自己写一个函数 Service(sock,clientip,clientport); exit(0); } else { // 父进程需要等待子进程 pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待 //更改为非阻塞 // pid_t ret = waitpid(id,nullptr,WNOHANG); if(ret == id){ std::cout << "Wait " << id << " success!"; } } } }
为什么不是很推荐这种方法?因为这种方法实现起来比较麻烦,不如直接忽略 SIGCHLD
信号
方式四:设置孙子进程(不是很推荐)
众所周知,父进程只需要对子进程负责,至于孙子进程交给子进程负责,如果某个子进程的父进程终止运行了,那么它就会变成 孤儿进程,父进程会变成 1
号进程,也就是由操作系统领养,回收进程的重担也交给了操作系统
可以利用该特性,在子进程内部再创建一个子进程(孙子进程),然后子进程退出,父进程可以直接回收(不必阻塞),子进程(孙子进程)的父进程变成 1
号进程
这种实现方法比较巧妙,而且与我们后面即将学到的 守护进程 有关
注意: 使用这种方式时,父进程是需要等待子进程退出的。
这种方法代码也很简单,我们也不再做过多示例,但依旧不推荐,因为倘若连接请求变多,会导致孤儿进程变多,孤儿进程由操作系统接管,数量变多会给操作系统带来负担
以上就是设置 非阻塞 的四种方式,推荐使用方式二:忽略 SIGCHLD
信号。
至此我们的 字符串回响程序 可以支持多客户端了。
细节补充:当子进程取走 sock
套接字进行网络通信后,父进程就不需要使用 sock
套接字了,可以将其进行关闭,下次连接时继续使用,避免文件描述符不断增长。
StartServer()
服务器启动函数 — 位于server.hpp
服务器头文件中的TcpServer
类
// 启动服务器void StartServer(){ // 忽略 SIGCHLD 信号 signal(SIGCHLD, SIG_IGN); while (!_quit) { // 1.处理连接请求 // ... // 2.如果连接失败,继续尝试连接 // ... // 连接成功,获取客户端信息 // ... // 3.创建子进程 // ... close(sock); // 父进程不再需要资源(建议关闭) }}
这个补丁可以减少资源消耗,建议加上,前面是忘记加了,并且不太好修改,server.hpp
服务器头文件完整代码如下:
// server.hpp#pragma once#include <signal.h>#include<iostream>#include<string>#include<functional>#include<sys/types.h>#include<sys/socket.h>#include<netinet/in.h>#include<arpa/inet.h>#include"err.hpp"#include<cstring>#include<unistd.h>#include<cerrno>#include <sys/types.h>#include <sys/wait.h>namespace My_server{ // 默认端口号 const uint16_t default_port = 8088; //全连接队列的最大长度 const int backlog = 32; using func_t =std::function<std::string(std::string)>; class server { private: /* data */ //套接字 int _listensock; //端口号 uint16_t _port; // 判断服务器是否结束运行 bool _quit; // 外部传入的回调函数 func_t _func; public: server(const func_t &func,const uint16_t &port = default_port) :_func(func) ,_port(port) ,_quit(false) {} ~server(){} //初始化服务器 void InitServer(){ //1 创建套接字 _listensock = socket(AF_INET,SOCK_STREAM,0); if(_listensock == -1){ //绑定失败 std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl; exit(SOCKET_ERR); } std::cout<<"Create Socket Success!" <<_listensock<<std::endl; //2 绑定端口号和IP地址 struct sockaddr_in local; bzero(&local,sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(_port); local.sin_addr.s_addr = INADDR_ANY; if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){ std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl; exit(BIND_ERR); } //3 开始监听 if(listen(_listensock,backlog)== -1){ std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl; //新增一个报错 exit(LISTEN_ERR); } std::cout<<"Listen Success!"<<std::endl; } // // 需要设置为静态 // static void handler(int signo){ // printf("进程 %d 捕捉到了 %d 号信号/n", getpid(), signo); // // 这里的 -1 表示父进程等待时,只要是已经退出了的子进程,都可以进行回收 // while (1){ // pid_t ret = waitpid(-1, NULL, WNOHANG); // if (ret > 0) // printf("父进程: %d 已经成功回收了 %d 号进程/n", getpid(), ret); // else // break; // } // printf("子进程回收成功/n"); // } //启动服务器 void StartServer(){ // 设置 SIGCHLD 信号的处理动作 //signal(SIGCHLD, handler); // 忽略 SIGCHLD 信号 signal(SIGCHLD, SIG_IGN); while(!_quit){ //1 处理连接请求 struct sockaddr_in client; socklen_t len = sizeof(client); int sock = accept(_listensock,(struct sockaddr*)&client,&len); //2 如果连接失败 继续尝试连接 if(sock == -1){ std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl; continue; } // 连接成功,获取客户端信息 std::string clientip = inet_ntoa(client.sin_addr); uint16_t clientport = ntohs(client.sin_port); std::cout<<"Server accept"<<clientip + "-"<<clientport<<sock<<" from "<<_listensock << "success!"<<std::endl; //3 创建子进程 pid_t id=fork(); if(id<0){ // 创建子进程失败,暂时不与当前客户端建立通信会话 close(sock); std::cerr<<"Fork Fail!"<<std::endl; } else if( 0 == id){ //进入子进程 // 子进程拥有父进程相同的文件描述符,建议把不用的关闭 close(_listensock); // 执行业务处理函数 //4 这里因为是字节流传递,一般而言我们会自己写一个函数 Service(sock,clientip,clientport); exit(0); } // else { // // 父进程需要等待子进程 // pid_t ret = waitpid(id, nullptr, 0); // 默认为阻塞式等待 // //更改为非阻塞 // // pid_t ret = waitpid(id,nullptr,WNOHANG); // if(ret == id){ // std::cout << "Wait " << id << " success!"; // } // } close(sock); // 父进程不再需要资源(建议关闭) } } void Service(int sock,std::string &clientip,const uint16_t &clientport){ char buff[1024]; std::string who = clientip + "-" + std::to_string(clientport); while(true){ // 以字符串格式读取,预留/0的位置 ssize_t n = read(sock,buff,sizeof(buff)-1); if(n>0){ //读取成功 buff[n]='/0'; std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl; //实际处理可以交给上层逻辑指定 std::string respond = _func(buff); write(sock,buff,strlen(buff)); } else if(n==0){ //表示当前读到了文件末尾,结束读取 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl; close(sock); break; } else{ // 读取出问题(暂时) std::cerr << "Read Fail!" << strerror(errno) << std::endl; close(sock); // 关闭文件描述符 break; } } } }; }
二 多线程版服务器
2.1 核心功能
通过多线程,实现支持多客户端同时通信的服务器
核心功能:服务器与客户端成功连接后,创建一个线程,服务于客户端的业务处理
'这里先通过 原生线程库 模拟实现.
2.2 使用原生线程库
线程的回调函数中需要 Service()
业务处理函数中的所有参数,同时也需要具备访问 Service()
业务处理函数的能力,单凭一个 void*
的参数是无法解决的,为此可以创建一个类,里面可以包含我们所需要的参数。
ThreadData
类 — 位于server.hpp
服务器头文件中。
//包含我们所需参数的类型 class ThreadData{ public: ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr) :_sock(sock) ,_clientip(ip) ,_clientport(port) ,_current(ptr) {} public: int _sock; std::string _clientip; uint16_t _clientport; server* _current; };
接下来就可以考虑如何借助多线程了
线程创建后,需要关闭不必要的
socket
套接字吗?
- 不需要,线程之间是可以共享这些资源的,无需关闭
如何设置主线程不必等待次线程退出?
- 可以把次线程进行分离
所以接下来我们需要在连接成功后,创建次线程,利用已有信息构建 ThreadData
对象,为次线程编写回调函数(最终目的是为了执行 Service()
业务处理函数)
注意: 因为当前在类中,线程的回调函数需要使用 static
设置为静态函数。
server.hpp
服务器头文件
// server.hpp#pragma once#include<iostream>#include<string>#include<functional>#include<sys/types.h>#include<sys/socket.h>#include<netinet/in.h>#include<arpa/inet.h>#include"err.hpp"#include<cstring>#include<unistd.h>#include<cerrno>namespace My_server{ // 默认端口号 const uint16_t default_port = 8088; //全连接队列的最大长度 const int backlog = 32; using func_t = std::function<std::string(std::string)>; //前置声明 class server; //包含我们所需参数的类型 class ThreadData{ public: ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr) :_sock(sock) ,_clientip(ip) ,_clientport(port) ,_current(ptr) {} public: int _sock; std::string _clientip; uint16_t _clientport; server* _current; }; class server { private: /* data */ //套接字 int _listensock; //端口号 uint16_t _port; // 判断服务器是否结束运行 bool _quit; // 外部传入的回调函数 func_t _func; public: server(const func_t &func,const uint16_t &port = default_port) :_func(func) ,_port(port) ,_quit(false) {} ~server(){} //初始化服务器 void InitServer(){ //1 创建套接字 _listensock = socket(AF_INET,SOCK_STREAM,0); if(_listensock == -1){ //绑定失败 std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl; exit(SOCKET_ERR); } std::cout<<"Create Socket Success!" <<_listensock<<std::endl; //2 绑定端口号和IP地址 struct sockaddr_in local; bzero(&local,sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(_port); local.sin_addr.s_addr = INADDR_ANY; if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){ std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl; exit(BIND_ERR); } //3 开始监听 if(listen(_listensock,backlog)== -1){ std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl; //新增一个报错 exit(LISTEN_ERR); } std::cout<<"Listen Success!"<<std::endl; } //启动服务器 void StartServer(){ while(!_quit){ //1 处理连接请求 struct sockaddr_in client; socklen_t len = sizeof(client); int sock = accept(_listensock,(struct sockaddr*)&client,&len); //2 如果连接失败 继续尝试连接 if(sock == -1){ std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl; continue; } // 连接成功,获取客户端信息 std::string clientip = inet_ntoa(client.sin_addr); uint16_t clientport = ntohs(client.sin_port); std::cout<<"Server accept"<< clientip + "-"<< clientport <<sock<<" from "<<_listensock << "success!"<<std::endl; // 3.创建线程及所需要的线程信息类 ThreadData* td = new ThreadData(sock, clientip, clientport, this); pthread_t p; pthread_create(&p, nullptr, Routine, td); } } // 线程回调函数 static void* Routine(void* args){ // 线程分离 pthread_detach(pthread_self()); ThreadData* td = static_cast<ThreadData*>(args); // 调用业务处理函数 td->_current->Service(td->_sock, td->_clientip, td->_clientport); // 销毁对象 delete td; return nullptr; } void Service(int sock,std::string &clientip,const uint16_t &clientport){ char buff[1024]; std::string who = clientip + "-" + std::to_string(clientport); while(true){ // 以字符串格式读取,预留/0的位置 ssize_t n = read(sock,buff,sizeof(buff)-1); if(n>0){ //读取成功 buff[n]='/0'; std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl; //实际处理可以交给上层逻辑指定 std::string respond = _func(buff); write(sock,buff,strlen(buff)); } else if(n==0){ //表示当前读到了文件末尾,结束读取 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl; close(sock); break; } else{ // 读取出问题(暂时) std::cerr << "Read Fail!" << strerror(errno) << std::endl; close(sock); // 关闭文件描述符 break; } } } }; }
因为当前使用了 原生线程库,所以在编译时,需要加上 -lpthread
Makefile
文件
.PHONY:allall:server clientserver:server.cc g++ -o $@ $^ -std=c++11 -lpthread client:client.cc g++ -o $@ $^ -std=c++11 -lpthread.PHONY:cleanclean: rm -rf server client
使用 原生线程库 过于单薄了,并且这种方式存在问题:连接都准备好了,才创建线程,如果创建线程所需要的资源较多,会拖慢服务器整体连接效率
为此可以改用之前实现的 线程池
三 线程池版服务器
3.1 ThreadPool.hpp 线程池头文件
#pragma once#include <vector>#include <string>#include <memory>#include <functional>#include <unistd.h>#include <pthread.h>#include "Task.hpp"#include "Thread.hpp"#include "BlockingQueue.hpp" // CP模型namespace My_pool{ const int THREAD_NUM = 10; template<class T> class ThreadPool { private: ThreadPool(int num = THREAD_NUM) :_num(num) {} ~ThreadPool(){ // 等待线程退出 for(auto &t : _threads) t.join(); } // 删除拷贝构造 ThreadPool(const ThreadPool<T> &) = delete; public: static ThreadPool<T>* getInstance(){ // 双检查 if(_inst == nullptr){ // 加锁 LockGuard lock(&_mtx); if(_inst == nullptr){ // 创建对象 _inst = new ThreadPool<T>(); // 初始化及启动服务 _inst->init(); _inst->start(); } } return _inst; } public: void init(){ // 创建一批线程 for(int i = 0; i < _num; i++) _threads.push_back(Thread(i, threadRoutine, this)); } void start(){ // 启动线程 for(auto &t : _threads) t.run(); } // 提供给线程的回调函数(已修改返回类型为 void) static void threadRoutine(void *args){ // 避免等待线程,直接剥离 pthread_detach(pthread_self()); auto ptr = static_cast<ThreadPool<T>*>(args); while (true){ // 从CP模型中获取任务 T task = ptr->popTask(); task(); // 回调函数 } } // 装载任务 void pushTask(const T& task){ _blockqueue.Push(task); } protected: T popTask(){ T task; _blockqueue.Pop(&task); return task; } private: std::vector<Thread> _threads; int _num; // 线程数量 My_Queue::BlockingQueue<T> _blockqueue; // 阻塞队列 // 创建静态单例对象指针及互斥锁 static ThreadPool<T> *_inst; static pthread_mutex_t _mtx; }; // 初始化指针 template<class T> ThreadPool<T>* ThreadPool<T>::_inst = nullptr; // 初始化互斥锁 template<class T> pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;}
3.2 Thread.hpp
封装实现的线程库头文件
#pragma once #include<iostream>#include<pthread.h>#include<string>//代表线程状态enum class Status{ NEW = 0, RUNNING , EXIT};// 参数。返回值为void* 返回值的函数类型typedef void (*func_t)(void*);class Thread{private: pthread_t _tid; // 线程 ID std::string _name; // 线程名 Status _status; // 线程状态 func_t _func; // 线程回调函数 void* _args; // 传递给回调函数的参数public: Thread(int num=0,func_t func = nullptr,void *args = nullptr) :_tid(num) ,_func(func) ,_status(Status::NEW) ,_args(args) { char name[1024]; snprintf(name,sizeof(name),"thread - %d",num); _name = name; } ~Thread(){} //获取线程名 std::string getName() const{ return _name; } // 获取状态 Status getStatus() const{ return _status; } // 回调方法 static void* runHelper(void *args){ Thread * myThis = static_cast<Thread*>(args); myThis->_func(myThis->_args); return nullptr; } //启动线程 void run(){ int ret = pthread_create(&_tid,nullptr,runHelper,this); if(0 != ret){ std::cerr << "Thread create fail!"<<std::endl; exit(1); } _status = Status::RUNNING; } // 线程等待 void join(){ int ret = pthread_join(_tid,nullptr); if(0 != ret){ if(0 != ret){ std::cerr << "Thread join fail!"<<std::endl; exit(1); } } _status = Status::EXIT; }};
3.3 BlockingQueue.hpp
生产者消费者模型头文件
#pragma once#include <queue>#include <mutex>#include <pthread.h>#include "LockGuard.hpp"namespace My_Queue{ const int DEF_SIZE = 10; template<class T> class BlockingQueue { private: // 任务队列 std::queue<T> _queue; size_t _cap; // 阻塞队列的容量 pthread_mutex_t _mtx; // 互斥锁 pthread_cond_t _pro_cond; // 生产者条件变量 pthread_cond_t _con_cond; // 消费者条件变量 public: BlockingQueue(size_t cap = DEF_SIZE) :_cap(cap) { // 初始化锁与条件变量 pthread_mutex_init(&_mtx,nullptr); pthread_cond_init(&_pro_cond,nullptr); pthread_cond_init(&_con_cond,nullptr); } ~BlockingQueue(){ //销毁锁与条件变量 pthread_mutex_destroy(&_mtx); pthread_cond_destroy(&_pro_cond); pthread_cond_destroy(&_con_cond); } // 生产数据(入队) void Push(const T& inData){ // 加锁(RAII风格) LockGuard lock(&_mtx); // 循环判断条件是否满足 while(IsFull()){ pthread_cond_wait(&_pro_cond, &_mtx); } _queue.push(inData); // 可以加策略唤醒,比如生产一半才唤醒消费者 pthread_cond_signal(&_con_cond); // 自动解锁 } // 消费数据(出队) void Pop(T* outData){ // 加锁(RAII 风格) LockGuard lock(&_mtx); // 循环判读条件是否满足 while(IsEmpty()) { pthread_cond_wait(&_con_cond, &_mtx); } *outData = _queue.front(); _queue.pop(); // 可以加策略唤醒,比如消费完后才唤醒生产者 pthread_cond_signal(&_pro_cond); // 自动解锁 } private: //判断是否为满 bool IsFull(){ return _queue.size() == _cap; } //判断是否为空 bool IsEmpty(){ return _queue.empty(); } };}
3.4 LockGuard.hpp
自动化锁头文件
#pragma once#include<pthread.h>class LockGuard{private: pthread_mutex_t* _pmtx;public: LockGuard(pthread_mutex_t *pmtx) :_pmtx(pmtx) { //加锁 pthread_mutex_lock(_pmtx); } ~LockGuard(){ //解锁 pthread_mutex_unlock(_pmtx); }};
3.5 Task.hpp
任务类
现在需要修改 Task.hpp
任务头文件中的 Task
任务类,将其修改为一个服务于 网络通信中业务处理 的任务类(也就是 Service()
业务处理函数)
在 Service()
业务处理函数中,需要包含 socket
套接字、客户端 IP
、客户端端口号 等必备信息,除此之外,我们还可以将 可调用对象(Service()
业务处理函数) 作为参数传递给 Task
对象.
#pragma once#include <string>#include <functional>namespace My_task{ // Service() 业务处理函数的类型 using cb_t = std::function<void(int, std::string, uint16_t)>; class Task{ public: // 可以再提供一个默认构造(防止部分场景中构建对象失败) Task() {} Task(int sock, const std::string& ip, const uint16_t& port, const cb_t& cb) :_sock(sock) ,_ip(ip) ,_port(port) ,_cb(cb) {} // 重载运算操作,用于回调 [业务处理函数] void operator()(){ // 直接回调 cb [业务处理函数] 即可 _cb(_sock, _ip, _port); } private: int _sock; std::string _ip; uint16_t _port; cb_t _cb; // 回调函数 };}
3.6 server.hpp 头文件
准备工作完成后,接下来就是往 server.hpp
服务器头文件中添加组件了
注意:
- 在构建
Task
对象时,需要使用bind
绑定类内函数,避免参数不匹配- 当前的线程池是单例模式,在
Task
任务对象构建后,通过线程池操作句柄push
对象即可
// server.hpp#pragma once#include<iostream>#include<string>#include<functional>#include<sys/types.h>#include<sys/socket.h>#include<netinet/in.h>#include<arpa/inet.h>#include"err.hpp"#include<cstring>#include<unistd.h>#include<cerrno>#include"ThreadPool.hpp"#include"Task.hpp"namespace My_server{ // 默认端口号 const uint16_t default_port = 1111; //全连接队列的最大长度 const int backlog = 32; using func_t = std::function<std::string(std::string)>; //前置声明 class server; //包含我们所需参数的类型 class ThreadData{ public: ThreadData(int sock,const std::string&ip,const uint16_t&port,server*ptr) :_sock(sock) ,_clientip(ip) ,_clientport(port) ,_current(ptr) {} public: int _sock; std::string _clientip; uint16_t _clientport; server* _current; }; class server { private: /* data */ //套接字 int _listensock; //端口号 uint16_t _port; // 判断服务器是否结束运行 bool _quit; // 外部传入的回调函数 func_t _func; public: server(const func_t &func,const uint16_t &port = default_port) :_func(func) ,_port(port) ,_quit(false) {} ~server(){} //初始化服务器 void InitServer(){ //1 创建套接字 _listensock = socket(AF_INET,SOCK_STREAM,0); if(_listensock == -1){ //绑定失败 std::cerr<<"Create Socket Fail:"<<strerror(errno)<<std::endl; exit(SOCKET_ERR); } std::cout<<"Create Socket Success!" <<_listensock<<std::endl; //2 绑定端口号和IP地址 struct sockaddr_in local; bzero(&local,sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(_port); local.sin_addr.s_addr = INADDR_ANY; if(bind(_listensock,(const sockaddr*)&local,sizeof(local))){ std::cerr << "Bind IP&&Port Fali" << strerror(errno) << std::endl; exit(BIND_ERR); } //3 开始监听 if(listen(_listensock,backlog)== -1){ std::cerr<<"Listen Fail: "<< strerror(errno) << std::endl; //新增一个报错 exit(LISTEN_ERR); } std::cout<<"Listen Success!"<<std::endl; } //启动服务器 void StartServer(){ while(!_quit){ //1 处理连接请求 struct sockaddr_in client; socklen_t len = sizeof(client); int sock = accept(_listensock,(struct sockaddr*)&client,&len); //2 如果连接失败 继续尝试连接 if(sock == -1){ std::cerr<< "Accept Fail!:"<<strerror(errno)<<std::endl; continue; } // 连接成功,获取客户端信息 std::string clientip = inet_ntoa(client.sin_addr); uint16_t clientport = ntohs(client.sin_port); std::cout<<"Server accept"<< clientip + "-"<< clientport <<sock<<" from "<<_listensock << "success!"<<std::endl; // 3.构建任务对象 注意:使用 bind 绑定 this 指针 My_task::Task t(sock, clientip, clientport, std::bind(&server::Service, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // 4.通过线程池操作句柄,将任务对象 push 进线程池中处理 //s //std::cout<<std::endl<<"push Task"<<std::endl; My_pool::ThreadPool<My_task::Task>::getInstance()->pushTask(t); } } void Service(int sock,const std::string &clientip,const uint16_t &clientport){ char buff[1024]; std::string who = clientip + "-" + std::to_string(clientport); while(true){ // 以字符串格式读取,预留/0的位置 ssize_t n = read(sock,buff,sizeof(buff)-1); if(n>0){ //读取成功 buff[n]='/0'; std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl; //实际处理可以交给上层逻辑指定 std::string respond = _func(buff); write(sock,buff,strlen(buff)); } else if(n==0){ //表示当前读到了文件末尾,结束读取 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl; close(sock); break; } else{ // 读取出问题(暂时) std::cerr << "Read Fail!" << strerror(errno) << std::endl; close(sock); // 关闭文件描述符 break; } } } }; }
接下来编译并运行程序,当服务器启动后(此时无客户端连接),只有一个线程,这是因为我们当前的 线程池 是基于 懒汉模式 实现的,只有当第一次使用时,才会创建线程.
接下来启动客户端,可以看到确实创建了一批次线程(十个)
看似程序已经很完善了,其实隐含着一个大问题:当前线程池中的线程,本质上是在回调一个 while(true)
死循环函数,当连接的客户端大于线程池中的最大线程数时,会导致所有线程始终处于满负载状态,直接影响就是连接成功后,无法再创建通信会话(倘若客户端不断开连接,线程池中的线程就无力处理其他客户端的会话)
说白了就是 线程池 比较适合用于处理短任务,对于当前的场景来说,线程池 不适合建立持久通信会话,应该将其用于处理 read
读取、write
写入 任务.
如果想解决这个问题,有两个方向:Service()
函数中支持一次 [收 / 发],或者多线程+线程池,多线程用于构建通信会话,线程池则用于处理 [收 / 发] 任务
前者实现起来比较简单,无非就是把 Service()
业务处理函数中的 while(true)
循环去掉
Service()
业务处理函数
void Service(int sock,const std::string &clientip,const uint16_t &clientport){ char buff[1024]; std::string who = clientip + "-" + std::to_string(clientport); // 以字符串格式读取,预留/0的位置 ssize_t n = read(sock,buff,sizeof(buff)-1); if(n>0){ //读取成功 buff[n]='/0'; std::cout<<"Server get: "<< buff <<" from "<<who<<std::endl; //实际处理可以交给上层逻辑指定 std::string respond = _func(buff); write(sock,buff,strlen(buff)); } else if(n==0){ //表示当前读到了文件末尾,结束读取 std::cout<<"Client "<<who<<" "<<sock<<" quit!"<<std::endl; close(sock); } else{ // 读取出问题(暂时) std::cerr << "Read Fail!" << strerror(errno) << std::endl; close(sock); // 关闭文件描述符 } }
至于后者就比较麻烦了,需要结合 高级IO 相关知识,这里不再阐述