[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

服务器 0

在这里插入图片描述

文章目录

  • 一.网络层与传输层协议
    • sockaddr结构体继承体系(Linux体系)
    • 贯穿计算机系统的网络通信架构图示:
  • 二.实现并部署多线程并发Tcp服务器框架
    • 线程池模块
    • 序列化反序列化工具模块
    • 通信信道建立模块
    • 服务器主体模块
    • 任务回调模块(根据具体应用场景可重构)
    • Tips:DebugC++代码过程中遇到的问题记录

在这里插入图片描述

一.网络层与传输层协议

  • 网络层与传输层内置于操作系统的内核中,网络层一般使用ip协议,传输层常用协议为Tcp协议和Udp协议,Tcp协议和Udp协议拥有各自的特点和应用场景:
    在这里插入图片描述

sockaddr结构体继承体系(Linux体系)

  • sockaddr_in结构体用于存储网络通信主机进程的ip和端口号等信息
    在这里插入图片描述

贯穿计算机系统的网络通信架构图示:

在这里插入图片描述

二.实现并部署多线程并发Tcp服务器框架

小项目的完整文件的gittee链接

  • Tcp服务器架构:
    在这里插入图片描述

线程池模块

#pragma once#include <iostream>#include <pthread.h>#include "log.hpp"#include <semaphore.h>#include <vector>#include <cstdio>template<class T>class RingQueue{private:    pthread_mutex_t Clock_;    pthread_mutex_t Plock_;    sem_t Psem_;    sem_t Csem_;    std::vector<T> Queue_;    int Pptr_;    int Cptr_;    int capacity_;public:    RingQueue(int capacity = 10) : Queue_(capacity),Pptr_(0),Cptr_(0),capacity_(capacity){        sem_init(&Psem_,0,capacity);        sem_init(&Csem_,0,0);        pthread_mutex_init(&Clock_,nullptr);        pthread_mutex_init(&Plock_,nullptr);    }    ~RingQueue(){        sem_destroy(&Psem_);        sem_destroy(&Csem_);        pthread_mutex_destroy(&Clock_);        pthread_mutex_destroy(&Plock_);    }    T Pop(){        sem_wait(&Csem_);        pthread_mutex_lock(&Clock_);        T tem = Queue_[Cptr_];        Cptr_++;        Cptr_ %= capacity_;        pthread_mutex_unlock(&Clock_);        sem_post(&Psem_);        return tem;    }    void Push(T t){        sem_wait(&Psem_);        pthread_mutex_lock(&Plock_);        Queue_[Pptr_] = t;        Pptr_++;        Pptr_%= capacity_;        pthread_mutex_unlock(&Plock_);        sem_post(&Csem_);    }};
#pragma once#include "sem_cp.cpp"#include <pthread.h>#include <iostream>#include <string>#include <mutex>#include "CalTask.cpp"template<class Task>class Thread_Pool{    struct Thread_Data{        int Thread_num;        pthread_t tid;    };private:    RingQueue<Task> Queue_;  //线程安全的环形队列    std::vector<Thread_Data> thread_arr; //管理线程的容器    static std::mutex lock_;            //单例锁    static Thread_Pool<Task> * ptr_;    //单例指针private:    Thread_Pool(int capacity_Of_queue = 20) : Queue_(capacity_Of_queue){}    Thread_Pool(const Thread_Pool<Task>& Tp) = delete;    Thread_Pool<Task>& operator=(const Thread_Pool<Task> & Tp) = delete;public:    ~Thread_Pool(){}    //获取线程池单例-->注意C++的类模板静态成员函数需要在类体外进行定义    static Thread_Pool<Task> * Getinstance();    //创建多线程    void Create_thread(int thread_num = 10){        Thread_Data T_data;        for(int i = 0 ; i < thread_num ; ++i){            //注意线程池对象的this指针传递给线程            pthread_create(&T_data.tid,nullptr,Routine,this);            T_data.Thread_num = i + 1;            thread_arr.push_back(T_data);        }    }    //线程等待    void Thread_join(){        for(int i = 0 ;i < thread_arr.size() ; ++i){            pthread_join(thread_arr[i].tid,nullptr);        }    }    //向线程池中加入任务    void Push(Task T){        Queue_.Push(T);    }    void Push(Task && T){        Queue_.Push(std::forward<Task>(T));    }private:    //线程函数-->该函数没有在类外调用,所以无须在类体外定义    static void* Routine(void * args){        Thread_Pool<Task> * Pool = static_cast<Thread_Pool<Task> *>(args);        while(true){            std::cout << "Thread prepare to work/n" << std::endl;            Task Thread_Task = Pool->Queue_.Pop();            //要求Task类重载()-->用于执行具体任务            Thread_Task();        }        return nullptr;    }};//初始化静态指针template<class Task>Thread_Pool<Task> * Thread_Pool<Task>::ptr_ = nullptr;template<class Task>std::mutex Thread_Pool<Task>::lock_;//注意C++的类模板静态成员函数需要在类体外进行定义template<class Task>Thread_Pool<Task> * Thread_Pool<Task>::Getinstance(){    if(ptr_ == nullptr){        lock_.lock();        if(ptr_ == nullptr){            ptr_ = new Thread_Pool<Task>;        }        lock_.unlock();    }    return ptr_;}

序列化反序列化工具模块

  • 序列反序列化是保证通信过程中数据完整性的关键步骤,保证数据语义完整,结构完整

在这里插入图片描述

#pragma once#include <iostream>#include <string>// 自定义序列化反序列化协议const std::string blank_space_sep = " ";const std::string protocol_sep = "/n";//封装报文std::string Encode(std::string &content){    //报文正文字节数    std::string package = std::to_string(content.size());    package += protocol_sep;    package += content;    //用分隔符封装正文    package += protocol_sep;    return package;}//解析报文package-->"正文长度"/n"正文"/nbool Decode(std::string &package, std::string& content){    size_t pos = package.find(protocol_sep);    if(pos == std::string::npos) return false;    //解析报文正文长度    size_t Len = std::atoi(package.substr(0,pos).c_str());    //确定报文是否完整    size_t total_Len = pos + Len + 2;    if(package.size() != total_Len) return false;    //获取正文内容    content = package.substr(pos+1,Len);    package.erase(0,total_Len);    return true;}//用户层协议请求结构体class Request{public:    int x;    int y;    char op; public:    Request(int data1 , int data2 , char op)        : x(data1),y(data2),op(op){}    Request(){}public:    //请求结构体 序列化 成报文正文字符串 "x op y"    bool Serialize(std::string& out){        std::string content = std::to_string(x);        content += blank_space_sep;        content += op;        content += blank_space_sep;        content += std::to_string(y);        out = content;        return true;        // 等价的jason代码        // Json::Value root;        // root["x"] = x;        // root["y"] = y;        // root["op"] = op;        // // Json::FastWriter w;        // Json::StyledWriter w;        // out = w.write(root);        // return true;    }    //报文正文字符串 反序列化 成请求结构体    // "x op y"    bool Deserialize(const std::string &in) {        size_t left = in.find(blank_space_sep);        if(left == std::string::npos)return false;        x = std::stoi(in.substr(0,left).c_str());        std::size_t right = in.rfind(blank_space_sep);        if (right == std::string::npos)return false;        y = std::atoi(in.substr(right + 1).c_str());        if(left + 2 != right) return false;        op = in[left+1];        return true;        // 等价的jason代码        // Json::Value root;        // Json::Reader r;        // r.parse(in, root);        // x = root["x"].asInt();        // y = root["y"].asInt();        // op = root["op"].asInt();        // return true;    }    void DebugPrint()    {        std::cout << "新请求构建完成:  " << x << op << y << "=?" << std::endl;    }};//用户层协议请求回应结构体class Response{public:    int result;    int code; public:    Response(int res , int c)        : result(res),code(c){}    Response(){}public:    //请求回应结构体 序列化 成报文正文字符串 "result code"    bool Serialize(std::string& out){        std::string s = std::to_string(result);        s += blank_space_sep;        s += std::to_string(code);        out = s;        return true;        // 等价的jason代码        // Json::Value root;        // root["result"] = result;        // root["code"] = code;        // // Json::FastWriter w;        // Json::StyledWriter w;        // out = w.write(root);        // return true;    }    //"result code"    //报文正文字符串 反序列化 成请求回应结构体    bool Deserialize(const std::string &in)     {        std::size_t pos = in.find(blank_space_sep);        if (pos == std::string::npos)return false;        if(pos == 0 || pos == in.size() - 1) return false;        result = std::stoi(in.substr(0, pos).c_str());        code = std::stoi(in.substr(pos+1).c_str());        return true;        // 等价的jason代码        // Json::Value root;        // Json::Reader r;        // r.parse(in, root);        // result = root["result"].asInt();        // code = root["code"].asInt();        // return true;    }    void DebugPrint()    {        std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl;    }};

通信信道建立模块

#pragma once#include <iostream>#include <string>#include <sys/types.h>   #include <sys/socket.h>#include "log.hpp"#include <memory.h>#include <arpa/inet.h>#include <netinet/in.h>namespace MySocket{    //Tcp通讯构建器    class TcpServer{        enum{            UsageError = 1,            SocketError,            BindError,            ListenError,        };    private:        int socketfd_ = -1;        std :: string ip_;        uint16_t port_;        int backlog_ = 10;    public:        TcpServer(const std::string& ip = "172.19.29.44", uint16_t port = 8081) : ip_(ip) , port_(port){}        ~TcpServer(){if(socketfd_ > 0) close(socketfd_);}    public:        //确定通信协议,建立文件描述符        void BuildSocket(){            socketfd_ = socket(AF_INET,SOCK_STREAM,0);            if(socketfd_ < 0){                lg(Fatal,"socket error,%s/n",strerror(errno));                exit(SocketError);            }        }        //文件描述符与服务器ip : 端口号绑定        void SocketBind(){            struct sockaddr_in addr;            memset(&addr,0,sizeof(addr));            addr.sin_port = htons(port_);            addr.sin_family = AF_INET;            addr.sin_addr.s_addr = inet_addr(ip_.c_str());            if(bind(socketfd_,(const sockaddr*)&addr,sizeof(addr)) < 0){                lg(Fatal,"socket bind error,%s/n",strerror(errno));                exit(BindError);            }            lg(Info,"socket bind success/n");        }        //启动服务监听,等待客户端的连接        void Socklisten(){            if(socketfd_ <= 0){                lg(Fatal,"socket error,%s/n",strerror(errno));                exit(SocketError);            }            if(listen(socketfd_,backlog_) < 0){                lg(Fatal, "listen error, %s: %d", strerror(errno), errno);                exit(ListenError);            }        }        //服务器接收客户端的连接-->并创建用于通信的文件描述符-->一个客户端连接对应一个文件描述符        int SockAccept(std::string& cilent_ip, uint16_t& cilent_port){            struct sockaddr_in client_addr;  // 输出型参数,用于获取用户的ip : 端口号            memset(&client_addr,0,sizeof(client_addr));            socklen_t Len = sizeof(client_addr);            int newfd = accept(socketfd_,(struct sockaddr*)&client_addr,&Len);            if(newfd < 0){                lg(Warning, "accept error, %s: %d", strerror(errno), errno);                return -1;            }            //提取客户端信息-->输出参数            char ipstr[64];            cilent_ip = inet_ntop(AF_INET,&client_addr.sin_addr,ipstr,sizeof(ipstr));            cilent_ip = ipstr;            cilent_port = ntohs(client_addr.sin_port);            return newfd;        }    public:        int Get_Server_fd(){            return socketfd_;        }        void Close_fd(){            if(socketfd_ > 0){                close(socketfd_);                socketfd_ = -1;            }        }    };};

服务器主体模块

在这里插入图片描述

#pragma once#include "ThreadPool.cpp"#include "TcpServer.cpp"#include "CalTask.cpp"#include "log.hpp"#include <signal.h>//构建计算器服务器class CalServer{    const int size = 2048;private:    Thread_Pool<CalTask> * Pool_ptr_;    MySocket::TcpServer Socket_;    int Socket_fd_ = -1;public:    CalServer(const std::string& de_ip = "172.19.29.44",uint16_t de_port = 8081)        : Socket_(de_ip,de_port)    {        Pool_ptr_ = Thread_Pool<CalTask>::Getinstance();        if(Pool_ptr_ == nullptr){            lg(Fatal,"Pool_ptr_ is nullptr/n");            return;        }        Pool_ptr_->Create_thread();    }    ~CalServer(){}public:    //建立Tcp连接条件    bool Init(){        Socket_.BuildSocket();        Socket_fd_ = Socket_.Get_Server_fd();        if(Socket_fd_ < 0){            lg(Fatal,"BuildSocket failed/n");            return true;        }        Socket_.SocketBind();        Socket_.Socklisten();        lg(Info, "init server .... done");        return true;    }    //启动服务器    void Start(){        signal(SIGCHLD, SIG_IGN);        signal(SIGPIPE, SIG_IGN);        char ReadBuffer[size];        while(true){            //接受用户请求            std::string client_ip;            uint16_t client_port;            int client_fd = Socket_.SockAccept(client_ip,client_port);            if(client_fd < 0){                lg(Warning,"SockAccept error/n");                continue;            }            lg(Info, "accept a new link, sockfd: %d, clientip: %s, clientport: %d", client_fd, client_ip.c_str(), client_port);            int n = read(client_fd,ReadBuffer,sizeof(ReadBuffer));            ReadBuffer[n] = 0;              std::string TaskStr(ReadBuffer);            printf("receives mess from client : %s",ReadBuffer);            if(n < 0){                lg(Warning,"read error/n");                break;            }            CalTask task(client_fd,client_ip,client_port,TaskStr);            Pool_ptr_->Push(task);        }    }};

任务回调模块(根据具体应用场景可重构)

#pragma once#include <string>#include "ThreadPool.cpp"#include "Protocol.cpp"enum{    Div_Zero = 1,    Mod_Zero,    Other_Oper};class CalTask{private:    int socketfd_;                //网络通信文件描述符    std :: string ip_;            //客户端ip    uint16_t port_;               //客户端端口号    std::string package_;         //客户请求字符串public:    CalTask(int socketfd,const std::string& ip , uint16_t & port,std::string & str)        : socketfd_(socketfd),ip_(ip),port_(port),package_(str){}    CalTask(){}//类一定要有默认构造函数    ~CalTask(){}public:    //执行计算任务并将结果发送给用户    void operator() (){        std::cout << "Task Running ... /n" << std::endl;        std::string content;        //将用户发送的报文进行解包获取正文        bool r = Decode(package_, content);        if (!r)return;        //将报文正文进行反序列化        Request req;        r = req.Deserialize(content);        if (!r)return ;        req.DebugPrint();        content = "";         //构建计算结果                                 Response resp = CalculatorHelper(req);        resp.DebugPrint();        //计算结果序列化成字符串        resp.Serialize(content);        //字符串正文封装成报文发送给用户        std::string ResStr = Encode(content);        write(socketfd_,ResStr.c_str(),ResStr.size());        if(socketfd_ > 0)close(socketfd_);    }private:    Response CalculatorHelper(const Request &req){        //构建请求回应结构体        Response resp(0, 0);        switch (req.op){        case '+':            resp.result = req.x + req.y;            break;        case '-':            resp.result = req.x - req.y;            break;        case '*':            resp.result = req.x * req.y;            break;        case '/':{            if (req.y == 0)                resp.code = Div_Zero;            else                resp.result = req.x / req.y;        }        break;        case '%':{            if (req.y == 0)                resp.code = Mod_Zero;            else                resp.result = req.x % req.y;        }        break;        default:            resp.code = Other_Oper;            break;        }        return resp;    }};

Tips:DebugC++代码过程中遇到的问题记录

  • 使用C++类模板时,若在类模板中定义了静态成员函数,且该静态成员函数在类外被调用,则该静态成员函数必须定义在类外,不然链接器无法找到函数体.
  • 注意类模板静态成员的声明格式需要加关键字temlpate<>
  • 声明类模板静态成员无需特化模版类型参数
  • 跨主机并发通信测试:
    在这里插入图片描述
    在这里插入图片描述

也许您对下面的内容还感兴趣: