08、从0到1实现SECS协议之HSMS协议中的connection

发布时间 2023-08-03 18:50:40作者: 画个一样的我

前面实现了优先级队列,现在就准备开始实现 HSMS 协议中如何处理 connection。在之前定义的接口中,我们定义了 IConnection 接口,现在我们先来实现这个接口。我们知道,此软件既可以作为 Host 端,主动去连接 equipment,也可以作为 equipment 端,让其他系统来连接我们的系统。因此,connection 分为两种类型。

但是它们在实现 IConnection 接口时,有部分内容是相同的,因此我们先实现相同的部分,不相同的部分我们后面再实现。

1、相同的部分

package connection

import (
	"secs-gem/hsms/packet"
	"secs-gem/hsms/queue"
	driver_log "secs-gem/log"
	"secs-gem/secsgem"

	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"net"
	"runtime/debug"
	"strings"
	"sync"
	"time"
)

const (
	// SendBlockSize is the maximum number of bytes written to the socket
	// in a single send; larger packets are split into blocks of this size.
	SendBlockSize int = 1024 * 1024
)

/*-----------------------------------

	连接

-----------------------------------*/

// NewConnection builds an IConnection for the given configuration:
// an active (host-mode) connection when cfg.Active is true, otherwise
// a passive (equipment-mode) connection.
func NewConnection(cfg *secsgem.ConnectionCfg, delegate secsgem.IDelegate) secsgem.IConnection {
	base := newHsmsConnection(cfg, delegate)
	if cfg.Active {
		return &HsmsActiveConnection{HsmsConnection: base}
	}
	return &HsmsPassiveConnection{HsmsConnection: base}
}

// newHsmsConnection allocates the state shared by active and passive
// connections and applies the configuration.
func newHsmsConnection(cfg *secsgem.ConnectionCfg, delegate secsgem.IDelegate) *HsmsConnection {
	conn := &HsmsConnection{
		view:      &secsgem.ConnView{},
		delegate:  delegate,
		recvChan:  make(chan *packets.HsmsPacket, 1024),
		connected: false,
		log:       zap.S(),
	}
	// SetConfig also normalizes durations and attaches log fields.
	conn.SetConfig(cfg)
	return conn
}

// HsmsConnection holds the state shared by both active (host) and
// passive (equipment) HSMS connections: the TCP socket, the framing
// buffer, the receive channel, the prioritized send queue and the
// worker goroutines managed by the errgroup.
type HsmsConnection struct {
	ctx       context.Context
	ctxCancel context.CancelFunc
	config    *secsgem.ConnectionCfg
	view      *secsgem.ConnView

	// delegate receives connect/disconnect/packet-received callbacks
	delegate secsgem.IDelegate
	sock     *net.TCPConn
	status   secsgem.Status

	// receiveBuffer accumulates raw socket bytes until a complete
	// length-prefixed HSMS message is available
	receiveBuffer bytes.Buffer

	// recvChan carries decoded packets from the receiver to the handler loop
	recvChan chan *packets.HsmsPacket
	// sendQueue is the priority queue drained by senderLoop
	sendQueue *queue.PriorityQueue
	// connected is true while a TCP connection is established.
	// NOTE(review): read and written from several goroutines without
	// synchronization — confirm whether this should be atomic.
	connected bool

	// logging and synchronization
	log   *zap.SugaredLogger
	mx    sync.RWMutex
	group errgroup.Group
}

// GetConfig returns the current connection configuration.
// It takes the read lock because SetConfig replaces sc.config under the
// write lock; the previous unlocked read was a data race.
func (sc *HsmsConnection) GetConfig() *secsgem.ConnectionCfg {
	sc.mx.RLock()
	defer sc.mx.RUnlock()
	return sc.config
}

// SetConfig replaces the connection configuration. It is a no-op once
// the connection has been enabled (the config is frozen while running).
func (sc *HsmsConnection) SetConfig(cfg *secsgem.ConnectionCfg) {
	if sc.view.Enabled.Load() {
		// already enabled: configuration must not change
		return
	}
	sc.mx.Lock()
	defer sc.mx.Unlock()

	cfg.ReviseDuration() // normalize duration precision
	sc.config = cfg
	// Attach port/active fields to the logger; every later log line
	// emitted through sc.log carries these fields via With.
	sc.log = driver_log.S().With("port", cfg.GetPort(), "active", cfg.Active)
}

// disable cancels the worker context, closes the socket and waits for
// the errgroup goroutines to exit. Always returns nil.
func (sc *HsmsConnection) disable() error {
	// cancel only if the context exists and is not already cancelled
	if sc.ctx != nil && sc.ctx.Err() == nil {
		sc.ctxCancel()
	}
	// best-effort close; the Close error is deliberately ignored
	if sc.sock != nil {
		sc.sock.Close()
	}

	// wait for all goroutines started via the errgroup to finish
	sc.group.Wait()
	return nil
}

// GetCtx returns the connection's lifecycle context (created in Enable).
func (sc *HsmsConnection) GetCtx() context.Context {
	return sc.ctx
}

// SendPacket puts a *packets.HsmsPacket onto the priority send queue for
// senderLoop to deliver. Returns an error when the argument is not an
// HsmsPacket or when the connection is down.
func (sc *HsmsConnection) SendPacket(packet interface{}, priority int) error {
	pkt, ok := packet.(*packets.HsmsPacket)
	if !ok {
		// Bug fix: report the actual argument's type. The previous code
		// formatted pkt, which is always nil after a failed assertion.
		return fmt.Errorf("packet type error %T", packet)
	}
	if !sc.connected {
		return fmt.Errorf("disconnected")
	}
	// ensure the NORMAL priority bit is always set
	priority |= secsgem.PriorityNORMAL

	// enqueue for the sender goroutine
	systemId := pkt.Header.System
	sc.log.Debugf("[system=0x%x] into send_queue", systemId)

	sc.sendQueue.PutNoWait(&queue.QueueItem{
		Value:    pkt,
		Priority: priority,
		JoinTime: time.Now(),
	})
	return nil
}

// _sendPacket encodes pkt and writes it to sock in blocks of at most
// SendBlockSize bytes. Each block is retried up to 5 times; writing on a
// closed connection aborts immediately. Returns a non-nil error when a
// block could not be delivered.
func (sc *HsmsConnection) _sendPacket(sock *net.TCPConn, pkt *packets.HsmsPacket) error {

	systemId := pkt.Header.System
	data := pkt.Encode()

	length := len(data)

	sc.log.Debugf("[system=0x%x] send packet %s", systemId, pkt)

	for i := 0; i < length; i += SendBlockSize {
		end := i + SendBlockSize
		if end > length {
			end = length
		}
		block := data[i:end]

		sent := false
		for attempt := 0; attempt < 5 && !sent; attempt++ {
			sc.log.Debugf("[system=0x%x][send] %v", systemId, block)

			//secsgem.MetricHsmsSendTotal.Add(float64(len(block)))

			_, err := sock.Write(block)
			if err != nil {
				errMsg := err.Error()
				sc.log.Warn("socket conn send data failed, err: ", errMsg)
				if strings.Contains(errMsg, "use of closed network connection") {
					// the connection is gone; retrying is pointless
					return err
				}
				continue
			}
			sent = true
		}
		if !sent {
			// Bug fix: previously exhausting all retries fell through and
			// the function returned nil as if the packet had been sent.
			return fmt.Errorf("send block failed after retries [system=0x%x]", systemId)
		}
	}
	return nil
}

// startReceiverAndSender marks the connection as established, starts the
// three worker goroutines (receiver, sender, received-message handler)
// on the errgroup, then notifies the delegate.
func (sc *HsmsConnection) startReceiverAndSender() {
	sc.connected = true
	// working context derived from the connection context; the receiver
	// cancels it when the socket dies, which stops the other two loops
	_ctx, _ctxCancel := context.WithCancel(sc.ctx)

	// receiver goroutine (panic-protected)
	sc.group.Go(func() error {
		defer func() {
			if e := recover(); e != nil {
				sc.log.Errorf("[panic]%v\n%s", e, debug.Stack())
			}
		}()
		sc.receiverLoop(_ctx, _ctxCancel)
		return nil
	})

	// sender goroutine (panic-protected)
	sc.group.Go(func() error {
		defer func() {
			if e := recover(); e != nil {
				sc.log.Errorf("[panic]%v\n%s", e, debug.Stack())
			}
		}()
		sc.senderLoop(_ctx)
		return nil
	})

	// received-message dispatch goroutine
	sc.group.Go(func() error {
		sc.handleRevMessageLoop(_ctx)
		return nil
	})

	// notify the delegate that the connection is up
	if sc.delegate != nil {
		sc.delegate.OnConnectionEstablished()
	}
}

// receiverLoop reads from the socket until the read fails, then closes
// the socket, cancels the working context (stopping the sibling sender
// and handler loops) and notifies the delegate of the disconnect.
func (sc *HsmsConnection) receiverLoop(ctx context.Context, cancel context.CancelFunc) {
	defer func() {
		sc.view.IsReceiving = false
	}()

	sc.view.IsReceiving = true
	if err := sc._receiverReadData(ctx); err != nil {
		sc.log.Warnf("receiver error: %s", err)
	}

	// close the socket (the read loop has ended)
	sc.sock.Close()
	// stop the goroutines sharing this working context
	cancel()

	// notify the delegate that the connection closed
	if sc.delegate != nil {
		sc.delegate.OnConnectionClosed()
	}
}

// senderLoop drains the priority send queue and writes each packet to the
// socket until ctx terminates. Packets that have waited in the queue
// longer than T3 are dropped (the reply would time out anyway).
func (sc *HsmsConnection) senderLoop(ctx context.Context) {
	defer func() {
		sc.view.IsSending = false
	}()
	sc.view.IsSending = true

	sock := sc.sock

	// Stop on any context termination. The previous condition compared
	// against context.Canceled only, so a deadline-based context
	// (DeadlineExceeded) would never have ended the loop.
	for ctx.Err() == nil {
		item := sc.sendQueue.Get(ctx, time.Second*5)
		if item == nil {
			continue
		}
		pkt := item.Value
		systemId := pkt.Header.System
		// drop packets that already exceeded the T3 reply timeout
		waitTime := time.Since(item.JoinTime)
		if waitTime > sc.config.T3 {
			sc.log.Debugf("[system=0x%x] send_queue overtime wait_time=%s ", systemId, waitTime)
			continue
		}
		sc.log.Infof("[system=0x%x] send message: %s", systemId, pkt)
		if err := sc._sendPacket(sock, pkt); err != nil {
			sc.log.Warnf("[system=0x%x] %s", systemId, err)
		}
	}
	sc.log.Debug("sender stopped")
}

// _receiverReadData reads raw bytes from the socket into receiveBuffer
// and extracts every complete HSMS message currently buffered, until the
// context ends or the read fails. On a read error it marks the
// connection as disconnected and returns the error.
func (sc *HsmsConnection) _receiverReadData(ctx context.Context) error {
	var buff = make([]byte, 10240)
	for ctx.Err() == nil {
		n, err := sc.sock.Read(buff)
		if err != nil {
			sc.connected = false
			return err
		}
		// append the chunk to the framing buffer
		data := buff[:n]
		sc.log.Debugf("[recv] %v", data)

		//secsgem.MetricHsmsReadTotal.Add(float64(len(data)))

		sc.receiveBuffer.Write(data)

		// drain all complete messages currently in the buffer
		for sc._processReceiveBuffer() && ctx.Err() == nil {
		}
	}
	return errors.New("receiver stopped")
}

// _processReceiveBuffer extracts at most one complete HSMS message from
// the receive buffer. It returns true when bytes remain afterwards (the
// caller loops to try again) and false when more data must be read first.
func (sc *HsmsConnection) _processReceiveBuffer() bool {
	// HSMS frames begin with a 4-byte Message Length field, so fewer
	// than 4 buffered bytes cannot be processed yet.
	// If TCP splits a frame across segments nothing is lost: we return
	// false here and the caller retries once more bytes have been
	// appended to the bytes.Buffer.
	if sc.receiveBuffer.Len() < 4 {
		return false
	}

	// total frame size: payload length plus the 4-byte length prefix
	// (keeping the prefix in `length` simplifies the math below)
	length := binary.BigEndian.Uint32(sc.receiveBuffer.Bytes()[:4]) + 4

	if sc.receiveBuffer.Len() < int(length) { // frame incomplete, keep reading
		return false
	}
	data := make([]byte, int(length))
	// the length check above guarantees this Read fills data completely
	sc.receiveBuffer.Read(data)

	// decode; Decode strips the 4-byte length prefix itself
	response := packets.Decode(data)
	sc.log.Infof("[system=0x%x] recv message: %s", response.Header.System, response)

	// hand the decoded packet to the handler loop
	sc.recvChan <- response

	// report whether leftover bytes may form another complete message
	hasData := sc.receiveBuffer.Len() > 0
	return hasData
}

// handleRevMessageLoop dispatches packets received on recvChan to the
// delegate — each in its own panic-protected goroutine so a slow or
// faulty handler cannot block the loop — until ctx terminates.
func (sc *HsmsConnection) handleRevMessageLoop(ctx context.Context) {
	defer func() {
		sc.view.IsHanding = false
	}()

	sc.view.IsHanding = true
	for ctx.Err() == nil {
		select {
		case message := <-sc.recvChan: // packets queued by _processReceiveBuffer
			if sc.delegate == nil {
				continue
			}

			// asynchronous dispatch, panic-protected
			go func() {
				defer func() {
					if e := recover(); e != nil {
						sc.log.Errorf("[panic] recv handler %v\n%s", e, debug.Stack())
					}
				}()
				sc.delegate.OnConnectionPacketReceived(nil, message)
			}()
		case <-ctx.Done():
			sc.log.Debug("recv message handle stopped")
			return
		}
	}
}

/*-----------------------------------

	multi-server

-----------------------------------*/

// HsmsMultiPassiveServer is a placeholder for a passive server capable of
// serving multiple simultaneous connections; not implemented here.
type HsmsMultiPassiveServer struct {
}

/*-------------------------------------
		状态
-------------------------------------*/

// Status returns the current connection status under the read lock.
func (sc *HsmsConnection) Status() secsgem.Status {
	sc.mx.RLock()
	defer sc.mx.RUnlock()
	return sc.status
}

// isClosed reports whether the status is CLOSED.
// NOTE(review): reads status without the lock, unlike Status() — confirm
// whether callers always hold mx or whether this is an accepted race.
func (sc *HsmsConnection) isClosed() bool {
	return sc.status == secsgem.CLOSED
}

// isConnected reports whether the status is CONNECTED (lock-free read).
func (sc *HsmsConnection) isConnected() bool {
	return sc.status == secsgem.CONNECTED
}

// isDisconnected reports whether the status is DISCONNECTED (lock-free read).
func (sc *HsmsConnection) isDisconnected() bool {
	return sc.status == secsgem.DISCONNECTED
}

// isConnecting reports whether the status is CONNECTING (lock-free read).
func (sc *HsmsConnection) isConnecting() bool {
	return sc.status == secsgem.CONNECTING
}

// isReconnecting reports whether the status is RECONNECTING (lock-free read).
func (sc *HsmsConnection) isReconnecting() bool {
	return sc.status == secsgem.RECONNECTING
}

2、host mode

此时配置中的 active=true

package connection

import (
	"secs-gem/hsms/queue"

	"context"
	"fmt"
	"net"
	"time"
)

/*-----------------------------------

	active (host mode)

-----------------------------------*/

// HsmsActiveConnection is the active (host-mode) connection: it dials
// the remote equipment and, when AutoReconnect is configured, retries
// on a T5 timer.
type HsmsActiveConnection struct {
	*HsmsConnection
	// hTimer re-attempts the connection every T5 when AutoReconnect is set
	hTimer *time.Timer
}

// Enable starts the active connection: creates the lifecycle context and
// send queue, dials the remote side once and, if AutoReconnect is set,
// arms the T5 reconnect timer. A no-op when already enabled.
func (hac *HsmsActiveConnection) Enable() error {
	if hac.view.Enabled.Load() { // already enabled, nothing to do
		return nil
	}
	hac.mx.Lock()
	defer hac.mx.Unlock()

	hac.ctx, hac.ctxCancel = context.WithCancel(context.Background())
	hac.sendQueue = queue.NewPriorityQueue()
	hac.view.SQueue = hac.sendQueue.SQueueView
	// initial connection attempt; failure aborts Enable
	if err := hac.startActiveConnect(); err != nil {
		return err
	}

	// the reconnect timer is armed only after the first successful connect
	if hac.config.AutoReconnect {
		if hac.hTimer == nil {
			hac.hTimer = time.AfterFunc(hac.config.T5, hac.activeConnCheckTimer)
			hac.log.Info("start timing detector")
		} else {
			hac.hTimer.Reset(hac.config.T5)
		}
	}
	hac.view.Enabled.Store(true)
	return nil
}

// Disable stops the reconnect timer and tears the connection down.
// A no-op when not enabled. Always returns nil.
func (hac *HsmsActiveConnection) Disable() error {
	if !hac.view.Enabled.Load() {
		return nil
	}

	hac.mx.Lock()
	defer hac.mx.Unlock()

	hac.stopCheckTimer() // stop hTimer so no reconnect fires during shutdown
	hac.disable()
	hac.view.Enabled.Store(false)
	return nil
}

// GetView returns the connection's view/state object under the read lock.
func (hac *HsmsActiveConnection) GetView() interface{} {
	hac.mx.RLock()
	defer hac.mx.RUnlock()
	return hac.view
}

// startActiveConnect dials the configured ip:port (active/host mode,
// cfg.Active == true) and, on success, stores the socket and starts the
// receiver/sender machinery. A no-op while already connected.
func (hac *HsmsActiveConnection) startActiveConnect() error {
	if hac.connected {
		return nil
	}

	// resolve the remote address
	address := fmt.Sprintf("%s:%d", hac.config.Host, hac.config.GetPort())
	tcpAddr, err := net.ResolveTCPAddr("tcp4", address)
	if err != nil {
		return fmt.Errorf("start active hsms error: %s", err)
	}

	// dial the equipment side
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		hac.log.Warn(err)
		return err
	}

	hac.log.Infof("connected to %s", address)
	hac.sock = conn
	// start the receive/send goroutines for this socket
	hac.startReceiverAndSender()
	return nil
}

// activeConnCheckTimer is the T5 timer callback: it re-attempts the
// connection (a no-op while connected) and always re-arms the timer.
// Errors are not propagated; startActiveConnect logs failures itself.
func (hac *HsmsActiveConnection) activeConnCheckTimer() {
	defer hac.hTimer.Reset(hac.config.T5)

	// attempt to (re)connect
	hac.startActiveConnect()
}

// stopCheckTimer stops the reconnect timer if one exists.
func (hac *HsmsActiveConnection) stopCheckTimer() {
	if hac.hTimer != nil {
		// Stop does not close the timer's channel, so a pending read
		// from it remains safe.
		hac.hTimer.Stop()
	}
}

3、equipment mode

package connection

import (
	"secs-gem/hsms/queue"

	"context"
	"fmt"
	"net"
)

/*-----------------------------------

	server (equipment mode)

-----------------------------------*/

// HsmsPassiveConnection is the passive (equipment-mode) connection: it
// listens on the configured address and serves a single client.
type HsmsPassiveConnection struct {
	*HsmsConnection
	// tcpServer is the listener that accepts incoming host connections
	tcpServer *net.TCPListener
}

// Enable starts the passive connection: creates the lifecycle context
// and send queue, then starts the listening server. A no-op when
// already enabled.
func (hpc *HsmsPassiveConnection) Enable() error {
	if hpc.view.Enabled.Load() { // already enabled, nothing to do
		return nil
	}

	hpc.mx.Lock()
	defer hpc.mx.Unlock()

	hpc.ctx, hpc.ctxCancel = context.WithCancel(context.Background())
	hpc.sendQueue = queue.NewPriorityQueue()
	hpc.view.SQueue = hpc.sendQueue.SQueueView

	if err := hpc.startPassiveServer(); err != nil {
		return err
	}
	hpc.view.Enabled.Store(true)
	return nil
}

// Disable tears the connection down and closes the listener.
// A no-op when not enabled. Always returns nil.
func (hpc *HsmsPassiveConnection) Disable() error {
	if !hpc.view.Enabled.Load() {
		return nil
	}

	hpc.mx.Lock()
	defer hpc.mx.Unlock()

	hpc.HsmsConnection.disable()
	// closing the listener unblocks the AcceptTCP loop
	hpc.tcpServer.Close()
	hpc.view.Enabled.Store(false)
	return nil
}

// GetView returns the connection's view/state object under the read lock.
func (hpc *HsmsPassiveConnection) GetView() interface{} {
	hpc.mx.RLock()
	defer hpc.mx.RUnlock()

	return hpc.view
}

// startPassiveServer (cfg.Active == false) resolves and listens on the
// configured address, then accepts clients in a background goroutine.
// Only one client may be connected at a time; extras are closed.
func (hpc *HsmsPassiveConnection) startPassiveServer() error {
	address := fmt.Sprintf("%s:%d", hpc.config.Host, hpc.config.GetPort())
	tcpAddr, err := net.ResolveTCPAddr("tcp4", address) // resolve the TCP address
	if err != nil {
		return fmt.Errorf("start passive hsms error1: %s", err)
	}

	var err1 error
	// listen on the server address
	if hpc.tcpServer, err1 = net.ListenTCP("tcp", tcpAddr); err1 != nil {
		return fmt.Errorf("start passive hsms error2: %s", err1)
	}

	hpc.log.Info("passive server started")
	go func() {
		defer func() {
			hpc.view.IsListening = false
		}()
		hpc.view.IsListening = true

		for hpc.ctx.Err() == nil {
			conn, err := hpc.tcpServer.AcceptTCP() // block until a client connects
			if err != nil {
				// NOTE(review): once the listener is closed, Accept fails
				// repeatedly and this loop spins until ctx is cancelled —
				// confirm Disable cancels ctx before closing the listener.
				hpc.log.Warnf("accept error %s", err)
				continue
			}
			if hpc.sock != nil { // only a single client may be connected
				if hpc.connected {
					hpc.log.Warnf("exists conn %s", hpc.sock)
					conn.Close()
					continue
				}
			}
			hpc.log.Infof("connected by %s", conn.RemoteAddr())
			hpc.sock = conn

			go hpc.startReceiverAndSender()
		}
	}()
	return nil
}

看过上述代码后,其实最重要的就是相同部分中如何接收数据以及如何处理数据,把这个搞懂了,其他的就比较好理解。

通过上面的 _processReceiveBuffer 函数,我们也可以看到了 tcp 解决粘包的经典解决方式,即传输的数据格式采用TLV的方式组合数据。

对此不太熟悉的同学,可以看看刘丹冰大佬写的 zinx 框架,此链接介绍了 TLV封包来解决 TCP 粘包的问题, 5.2 消息的封包与拆包