boost asio库的一些记录(个人用)

发布时间 2023-06-16 20:06:21作者: koodu

BOOST asio

#include<iostream>
#include<boost/asio.hpp>
#include<boost/date_time/posix_time/posix_time.hpp>
int main()
{
    boost::asio::io_service io; //上下文,事件轮询处理框架(类似libevent的event_base)
    boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));//定时事件
    t.wait();//等待定时事件执行结束, 阻塞在这    
    std::cout<<"aaaaa"<<std::endl;
    return 0;
}

boost库设定定时事件

async_wait()异步等待

  1 #include<iostream>
  2 #include<boost/asio.hpp>
  3 #include<boost/date_time/posix_time/posix_time.hpp>
  4 
  5 void func1(const boost::system::error_code&){
  6     std::cout<<"任务执行"<<std::endl;
  7 }
  8 int main()
  9 {
 10     boost::asio::io_service io;
 11     boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));//定时事件
        //异步等待,不会在此阻塞,继续执行下面代码,时间到后执行回调函数
 12     t.async_wait(func1);//定时等待事件到时间后执行func1 函数,登记函数到事件池
 13     std::cout<<"1111"<<std::endl;//主线程中不等待该定时事件到时
 14     io.run();//while(1) 循环 ioservice 不断循环事件池,若事件池里没有事件则run退出,有则等待执行 事件5秒后执行func1 事件池空则退出run                               
 15     std::cout<<"2222"<<std::endl;
 16     return 0;
 17 }

111
任务执行
222

boost库的另一个关于时间的头文件

  1 #include<iostream>
  2 #include<boost/asio.hpp>
  3 #include<boost/asio/steady_timer.hpp>
  4 //#include<boost/date_time/posix_time/posix_time.hpp>
  5 
  6 void func1(const boost::system::error_code&){
  7     std::cout<<"任务执行"<<std::endl;
  8 }
  9 int main()
 10 {
 11     boost::asio::io_service io;
 12     boost::asio::steady_timer t(io);//定时事件
 13     t.expires_from_now(std::chrono::seconds(5));//定时5s
 14     t.wait();//main阻塞
 15     std::cout<<"eeeee"<<std::endl;                                                                                                                                  
 16     return 0;
 17 }

使用boost::bind绑定回调函数
  1 #include<iostream>
  2 #include<boost/asio.hpp>
  3 //#include<boost/asio/steady_timer.hpp>
  4 #include<boost/date_time/posix_time/posix_time.hpp>
  5 #include<boost/bind.hpp>                                                                 
  7 void print(const boost::system::error_code&,boost::asio::deadline_timer* t,int* count)
  8 {
  9     if(*count<5){
 10         std::cout<<*count<<"   "<<std::endl;
 11         ++(*count);
 12         //t为定时事件对象,执行本函数时,t已到时失效了,t->expires_at()获取到时时间,又重新设定到时时间
 13         t->expires_at(t->expires_at()+boost::posix_time::seconds(1));
 14         //t到时后,执行下面绑定的函数   bind绑定函数名和相关参数
 15         t->async_wait(boost::bind(print,boost::asio::placeholders::error,t,count));
 16     }
 17 }
 18 int main()
 19 {
 20     boost::asio::io_service io;
 21     int count=0;
 22     boost::asio::deadline_timer t(io,boost::posix_time::seconds(1));//定时事件1s
 23     t.async_wait(boost::bind(print,boost::asio::placeholders::error,&t,&count));
 24     io.run();
 25     std::cout<<"end  count is:"<<count<<std::endl;
 26     return 0;
 27 }

 t.async_wait([&t,&count](boost::asio::placeholders::error err){print(err,&t,&count);});//到时后执行lambda函数,lambda函数里执行print()
 //改为使用lambda,上面代码语法有错

io_service的io.run();执行结束后,io的状态就改变了,后面再io.run()不执行了

async_write()异步写,生命周期

对于异步,在其调用回调函数前,asio对涉及的socket等的生命周期不做保证,因此用户必须保证相关对象的生命

enable_shared_from_this

shared_from_this();在bind()里/在(类成员函数里)调用本类执行该函数的对象

(获得自身对象):在类的构造函数里不能调用shared_from_this(),因为构造还没完

在多线程中,有一个线程专门执行io_service的run();其他线程要把事件加载到执行io.run()的线程,使用io_service.poet()将要加载的事件打包成lambda函数

boost库(网络通信上库,使用按照库函数,标志点,端口通信)
//客户端
int send_data_by_send(){
    std::string raw_ip_address = "127.0.0.1";
    unsigned short port_num = 3333;
    try {
        asio::ip::tcp::endpoint
            ep(asio::ip::address::from_string(raw_ip_address),
                port_num);//建立网络通信端点
        asio::io_service ios;
        // Step 1. Allocating and opening the socket.
        //socket放到io_service,协议是endpoint.protocol()
        asio::ip::tcp::socket sock(ios, ep.protocol());//socket IP v4
        sock.connect(ep);
        std::string buf = "Hello World!";
  //send要求参数是MutableBufferSequence,asio::buffer()返回mutable_buffer_1,是MutableBufferSequence的适配器,可作为MutableBufferSequence的参数
        int send_length = sock.send(asio::buffer(buf.c_str(), buf.length(endpoint)));
        if (send_length <= 0) {
            cout << "send failed" << endl;
            return 0;
        }
    }
    catch (system::system_error& e) {
        std::cout << "Error occured! Error code = " << e.code()
            << ". Message: " << e.what();
        return e.code().value();
    }
    return 0;
}
std::cin.getline(char*,length);// 从标准输入流中读取数据到char*
------------------------------------------------------------
char reply[MAX_LENGTH];  //读取服务端发送的数据
        size_t reply_length = boost::asio::read(sock,
            boost::asio::buffer(reply, request_length));//const_buffer_1
        std::cout << "Reply is: ";
        std::cout.write(reply, reply_length);//将reply的数据输出到标准输出流
服务器端进行通信相关 生成一个acceptor的socket,用来接收新的连接。
int accept_new_connection(){
    const int BACKLOG_SIZE = 30;
    unsigned short port_num = 3333;
    asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(),
        port_num);
    asio::io_context  ios;
    try {
        asio::ip::tcp::acceptor acceptor(ios, ep.protocol());// 生成一个acceptor的socket,用来接收新的连接
        acceptor.bind(ep);
        acceptor.listen(BACKLOG_SIZE);
        for (;;) {
            asio::ip::tcp::socket sock(ios);//与建立连接的客户端交互通信的socket
            acceptor.accept(sock);
        }
    }
    catch (system::system_error& e) {
        std::cout << "Error occured! Error code = " << e.code()
            << ". Message: " << e.what();
        return e.code().value();
    }
}
void session(socket_ptr sock) {
    try {
        for (;;) {
            char data[max_length];
            memset(data, '\0', max_length);
            boost::system::error_code error;
            size_t length = sock->read_some(boost::asio::buffer(data, max_length), error);
            if (error == boost::asio::error::eof) {
                std::cout << "connection closed by peer" << endl;
                break;
            }
            else if (error) {
                throw boost::system::system_error(error);
            }
            //获取对端的端点,address
            cout << "receive from " << sock->remote_endpoint().address().to_string() << endl;
            cout << "receive message is " << data << endl;
            //回传信息值
            boost::asio::write(*sock, boost::asio::buffer(data, length));
        }
    }
    catch (std::exception& e) {
        std::cerr << "Exception in thread: " << e.what() << "\n" << std::endl;
    }
}
buffer
boost::asio提供了asio::mutable_buffer 和 asio::const_buffer这两个结构,他们是一段连续的空间,首字节存储了后续数据的长度。
asio::mutable_buffer用于写服务,asio::const_buffer用于读服务。但是这两个结构都没有被asio的api直接使用。
对于api的buffer参数,asio提出了MutableBufferSequence和ConstBufferSequence概念,他们是由多个asio::mutable_buffer和asio::const_buffer组成的。也就是说boost::asio为了节省空间,将一部分连续的空间组合起来,作为参数交给api使用。
我们可以理解为MutableBufferSequence的数据结构为std::vector<asio::mutable_buffer>

asio的buffer()函数,该函数接收多种形式的字节流,该函数返回asio::mutable_buffers_1 o或者asio::const_buffers_1结构的对象。
如果传递给buffer()的参数是一个只读类型,则函数返回asio::const_buffers_1 类型对象。
如果传递给buffer()的参数是一个可写类型,则返回asio::mutable_buffers_1 类型对象。
asio::const_buffers_1和asio::mutable_buffers_1是asio::mutable_buffer和asio::const_buffer的适配器,提供了符合MutableBufferSequence和ConstBufferSequence概念的接口,所以他们可以作为boost::asio的api函数的参数使用。
write_some可以每次向指定的空间写入固定的字节数,如果写缓冲区满了,就只写一部分,返回写入的字节数。sock.write_some(asio::buffer());
send函数会一次性将buffer中的内容发送给对端,如果有部分字节因为发送缓冲区满无法发送,则阻塞等待,直到发送缓冲区可用,则继续发送完成。 int send_length = sock.send(asio::buffer(buf.c_str(), buf.length()));
write函数,可以一次性将所有数据发送给对端,如果发送缓冲区满了则阻塞,直到发送缓冲区可用,将数据发送完成。
int send_length  = asio::write(sock, asio::buffer(buf.c_str(), buf.length()));
同步读read_some sock.read_some(asio::buffer(char*,lenth));
receive 可以一次性同步接收对方发送的数据int receive_length =  sock.receive(asio::buffer(buffer_receive, BUFF_SIZE));
read   int receive_length = asio::read(sock, asio::buffer(buffer_receive, BUFF_SIZE));

由于读写中,没有数据,会不断阻塞。为了后面继续执行,通过异步方式进行,当有数据读写时,调用回调函数处理

boost异步写async
void Session::WriteToSocketErr(const std::string& buf) {//发送数据
    _send_node = make_shared<MsgNode>(buf.c_str(), buf.length());
    //异步发送数据,因为异步所以不会一下发送完
    this->_socket->async_write_some(asio::buffer(_send_node->_msg, 
        _send_node->_total_len),//缓冲区满后,不会阻塞等待缓冲区可以写入数据,而是执行其他事,缓冲区可写数据时,执行绑定的回调函数继续发送
        std::bind(&Session::WriteCallBackErr,
            this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
//std::size_t bytes_transferred获取前面发送了多少数据
void Session::WriteCallBackErr(const boost::system::error_code& ec, 
    std::size_t bytes_transferred, std::shared_ptr<MsgNode> msg_node) {
    if (bytes_transferred + msg_node->_cur_len 
        < msg_node->_total_len) {//数据没发送完
        _send_node->_cur_len += bytes_transferred;
        //继续发送,绑定回调函数
        this->_socket->async_write_some(asio::buffer(_send_node->_msg+_send_node->_cur_len,
            _send_node->_total_len-_send_node->_cur_len),
            std::bind(&Session::WriteCallBackErr,
                this, std::placeholders::_1, std::placeholders::_2, _send_node));
    }
}



//最大报文接收大小                        要发送的数据通过MsgNode发送(MsgNode接收要发送的数据)
const int RECVSIZE = 1024;
class  MsgNode {
public :
    MsgNode(const char* msg,  int total_len): _total_len(total_len), _cur_len(0){
        _msg = new char[total_len];
        memcpy(_msg, msg, total_len);
    }
    MsgNode(int total_len) :_total_len(total_len), _cur_len(0) {
        _msg = new char[total_len];
    }
    ~MsgNode(){
        delete[]_msg;
    }
    //消息首地址
    char* _msg;
    //总长度
    int _total_len;
    //当前长度
    int _cur_len;
};

要发送的总数据保存在MsgNode里,指针_send_node指向该类,通过服务端的socket调用async_write_some异步发送MsgNode类里的数据,调用bind绑定的类函数继续执行发送(本对象) std::placeholders::_1,std::placeholders::_1(保留执行回调函数的第一、二个参数),把存储数据的MsgNode指针传给WriteCallBackErr函数,std::size_t bytes_transferred为成功发送数据的数量,判断发送的数量与总数量的关系,没全部发送则继续socket调用async_write_some异步发送,继续bind()本函数(回调)

在WriteCallBackErr里使用shared_ptr 增加Msg存活时间(增加共享指针引用),writetosocket函数结束后,MsgNode对象会释放掉,会自动释放掉MsgNode,指针也失效

异步发送,对于多个要发送数据的线程,执行发送函数,发送顺序没保证,改为通过队列方式发送

对于每个要发送的消息,将其放到MsgNode队列里,通过_send_pending判断当前数据是否发送完

class Session{
public:
    void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
    void WriteToSocket(const std::string &buf);
private:
    std::queue<std::shared_ptr<MsgNode>> _send_queue;
    std::shared_ptr<asio::ip::tcp::socket> _socket;
    bool _send_pending;
};
void Session::WriteToSocket(const std::string& buf){
    _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); //将要发送的数据装进类对象插入发送队列,按顺序发送
    if (_send_pending) {//pending状态true说明上一次有未发送完的数据
        return;//return掉,没有继续发送,但是插入到了发送队列里,在发送完前面的后,从队列里取出继续发送
    }
    //异步发送数据,因为异步所以不会一下发送完
    this->_socket->async_write_some(asio::buffer(buf), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
    _send_pending = true;//没发送完,true,不发送新数据
}
void Session::WriteCallBack(const boost::system::error_code & ec,  std::size_t bytes_transferred){
    if (ec.value() != 0) {
        std::cout << "Error , code is " << ec.value() << " . Message is " << ec.message();
        return;
    }
    auto & send_data = _send_queue.front();//取出队首元素即当前未发送完数据继续发送
    send_data->_cur_len += bytes_transferred;
    //数据未发送完, 则继续发送
    if (send_data->_cur_len < send_data->_total_len) {
        this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len-send_data->_cur_len),
            std::bind(&Session::WriteCallBack,
            this, std::placeholders::_1, std::placeholders::_2));
        return;
    }
    //如果发送完,则pop出队首元素
    _send_queue.pop();
    //如果队列为空,则说明所有数据都发送完,将pending设置为false
    if (_send_queue.empty()) {
        _send_pending = false;
    }
    //如果队列不是空,则继续将队首元素发送(发送等待发送的数据)
    if (!_send_queue.empty()) {
        auto& send_data = _send_queue.front();
        this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
            std::bind(&Session::WriteCallBack,
                this, std::placeholders::_1, std::placeholders::_2));
    }
}

改为async_send()一次发送完数据(内部多次调用async_write_some)( 异步)

//不能与async_write_some混合使用
void Session::WriteAllToSocket(const std::string& buf) {
    //插入发送队列
    _send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
    //pending状态说明上一次有未发送完的数据
    if (_send_pending) {
        return;
    }
    //异步发送数据,因为异步所以不会一下发送完
    this->_socket->async_send(asio::buffer(buf), 
        std::bind(&Session::WriteAllCallBack, this,
            std::placeholders::_1, std::placeholders::_2));
    _send_pending = true;
}
void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
    if (ec.value() != 0) {
        std::cout << "Error occured! Error code = "
            << ec.value()
            << ". Message: " << ec.message();
        return;
    }
    //如果发送完,则pop出队首元素
    _send_queue.pop();
    //如果队列为空,则说明所有数据都发送完,将pending设置为false
    if (_send_queue.empty()) {
        _send_pending = false;
    }
    //如果队列不是空,则继续将队首元素发送
    if (!_send_queue.empty()) {
        auto& send_data = _send_queue.front();
        this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
            std::bind(&Session::WriteAllCallBack,
                this, std::placeholders::_1, std::placeholders::_2));
    }
}
异步读(没读完,不接收新的读,在读中有新数据到达,丢弃)
class Session {
public:
    void ReadFromSocket();
    void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
private:
    std::shared_ptr<asio::ip::tcp::socket> _socket;
    std::shared_ptr<MsgNode> _recv_node;
    bool _recv_pending;
};
void Session::ReadFromSocket() {
    if (_recv_pending) {
        return;
    }
    //可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
    /*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
    _recv_node = _recv_nodez;*/
    _recv_node = std::make_shared<MsgNode>(RECVSIZE);//初始化一个MsgNode对象存储读到的数据
    _socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this,std::placeholders::_1, std::placeholders::_2));
    _recv_pending = true;//正在读,置为true
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
    _recv_node->_cur_len += bytes_transferred;
    //没读完继续读
    if (_recv_node->_cur_len < _recv_node->_total_len) {
        _socket->async_read_some(asio::buffer(_recv_node->_msg+_recv_node->_cur_len,
            _recv_node->_total_len - _recv_node->_cur_len), std::bind(&Session::ReadCallBack, this,
            std::placeholders::_1, std::placeholders::_2));
        return;
    }
    //判断为已读完
    //将数据投递到队列里交给逻辑线程处理,此处略去
    //如果读完了则将标记置为false
    _recv_pending = false;
    //指针置空
    _recv_node = nullptr;    
}
不管读没读完,都调用bind()绑定的函数,在bind()函数里判断读没读完,读完则置为false;

上面可改为使用async_receive()

async_read_some和async_receive不能混合使用,否则会出现逻辑问题。

void Session::ReadAllFromSocket(const std::string& buf) {
    if (_recv_pending) {
        return;
    }
    //可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
    /*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
    _recv_node = _recv_nodez;*/
    _recv_node = std::make_shared<MsgNode>(RECVSIZE);
    _socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this,
        std::placeholders::_1, std::placeholders::_2));
    _recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
    _recv_node->_cur_len += bytes_transferred;
    //将数据投递到队列里交给逻辑线程处理,此处略去
    //如果读完了则将标记置为false
    _recv_pending = false;
    //指针置空
    _recv_node = nullptr;
}
简单读写
session:
void Session::Start(){//服务器开启后,就监听客户端发来的消息
    memset(_data, 0, max_length);
    //读取客户端发送的消息,读到的信息到handle_read中处理(不用管能一次读到多少数据,读到多少后面发送多少)
    _socket.async_read_some(boost::asio::buffer(_data, max_length),
        std::bind(&Session::handle_read, this, placeholders::_1,
            placeholders::_2)
    );
}
void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) {
    if (!error) {
        //读到多少数据就发送多少数据,async_write 发送后到handle_write处理,继续读数据
        cout << "server receive data is " << _data << endl;
        boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered), 
            std::bind(&Session::handle_write, this, placeholders::_1));
    }
    else {
        delete this;
    }
}
void Session::handle_write(const boost::system::error_code& error) {
    if (!error) {
        memset(_data, 0, max_length);
        //继续读数据 ,读后又到handle_read中处理读到多少数据
        _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                std::bind(&Session::handle_read,
            this, placeholders::_1, placeholders::_2));
    }
    else {
        delete this;
    }
}
利用C11模拟伪闭包实现连接的安全回收

主要通过智能指针增加引用的方式增加Session的生命周期,同时把session的指针 指针放到map里

同时为了使函数结束后指针不被释放,将session放入map中,保证session不被自动释放。

方便Server管理Session,因为我们后期会做一些重连踢人等业务逻辑,我们在Server类中添加成员变量,该变量为一个map类型,key为Session的uid,value为该Session的智能指针。

在map中,用pair<uuid,session>管理具体的指针

队列全双工
void CSession::Send(char* msg, int max_length) {//要发送消息
    bool pending = false;
    std::lock_guard<std::mutex> lock(_send_lock);
    if (_send_que.size() > 0) {//如果队列里有数据要发生,就将消息放到队列里,同时pending=true
        pending = true;
    }
    _send_que.push(make_shared<MsgNode>(msg, max_length));//将消息放到队列里
    if (pending) {
        return;
    }
    //原先队列里没有消息要发送,即发送消息
    boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length), 
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_from_this()));
}
处理沾包

处理粘包的方式主要采用应用层定义收发包格式的方式

在发送数据时,将要发送的数据格式化:

格式变为消息长度+消息内容的方式(整体存储的数据)(将实际要发送多少数据的记录也一同发送出去)

如要发送的消息10个字节,用2个字节存10(2qwertyuiop)

class MsgNode
{
    friend class CSession;
public:
    //发送的消息变为max_len + HEAD_LENGTH(实际要发送有效消息的长度+记录要发出去的有效消息的长度)
    MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){
        _data = new char[_total_len+1]();
        memcpy(_data, &max_len, HEAD_LENGTH);//如要发送的消息10个字节,用2个字节存10
        memcpy(_data+ HEAD_LENGTH, msg, max_len);
        _data[_total_len] = '\0';
    }
    MsgNode(short max_len):_total_len(max_len),_cur_len(0) {
        _data = new char[_total_len +1]();
    }
    ~MsgNode() {
        delete[] _data;
    }
    void Clear() {
        ::memset(_data, 0, _total_len);
        _cur_len = 0;
    }
private:
    short _cur_len;
    short _total_len;
    char* _data;
};

max_len + HEAD_LENGTH HEAD_LENGTH为头部大小

_recv_msg_node用来存储接受的消息体信息
_recv_head_node用来存储接收的头部信息
_b_head_parse表示是否处理完头部信息

不全代码的理解:

void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
    if (!error) {
//已经移动的字符数(意思是读到的总数据长度(bytes_transferred加上头部已读的多,在读到的数据中减去头部需要的,减去的部分即为copy_len))
        int copy_len = 0;
        while (bytes_transferred>0) {//读到的数据
            if (!_b_head_parse) {//头部数据长度还未读完
                //收到的数据又不足头部大小,把数据在头部尾部全复制给头部
                if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
                    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+ copy_len, bytes_transferred);
                   //更新头部已接收信息的长度
                    _recv_head_node->_cur_len += bytes_transferred;
                    ::memset(_data, 0, MAX_LENGTH);
                    //继续接收数据
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;//后面代码不执行
                }
                //头部还未读完,但收到的数据比头部多
                //头部剩余未复制的长度
                int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;//头部还需要读的数据长度
                memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain);
                //更新已处理的data长度和剩余未处理的长度
                copy_len += head_remain;//从读出的数据中已使用掉的长度
                bytes_transferred -= head_remain;
                //获取头部数据
                short data_len = 0;
                memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
                cout << "data_len is " << data_len << endl;
                //头部长度非法
                if (data_len > MAX_LENGTH) {
                    std::cout << "invalid data length is " << data_len << endl;
                    _server->ClearSession(_uuid);
                    return;
                }
                _recv_msg_node = make_shared<MsgNode>(data_len);//根据头部表示的有效数据长度初始化MsgNode长度
                //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
                if (bytes_transferred < data_len) {//剩余读取到的数据长度
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                    _recv_msg_node->_cur_len += bytes_transferred;
                    ::memset(_data, 0, MAX_LENGTH);
                    //继续读完成实际内容的读取
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    //头部处理完成
                    _b_head_parse = true;//该消息头部已完成
                    return;
                }
                消息的长度大于头部规定的长度
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
                _recv_msg_node->_cur_len += data_len;
                copy_len += data_len;
                bytes_transferred -= data_len;
                _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                cout << "receive data is " << _recv_msg_node->_data << endl;
                //此处可以调用Send发送测试
                Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
                //继续轮询剩余未处理数据
                _b_head_parse = false;//新的数据其头部置为false;
                _recv_head_node->Clear(); //void Clear() ::memset(_data, 0, _total_len);_cur_len = 0;}
                if (bytes_transferred <= 0) {
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;
                }
                //继续while() 到新数据的头部那继续
                continue;
            }
            //已经处理完头部,处理上次未接受完的消息数据
            //接收的数据仍不足剩余未处理的
            int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
            if (bytes_transferred < remain_msg) {
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                _recv_msg_node->_cur_len += bytes_transferred;
                ::memset(_data, 0, MAX_LENGTH);
                _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                return;
            }
            memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
            _recv_msg_node->_cur_len += remain_msg;
            bytes_transferred -= remain_msg;
            copy_len += remain_msg;
            _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
            cout << "receive data is " << _recv_msg_node->_data << endl;
            //此处可以调用Send发送测试
            Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
            //继续轮询剩余未处理数据
            _b_head_parse = false;
            _recv_head_node->Clear();
            if (bytes_transferred <= 0) {
                ::memset(_data, 0, MAX_LENGTH);
                _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                return;
            }
            continue;
        }
    }
    else {
        std::cout << "handle read failed, error is " << error.what() << endl;
        Close();
        _server->ClearSession(_uuid);
    }
}

客户端修改

客户端的发送也要遵循先发送数据2个字节的数据长度,再发送数据消息的结构。
接收时也是先接收两个字节数据获取数据长度,再根据长度接收消息。

字节序
protobuf

............