基于boost的聊天服务器

发布时间 2023-07-27 16:05:14作者: 白伟碧一些小心得

聊天服务器的设计主要包括消息结构的设计。这一点与其他网络程序相同:为了避免粘包,消息通常会设计成 TLV(Type-Length-Value)的格式:

消息的头部Header主要包括消息的类型和消息体的长度,这样设计是为了在发生粘包时也能方便地切分并解析数据

消息主要包括:bind name (client send)

                          chat info   (client send)

                          room info (服务器分发)

设计如下:

structHeader.h

#ifndef FND_STRUCT_HEADER_H
#define FND_STRUCT_HEADER_H
#include <string>
// Fixed-size header that precedes every message body on the wire.
struct Header {
    int bodySize; // length in bytes of the body that follows the header
    int type;     // message kind; holds one of the MessageType values
};

// Message kinds stored in Header::type. The trailing comments sketch the
// logical content of each kind.
enum MessageType {
    MT_BIND_NAME = 1, // {"name" : "abc"}
    MT_CHAT_INFO = 2, // {"information" : "what i say"}
    MT_ROOM_INFO = 3, // {"name" : "abc", "information" : "what i say"}
};

// Sent by the client: raw-struct body used to bind a display name.
struct BindName {
    char name[32]; // name bytes; parseMessage copies exactly nameLen bytes here (no terminator is written)
    int nameLen;   // number of bytes of name that are in use
};

// Sent by the client: raw-struct body carrying one chat line.
struct ChatInformation {
    char information[256]; // chat text bytes; parseMessage copies exactly infoLen bytes here (no terminator is written)
    int infoLen;           // number of bytes of information that are in use
};


// Sent by the server: raw-struct body pairing a sender name with the chat
// text that sender wrote.
struct RoomInformation {
    BindName name;        // who said it
    ChatInformation chat; // what was said
};

// Each parser takes one input line of the form "<command> <text>", where the
// command is "BindName" or "Chat", writes the encoded message body to
// outbuffer and, when type is non-null, stores the matching MessageType in
// *type. They return false when the line cannot be turned into a message.
// The four variants differ only in how the body is encoded.
bool parseMessage(const std::string &input, int *type, std::string &outbuffer);  // raw struct bytes
bool parseMessage2(const std::string &input, int *type, std::string &outbuffer); // boost::serialization text archive
bool parseMessage3(const std::string &input, int *type, std::string &outbuffer); // JSON
bool parseMessage4(const std::string &input, int *type, std::string &outbuffer); // protobuf
#endif

其中消息体的传输格式主要包括原生的结构体、序列化(boost::serialization)、json、protobuf 等。

structHeader.cpp(实现)

#include "structHeader.h"
#include "SerilizationObject.h"
#include "JsonObject.h"
#include "Protocal.pb.h"
#include <cstdlib>
#include <cstring>
#include <iostream>

//boost serialization 序列化成字符串 template <typename T> std::string seriliaze(const T &obj) { std::stringstream ss; boost::archive::text_oarchive oa(ss); oa & obj; return ss.str(); } //通过protobuf的方式去序列化 bool parseMessage4(const std::string &input, int *type, std::string &outbuffer) { auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; PBindName bindName; bindName.set_name(name); //auto oldname = bindName.name(); auto ok = bindName.SerializeToString(&outbuffer); return ok; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; PChat pchat; pchat.set_information(chat); auto ok = pchat.SerializeToString(&outbuffer); if (type) *type = MT_CHAT_INFO; return ok; } return false; } //通过json的方式序列化 bool parseMessage3(const std::string &input, int *type, std::string &outbuffer) { auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; ptree tree; tree.put("name", name); outbuffer = ptreeToJsonString(tree); //outbuffer = seriliaze(SBindName(std::move(name))); return true; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; ptree tree; tree.put("information", chat); outbuffer = ptreeToJsonString(tree); //outbuffer = seriliaze(SChatInfo(std::move(chat))); if (type) *type = MT_CHAT_INFO; return true; } return false; } //boost serilize通过boost 进行序列化 bool 
parseMessage2(const std::string &input, int *type, std::string &outbuffer) { auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; //SBindName bindInfo(std::move(name)); outbuffer = seriliaze(SBindName(std::move(name))); return true; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; outbuffer = seriliaze(SChatInfo(std::move(chat))); // ChatInformation info; // info.infoLen = chat.size(); // std::memcpy(&(info.information), chat.data(), chat.size()); // auto buffer = reinterpret_cast<const char *>(&info); // outbuffer.assign(buffer, buffer + sizeof(info)); if (type) *type = MT_CHAT_INFO; return true; } return false; } // 原生的消息处理方式 cmd messagebody bool parseMessage(const std::string &input, int *type, std::string &outbuffer) { // input should be cmd body auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; BindName bindInfo; bindInfo.nameLen = name.size(); std::memcpy(&(bindInfo.name), name.data(), name.size()); auto buffer = reinterpret_cast<const char *>(&bindInfo); outbuffer.assign(buffer, buffer + sizeof(bindInfo)); return true; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; ChatInformation info; info.infoLen = chat.size(); std::memcpy(&(info.information), chat.data(), chat.size()); auto buffer = reinterpret_cast<const char 
*>(&info); outbuffer.assign(buffer, buffer + sizeof(info)); if (type) *type = MT_CHAT_INFO; return true; } return false; }

SerilizationObject.h(boost serilize的方式序列化)

#ifndef FND_SERI_H
#define FND_SERI_H
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
// Boost.Serialization wrapper around the name a client binds.
class SBindName {
public:
  SBindName() {}
  SBindName(std::string name) : m_bindName(std::move(name)) {}

  // Read-only access to the stored name.
  const std::string &bindName() const { return m_bindName; }

private:
  // Grants Boost.Serialization access to the private serialize() below.
  friend class boost::serialization::access;

  // A single routine serves both directions: on an output archive operator&
  // acts like <<, on an input archive it acts like >>.
  template <class Archive>
  void serialize(Archive &archive, const unsigned int /*version*/) {
    archive & m_bindName;
  }

  std::string m_bindName;
};

// Boost.Serialization wrapper around one line of chat text.
class SChatInfo {
public:
  SChatInfo() {}
  SChatInfo(std::string info) : m_chatInformation(std::move(info)) {}

  // Read-only access to the stored chat text.
  const std::string &chatInformation() const { return m_chatInformation; }

private:
  // Grants Boost.Serialization access to the private serialize() below.
  friend class boost::serialization::access;

  // A single routine serves both directions: on an output archive operator&
  // acts like <<, on an input archive it acts like >>.
  template <class Archive>
  void serialize(Archive &archive, const unsigned int /*version*/) {
    archive & m_chatInformation;
  }

  std::string m_chatInformation;
};

// Boost.Serialization wrapper pairing a sender name with the chat text that
// sender wrote.
class SRoomInfo {
public:
  SRoomInfo() {}
  SRoomInfo(std::string name, std::string info)
      : m_bind(std::move(name)), m_chat(std::move(info)) {}

  // Sender name, forwarded from the wrapped SBindName.
  const std::string &name() const { return m_bind.bindName(); }

  // Chat text, forwarded from the wrapped SChatInfo.
  const std::string &information() const { return m_chat.chatInformation(); }

private:
  // Grants Boost.Serialization access to the private serialize() below.
  friend class boost::serialization::access;

  // Archives the name first and the chat text second.
  template <class Archive>
  void serialize(Archive &archive, const unsigned int /*version*/) {
    archive & m_bind;
    archive & m_chat;
  }

  SBindName m_bind;
  SChatInfo m_chat;
};

#endif

通过json的方式处理消息

JsonObject.h(将json转换为string)

#ifndef FND_JSON_OBJECT_H
#define FND_JSON_OBJECT_H
#include <sstream>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <string>
using ptree = boost::property_tree::ptree;
// Renders a property tree as JSON text with pretty-printing switched off.
inline std::string ptreeToJsonString(const ptree& tree) {
    const bool pretty = false;
    std::stringstream out;
    boost::property_tree::write_json(out, tree, pretty);
    return out.str();
}
#endif

通过protobuf处理,其中proto文件如下,可以通过proto文件去生成c++代码

syntax = 'proto3';
// Body of an MT_BIND_NAME message: the display name the client wants to use.
message PBindName {
    string name = 1;
}
// Body of an MT_CHAT_INFO message: one line of chat text from a client.
message PChat {
    string information = 1;
}
// Body of an MT_ROOM_INFO message: the server's broadcast of who said what.
message PRoomInformation {
    string name = 1;        // sender's bound name
    string information = 2; // chat text
}

 

chat_message.h(主要包含消息头(type+bodysize) 和消息体(body)的解析)

#ifndef CHAT_MESSAGE_HPP
#define CHAT_MESSAGE_HPP

#include "structHeader.h"
#include "SerilizationObject.h"

#include <iostream>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cassert>
// s -> c   c -> s message { header, body } // header length

class chat_message {
public:
  enum { header_length = sizeof(Header) };
  enum { max_body_length = 512 };

  chat_message(){}

  const char *data() const { return data_; }

  char *data() { return data_; }

  std::size_t length() const { return header_length + m_header.bodySize; }

  const char *body() const { return data_ + header_length; }

  char *body() { return data_ + header_length; }
  int type() const { return m_header.type; }

  std::size_t body_length() const { return m_header.bodySize; }
  void setMessage(int messageType, const void *buffer, size_t bufferSize) {
    assert(bufferSize <= max_body_length);
        m_header.bodySize = bufferSize;
        m_header.type = messageType;
        std::memcpy(body(), buffer, bufferSize);
        std::memcpy(data(), &m_header, sizeof(m_header));
  }
    void setMessage(int messageType, const std::string& buffer) {
        setMessage(messageType, buffer.data(), buffer.size());
    }

  bool decode_header() {
    std::memcpy(&m_header, data(), header_length);
    if (m_header.bodySize > max_body_length) {
      std::cout << "body size " << m_header.bodySize << " " << m_header.type
                << std::endl;
      return false;
        }
    return true;
  }

private:
  char data_[header_length + max_body_length];
    Header m_header;
};

#endif // CHAT_MESSAGE_HPP

其中server逻辑如下

server.cpp(其实这里可以优化:读一般会使用 async_read_some(读一部分),写才使用 async_write(一定会写完指定的整个大小,底层会多次调用 async_write_some))

 

#include "chat_message.h"
#include "JsonObject.h"
#include "Protocal.pb.h"

#include <boost/asio.hpp>

#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <utility>

#include <cstdlib>

using boost::asio::ip::tcp;

//----------------------------------------------------------------------

// Queue type used for the room's recent messages and each session's pending writes.
using chat_message_queue = std::deque<chat_message>;
// List-based alternative; not referenced by the code shown here.
using chat_message_queue2 = std::list<chat_message>;
//----------------------------------------------------------------------


//----------------------------------------------------------------------

class chat_session;
using chat_session_ptr = std::shared_ptr<chat_session>;
// Keeps the set of connected sessions and a bounded backlog of recent
// messages. The member functions are defined after chat_session.
class chat_room {
public:
  // Adds a session to the room.
  void join(chat_session_ptr participant);

  // Removes a session from the room.
  void leave(chat_session_ptr participant);

  // Records a message and hands it to the sessions in the room.
  void deliver(const chat_message &msg);

private:
  // Upper bound on the number of messages kept in recent_msgs_.
  enum { max_recent_msgs = 100 };

  std::set<chat_session_ptr> participants_;
  chat_message_queue recent_msgs_;
};
//----------------------------------------------------------------------

class chat_session : public std::enable_shared_from_this<chat_session> {
public:
  chat_session(tcp::socket socket, chat_room &room)
      : socket_(std::move(socket)), room_(room) {}

  void start() {
    room_.join(shared_from_this());
    do_read_header();
  }

  void deliver(const chat_message &msg) {
        // first false,避免多次调用写,因为写会一直循环从队列里面拿消息,一般都是将消息放到队列中处理,一次写完,包括回调全部完成,才去取消息,进行下一次写(如果混在一起会导致写回调和下一次写混在一起导致混乱)
    bool write_in_progress = !write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!write_in_progress) {
            // first
      do_write();
    }
  }

private:
  void do_read_header() {
    auto self(shared_from_this());
    boost::asio::async_read(
        socket_,
        boost::asio::buffer(read_msg_.data(), chat_message::header_length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec && read_msg_.decode_header()) {
            do_read_body();
          } else {
            std::cout << "Player leave the room\n";
            room_.leave(shared_from_this());
          }
        });
  }

  void do_read_body() {
    auto self(shared_from_this());
    boost::asio::async_read(
        socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            //room_.deliver(read_msg_);
                        handleMessage();
            do_read_header();
          } else {
            room_.leave(shared_from_this());
          }
        });
  }

  template <typename T> T toObject() {
    T obj;
    std::stringstream ss(std::string(
        read_msg_.body(), read_msg_.body() + read_msg_.body_length()));
    boost::archive::text_iarchive oa(ss);
    oa &obj;
    return obj;
  }

  bool fillProtobuf(::google::protobuf::Message* msg) {
    std::string ss(
        read_msg_.body(), read_msg_.body() + read_msg_.body_length());
        auto ok = msg->ParseFromString(ss);
    return ok;
  }

    ptree toPtree() {
        ptree obj;
        std::stringstream ss(
                std::string(read_msg_.body(),
                    read_msg_.body() + read_msg_.body_length()));
        boost::property_tree::read_json(ss, obj);
        return obj;
    }

  void handleMessage() {
    if (read_msg_.type() == MT_BIND_NAME) {
            PBindName bindName;
            if(fillProtobuf(&bindName))
                m_name = bindName.name();
    } else if (read_msg_.type() == MT_CHAT_INFO) {
            PChat chat;
            if(!fillProtobuf(&chat)) return;
            m_chatInformation = chat.information();

      auto rinfo = buildRoomInfo();
      chat_message msg;
      msg.setMessage(MT_ROOM_INFO, rinfo);
      room_.deliver(msg);

    } else {
      // not valid msg do nothing
    }
  }

  void do_write() {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(write_msgs_.front().data(),
                                     write_msgs_.front().length()),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            write_msgs_.pop_front();
            if (!write_msgs_.empty()) {
              do_write();
            }
          } else {
            room_.leave(shared_from_this());
          }
        });
  }

  tcp::socket socket_;
  chat_room &room_;
  chat_message read_msg_;
  chat_message_queue write_msgs_;
    std::string m_name;
    std::string m_chatInformation;
    std::string buildRoomInfo() const {
        PRoomInformation roomInfo;
        roomInfo.set_name(m_name);
        roomInfo.set_information(m_chatInformation);
        std::string out;
        auto ok = roomInfo.SerializeToString(&out);
        assert(ok);
        return out;
    }
//    RoomInformation buildRoomInfo() const {
//        RoomInformation info;
//        info.name.nameLen = m_name.size();
//        std::memcpy(info.name.name, m_name.data(), m_name.size());
//        info.chat.infoLen = m_chatInformation.size();
//        std::memcpy(info.chat.information, m_chatInformation.data(),
//                m_chatInformation.size());
//        return info;
//    }
};


  // Adds the session to the room and replays every stored recent message to
  // it, in stored order.
  void chat_room::join(chat_session_ptr participant) {
    participants_.insert(participant);
    for (auto it = recent_msgs_.begin(); it != recent_msgs_.end(); ++it) {
      participant->deliver(*it);
    }
  }

  // Removes the session from the room; does nothing if it is not a member.
  void chat_room::leave(chat_session_ptr participant) {
    const auto found = participants_.find(participant);
    if (found != participants_.end()) {
      participants_.erase(found);
    }
  }

  void chat_room::deliver(const chat_message &msg) {
    recent_msgs_.push_back(msg);
    while (recent_msgs_.size() > max_recent_msgs)
      recent_msgs_.pop_front();

    for (auto& participant : participants_)
      participant->deliver(msg);
  }


//----------------------------------------------------------------------

// Listens on one endpoint and turns every accepted connection into a
// chat_session attached to this server's room.
class chat_server {
public:
  // Binds the acceptor to endpoint and starts accepting immediately.
  chat_server(boost::asio::io_service &io_service,
              const tcp::endpoint &endpoint)
      : acceptor_(io_service, endpoint), socket_(io_service) {
    do_accept();
  }

private:
  // Waits for one connection. On success the accepted socket is moved into a
  // new session, which is started; in either case the next accept is issued.
  void do_accept() {
    auto on_accept = [this](boost::system::error_code ec) {
      if (!ec) {
        std::make_shared<chat_session>(std::move(socket_), room_)->start();
      }

      do_accept();
    };
    acceptor_.async_accept(socket_, on_accept);
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_; // receives the next accepted connection
  chat_room room_;
};

//----------------------------------------------------------------------

// Server entry point: starts one chat_server per port given on the command
// line and runs the io_service until it has no more work.
//
// Returns 1 on a usage error, an invalid port, or an exception; 0 otherwise.
int main(int argc, char *argv[]) {
  int exitCode = 0;
  try {
    GOOGLE_PROTOBUF_VERIFY_VERSION;
    if (argc < 2) {
      std::cerr << "Usage: chat_server <port> [<port> ...]\n";
      return 1;
    }

    boost::asio::io_service io_service;

    std::list<chat_server> servers;
    for (int i = 1; i < argc; ++i) {
      // Parse strictly: std::atoi would turn "abc" into port 0 and silently
      // truncate out-of-range values.
      char *end = nullptr;
      const long port = std::strtol(argv[i], &end, 10);
      if (end == argv[i] || *end != '\0' || port < 0 || port > 65535) {
        std::cerr << "Invalid port: " << argv[i] << "\n";
        return 1;
      }
      tcp::endpoint endpoint(tcp::v4(), static_cast<unsigned short>(port));
      servers.emplace_back(io_service, endpoint);
    }

    io_service.run();
  } catch (const std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
    exitCode = 1; // report the failure to the caller instead of exiting with 0
  }

  google::protobuf::ShutdownProtobufLibrary();
  return exitCode;
}

其中client.cpp如下

 

#include "chat_message.h"
#include "structHeader.h"
#include "JsonObject.h"
#include "SerilizationObject.h"
#include "Protocal.pb.h"

#include <boost/asio.hpp>

#include <deque>
#include <iostream>
#include <thread>

#include <cstdlib>
#include <cassert>

using boost::asio::ip::tcp;

// Queue type holding the client's messages that are waiting to be written.
using chat_message_queue = std::deque<chat_message>;


class chat_client {
public:
  chat_client(boost::asio::io_service &io_service,
              tcp::resolver::iterator endpoint_iterator)
      : io_service_(io_service), socket_(io_service) {
    do_connect(endpoint_iterator);
  }

  void write(const chat_message &msg) {
    io_service_.post([this, msg]() {
      bool write_in_progress = !write_msgs_.empty();
      write_msgs_.push_back(msg);
      if (!write_in_progress) {
        do_write();
      }
    });
  }

  void close() {
    io_service_.post([this]() { socket_.close(); });
  }

private:
  void do_connect(tcp::resolver::iterator endpoint_iterator) {
    boost::asio::async_connect(
        socket_, endpoint_iterator,
        [this](boost::system::error_code ec, tcp::resolver::iterator) {
          if (!ec) {
            do_read_header();
          }
        });
  }

  void do_read_header() {
    boost::asio::async_read(
        socket_,
        boost::asio::buffer(read_msg_.data(), chat_message::header_length),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec && read_msg_.decode_header()) {
            do_read_body();
          } else {
            socket_.close();
          }
        });
  }

  void do_read_body() {
    boost::asio::async_read(
        socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            if (read_msg_.type() == MT_ROOM_INFO) {
              //SRoomInfo info;
                        std::string buffer(read_msg_.body(),
                              read_msg_.body() + read_msg_.body_length());

                        PRoomInformation roomInfo;
                        auto ok = roomInfo.ParseFromString(buffer);
                        //if(!ok) throw std::runtime_error("not valid message");
              //std::stringstream ss(buffer);
                            //ptree tree;
                            //boost::property_tree::read_json(ss, tree);
                        if (ok) {
                        std::cout << "client: '";
                        std::cout << roomInfo.name();
                        std::cout << "' says '";
                        std::cout
                        << roomInfo.information();
                        std::cout << "'\n";
                        }
//              boost::archive::text_iarchive ia(ss);
//              ia & info;
//              std::cout << "client: '";
//              std::cout << info.name();
//              std::cout << "' says '";
//              std::cout << info.information();
//              std::cout << "'\n";
            }
            do_read_header();
          } else {
            socket_.close();
          }
        });
  }

  void do_write() {
    boost::asio::async_write(
        socket_, boost::asio::buffer(write_msgs_.front().data(),
                                     write_msgs_.front().length()),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            write_msgs_.pop_front();
            if (!write_msgs_.empty()) {
              do_write();
            }
          } else {
            socket_.close();
          }
        });
  }

private:
  boost::asio::io_service &io_service_;
  tcp::socket socket_;
  chat_message read_msg_;
  chat_message_queue write_msgs_;
};

// Client entry point: connects to <host> <port>, runs the io_service on a
// background thread, and turns each line read from stdin into a protobuf
// message sent to the server. Input ends at end-of-file (Ctrl-D).
//
// Returns 1 on a usage error or an exception; 0 otherwise.
int main(int argc, char *argv[]) {
  int exitCode = 0;
  try {
    GOOGLE_PROTOBUF_VERIFY_VERSION;
    if (argc != 3) {
      std::cerr << "Usage: chat_client <host> <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;

    tcp::resolver resolver(io_service);
    auto endpoint_iterator = resolver.resolve({argv[1], argv[2]});
    chat_client c(io_service, endpoint_iterator);
    // The io_service runs on its own thread. Work that originates on this
    // (main) thread must therefore reach it through io_service.post, which is
    // what chat_client::write and chat_client::close do.
    std::thread t([&io_service]() { io_service.run(); });

    // Read whole lines into a std::string. A fixed-size buffer with
    // cin.getline would set failbit on an over-long line and silently end
    // the session; here such a line is simply rejected by the parser.
    std::string input;
    while (std::getline(std::cin, input)) {
      int type = 0;
      std::string output;
      if (!parseMessage4(input, &type, output) ||
          output.size() > chat_message::max_body_length) {
        std::cerr << "Ignored input; use \"BindName <name>\" or "
                     "\"Chat <text>\"\n";
        continue;
      }
      chat_message msg;
      msg.setMessage(type, output.data(), output.size());
      c.write(msg); // posted to the io_service thread
      std::cout << "write message for server " << output.size() << std::endl;
    }

    c.close(); // posted to the io_service thread
    t.join();
  } catch (const std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
    exitCode = 1; // report the failure to the caller instead of exiting with 0
  }

  google::protobuf::ShutdownProtobufLibrary();
  return exitCode;
}