11、从0到1实现SECS协议之HSMS协议中的handler

发布时间 2023-08-27 14:25:26 作者: 画个一样的我

11、从0到1实现SECS协议之HSMS协议中的handler

前面实现了发送事件的处理机制,接下来我们将实现 ISecsHandler 接口,把前面实现的各种功能组合起来,从而提供一个完整可用的服务。

1、handler 的具体实现

package handler

import (
	"context"
	"errors"
	"fmt"
	"github.com/looplab/fsm"
	"go.uber.org/zap"
	"math/rand"
	"runtime/debug"
	"secs-gem/functions"
	"sync"
	"sync/atomic"
	"time"

	"secs-gem/common"
	secs_errors "secs-gem/errors"
	"secs-gem/hsms/connection"
	packets "secs-gem/hsms/packet"
	"secs-gem/hsms/statemachine"
	driver_log "secs-gem/log"
	"secs-gem/secsgem"
)

// HsmsHandler is the handler for the HSMS protocol. It embeds the underlying
// connection and holds the connection state machine, the system-bytes
// bookkeeping and the link-test timer used by the methods in this file.
type HsmsHandler struct {
	// Embedded connection: what should be done when a connection is set up
	// (enable/disable, sending packets, config access).
	secsgem.IConnection

	// Message listener (decides how received packets are processed).
	listener secsgem.MessageListener

	// device id
	sessionId uint16

	// Connection state machine of the HSMS protocol.
	connectionState *statemachine.ConnectionState
	connected       bool // whether the connection is established
	// Unique identifier of the HSMS protocol (system bytes).
	// It uniquely identifies a message transaction: the SystemBytes of a
	// reply message must equal the SystemBytes of the primary message it
	// answers.
	systemCounter uint32
	systemQueues  sync.Map // system id -> chan packets.HsmsPacket on which the reply for that id is delivered

	// HSMS link test: the client periodically sends LinkTest.req so that the
	// link is considered alive. linkTestTimer is guarded by timeMx.
	linkTestTimer   *time.Timer
	linkTestTimeout time.Duration
	timeMx          sync.Mutex

	// Logger.
	log *zap.SugaredLogger
	// NOTE(review): mx is not used by any method visible in this file — confirm
	// what it is meant to guard.
	mx sync.RWMutex

	// Message forwarding in bridge mode.
	bridge secsgem.ISecsBridge
}

// NewHsmsHandler constructs an HsmsHandler from cfg.
//
// It takes the session id from cfg.SessionId, sets a 30 second link-test
// interval, creates the underlying connection with the handler itself as the
// connection event receiver, picks a random starting value for the system
// bytes counter and builds the connection state machine with the handler's
// state callbacks.
//
// listener receives data messages that are not replies to a pending request;
// it may be nil, in which case such messages are dropped.
func NewHsmsHandler(cfg *secsgem.ConnectionCfg, listener secsgem.MessageListener) *HsmsHandler {
	hh := &HsmsHandler{
		sessionId:       uint16(cfg.SessionId),
		listener:        listener,
		linkTestTimeout: 30 * time.Second,
	}

	// Logger tagged with the port and the active/passive role.
	hh.log = driver_log.S().With("port", cfg.Port, "active", cfg.Active)

	// Underlying connection; the handler receives its events.
	hh.IConnection = connection.NewConnection(cfg, hh)

	// System bytes start at a random value. A private generator seeded with
	// nanosecond resolution is used instead of rand.Seed: rand.Seed mutates the
	// process-wide generator on every construction, is deprecated, and a
	// second-resolution seed gives handlers created within the same second the
	// same starting value.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	hh.systemCounter = rng.Uint32()

	// Connection state machine and its callbacks.
	callbacks := fsm.Callbacks{
		"on_enter_CONNECTED":          hh.onStateConnect,
		"on_exit_CONNECTED":           hh.onStateDisconnect,
		"on_enter_CONNECTED_SELECTED": hh.onStateSelect,
	}
	hh.connectionState = statemachine.NewConnectionState("hsms", callbacks)

	return hh
}

// IsActive reports the Active flag of the handler's current connection
// config.
func (hh *HsmsHandler) IsActive() bool {
	cfg := hh.GetConfig()
	return cfg.Active
}

// SetSessionID replaces the session id (device id) that the handler writes
// into the headers of outgoing data messages.
//
// Intended user: automatic device-id discovery. Receiving S9F1 means the
// device id is configured wrongly, and it is then corrected to the value
// found in the received message.
func (hh *HsmsHandler) SetSessionID(sessionId uint16) {
	newID := sessionId
	hh.sessionId = newID
}

// Enable enables the underlying connection and returns its result unchanged.
func (hh *HsmsHandler) Enable() error {
	conn := hh.IConnection
	return conn.Enable()
}

// Disable disables the underlying connection and returns its result
// unchanged.
func (hh *HsmsHandler) Disable() error {
	conn := hh.IConnection
	return conn.Disable()
}

// 连接状态
// onStateConnect is the state-machine callback registered under
// "on_enter_CONNECTED". On the active side it starts the link-test timer and
// sends select.req in a separate goroutine; on the passive side it only logs.
func (hh *HsmsHandler) onStateConnect(ctx context.Context, e *fsm.Event) {
	hh.log.Debug("on state connect")

	// Only the client (host) initiates heartbeats and the select procedure.
	if !hh.IsActive() {
		return
	}

	// Heartbeat (original note: the timer should arguably be started at init).
	hh.startLinkTester()

	// sendSelectReq blocks while waiting for select.rsp, so it must not run
	// on the state-machine callback's goroutine.
	go hh.sendSelectReq()
}

// onStateDisconnect is the state-machine callback registered under
// "on_exit_CONNECTED". It stops the link-test timer if one has been created.
func (hh *HsmsHandler) onStateDisconnect(ctx context.Context, e *fsm.Event) {
	hh.timeMx.Lock()
	defer hh.timeMx.Unlock()

	if t := hh.linkTestTimer; t != nil {
		t.Stop()
	}
}

// onStateSelect is the state-machine callback registered under
// "on_enter_CONNECTED_SELECTED". Its body is currently empty: nothing is done
// when the selected state is entered.
func (hh *HsmsHandler) onStateSelect(ctx context.Context, e *fsm.Event) {
}

// onLinkTestTimer is the link-test timer callback. While the handler is
// connected it sends linktest.req (and waits for the reply), then re-arms the
// timer for another linkTestTimeout.
func (hh *HsmsHandler) onLinkTestTimer() {
	// Heartbeat.
	if hh.connected {
		hh.sendLinkTestReq()
	}

	// Re-arm; nothing to do if no timer exists.
	hh.timeMx.Lock()
	defer hh.timeMx.Unlock()

	if hh.linkTestTimer == nil {
		return
	}
	hh.linkTestTimer.Reset(hh.linkTestTimeout)
}

// startLinkTester starts the periodic sending of the LinkTest.req heartbeat.
// The first call creates the timer; later calls reset the existing timer to
// a full linkTestTimeout.
func (hh *HsmsHandler) startLinkTester() {
	hh.timeMx.Lock()
	defer hh.timeMx.Unlock()

	if hh.linkTestTimer != nil {
		hh.linkTestTimer.Reset(hh.linkTestTimeout)
		return
	}
	hh.linkTestTimer = time.AfterFunc(hh.linkTestTimeout, hh.onLinkTestTimer)
}

// sendSelectReq sends select.req and waits for the matching reply.
//
// Select.req / Select.rsp establish HSMS communication on top of the TCP/IP
// connection: they move an entity from the Not Selected to the Selected state
// (the active entity sends Select.req).
//
// It returns the reply packet, or nil when the packet could not be sent or no
// reply arrived within the configured T6 (or the connection context ended).
// Failures are logged at debug level.
func (hh *HsmsHandler) sendSelectReq() *packets.HsmsPacket {
	hh.log.Debug("select.req")

	// Register the reply queue before sending so the reply cannot be missed,
	// and always unregister it on the way out.
	systemId := hh.GetNextSystemCounter()
	queueChan := hh.getQueueForSystem(systemId)
	defer hh.removeQueue(systemId)

	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsSelectReqHeader(systemId),
	}

	// If the packet was never sent there is no reply to wait for; do not
	// block for the full T6.
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL); err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
		return nil
	}

	response, err := hh.waitResponse(&queueChan, hh.GetConfig().T6)
	if err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
	}
	return response
}

// sendSelectRsp sends select.rsp for the select.req identified by systemId.
// A send failure is logged instead of being silently dropped.
func (hh *HsmsHandler) sendSelectRsp(systemId uint32) {
	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsSelectRspHeader(systemId),
	}
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityRESPONSE); err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
	}
}

// sendLinkTestReq sends linktest.req and waits for the matching reply.
//
// The outgoing packet is published as a SecsMessageSend event before it is
// sent. It returns the reply packet, or nil when the packet could not be sent
// or no reply arrived within the configured T6 (or the connection context
// ended). Failures are logged at warn level.
func (hh *HsmsHandler) sendLinkTestReq() *packets.HsmsPacket {
	hh.log.Debugf("linktest.req (connected=%v)", hh.connected)

	// Register the reply queue before sending so the reply cannot be missed,
	// and always unregister it on the way out.
	systemId := hh.GetNextSystemCounter()
	queueChan := hh.getQueueForSystem(systemId)
	defer hh.removeQueue(systemId)

	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsLinkTestReqHeader(systemId),
	}

	// Event log.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   systemId,
		Stream:   outPacket.Header.Stream,
		Function: outPacket.Header.Function,
		Packet:   outPacket,
	})

	// If the packet was never sent there is no reply to wait for; do not
	// block for the full T6.
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL); err != nil {
		hh.log.Warnf("[system=0x%x] %s", systemId, err)
		return nil
	}

	response, err := hh.waitResponse(&queueChan, hh.GetConfig().T6)
	if err != nil {
		hh.log.Warnf("[system=0x%x] %s", systemId, err)
	}
	return response
}

// sendLinktestRsp sends linktest.rsp for the linktest.req identified by
// systemId. The outgoing packet is published as a SecsMessageSend event
// first; a send failure is logged instead of being silently dropped.
func (hh *HsmsHandler) sendLinktestRsp(systemId uint32) {
	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsLinkTestRspHeader(systemId),
	}

	// Event log.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   systemId,
		Stream:   outPacket.Header.Stream,
		Function: outPacket.Header.Function,
		Packet:   outPacket,
	})

	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityRESPONSE); err != nil {
		hh.log.Warnf("[system=0x%x] %s", systemId, err)
	}
}

// sendDeselectReq sends deselect.req and waits for the matching reply.
//
// It returns the reply packet, or nil when the packet could not be sent or no
// reply arrived within the configured T6 (or the connection context ended).
// Failures are logged at debug level.
func (hh *HsmsHandler) sendDeselectReq() *packets.HsmsPacket {
	hh.log.Debug("deselect.req")

	// Register the reply queue before sending so the reply cannot be missed,
	// and always unregister it on the way out.
	systemId := hh.GetNextSystemCounter()
	queueChan := hh.getQueueForSystem(systemId)
	defer hh.removeQueue(systemId)

	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsDeselectReqHeader(systemId),
	}

	// If the packet was never sent there is no reply to wait for; do not
	// block for the full T6.
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL); err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
		return nil
	}

	response, err := hh.waitResponse(&queueChan, hh.GetConfig().T6)
	if err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
	}
	return response
}

// sendDeselectRsp sends deselect.rsp for the deselect.req identified by
// systemId. A send failure is logged instead of being silently dropped.
func (hh *HsmsHandler) sendDeselectRsp(systemId uint32) {
	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsDeselectRspHeader(systemId),
	}
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityRESPONSE); err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
	}
}

// sendSeparateReq sends separate.req with a freshly allocated system id. No
// reply is awaited. A send failure is logged instead of being silently
// dropped.
func (hh *HsmsHandler) sendSeparateReq() {
	systemId := hh.GetNextSystemCounter()
	outPacket := &packets.HsmsPacket{
		Header: packets.HsmsSeparateReqHeader(systemId),
	}
	if err := hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL); err != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err)
	}
}

// GetNextSystemCounter returns the next system id (system bytes) by
// atomically incrementing the handler's counter.
//
// The value returned is the result of the atomic add itself. Reading the
// field again after the add would be an unsynchronized read: two concurrent
// callers could then both observe the later value and receive the same id.
// The counter wraps around on uint32 overflow.
func (hh *HsmsHandler) GetNextSystemCounter() uint32 {
	return atomic.AddUint32(&hh.systemCounter, 1)
}

// waitResponse waits for a packet to arrive on queueChan.
//
// queueChan: every system id has its own dedicated queue channel.
// timeout:   maximum time to wait (callers in this file pass T3 or T6).
//
// The sender on queueChan is OnConnectionPacketReceived (directly or through
// _handleHsmsRequests): it looks the system id up in systemQueues and, when a
// channel is found, the packet is a reply to a message this side sent.
//
// It returns the received packet, or an error when queueChan is nil, the
// channel was closed, the connection context is done, or timeout elapsed.
func (hh *HsmsHandler) waitResponse(queueChan *chan packets.HsmsPacket, timeout time.Duration) (*packets.HsmsPacket, error) {
	if queueChan == nil {
		return nil, errors.New("wait channel is nil")
	}

	timer := common.GlobalTimerPool.Get(timeout)
	defer common.GlobalTimerPool.Put(timer)

	select {
	case response, ok := <-*queueChan:
		if !ok {
			return nil, errors.New("wait channel closed")
		}
		return &response, nil
	case <-hh.IConnection.GetCtx().Done():
		return nil, errors.New("wait response context canceled")
	case <-timer.C:
		// The timer name is not known here (T3 and T6 are both used), so only
		// the duration is reported.
		return nil, fmt.Errorf("wait response timeout (%s)", timeout)
	}
}

/*
	connection 事件接口
*/

// OnConnectionEstablished is the connection callback for a newly established
// connection: it marks the handler connected, fires the "connect" event on
// the state machine and publishes a ConnectionEstablished event for the port.
func (hh *HsmsHandler) OnConnectionEstablished() {
	hh.connected = true
	hh.log.Info("Connection established")

	bg := context.Background()
	hh.connectionState.FSM.Event(bg, "connect")

	established := &secsgem.ConnectionEstablished{PortID: hh.GetConfig().ID}
	secsgem.PublishEvent(established)
}

// OnConnectionPacketReceived is the connection callback for every received
// packet. Values that are not *packets.HsmsPacket are ignored.
//
// Each packet is first published as a SecsMessageRecv event. Control messages
// (SType > 0) go to _handleHsmsRequests. Data messages whose system id has a
// registered queue are delivered to that queue (where waitResponse is
// waiting); all other data messages go to _OnSecsPacketReceived.
func (hh *HsmsHandler) OnConnectionPacketReceived(conn secsgem.IConnection, packet interface{}) {
	pkt, isHsms := packet.(*packets.HsmsPacket)
	if !isHsms {
		return
	}

	// Publish the receive event (rxgo stream).
	recvEvent := &secsgem.SecsMessageRecv{
		PortID:   hh.GetConfig().ID,
		System:   pkt.Header.System,
		Stream:   pkt.Header.Stream,
		Function: pkt.Header.Function,
		Packet:   pkt,
	}
	secsgem.PublishEvent(recvEvent)

	systemId := pkt.Header.System

	// Non-data (control) message.
	if pkt.Header.SType > 0 {
		hh._handleHsmsRequests(pkt)
		return
	}

	// Data message: a registered queue means someone is waiting in
	// waitResponse for exactly this system id.
	waiter, pending := hh.systemQueues.Load(systemId)
	if !pending {
		// Message without a registered waiter.
		hh._OnSecsPacketReceived(pkt)
		return
	}
	replies := waiter.(chan packets.HsmsPacket)
	replies <- *pkt
}
// _handleHsmsRequests processes an HSMS control message (SType > 0),
// dispatching on pkt.Header.SType:
//
//	0x01 select.req   -> reply with select.rsp, fire "select"
//	0x02 select.rsp   -> fire "select", hand the packet to the waiting sender
//	0x03 deselect.req -> reply with deselect.rsp, fire "deselect"
//	0x04 deselect.rsp -> fire "deselect", hand the packet to the waiting sender
//	0x05 linktest.req -> reply with linktest.rsp
//	other             -> hand the packet to the waiting sender, if any
func (hh *HsmsHandler) _handleHsmsRequests(pkt *packets.HsmsPacket) {
	systemId := pkt.Header.System

	hh.log.Debugf("[system=0x%x] < %s", systemId, pkt)

	// deliver passes the packet to whoever registered a queue for this
	// system id; packets nobody waits for are dropped.
	deliver := func() {
		if waiter, pending := hh.systemQueues.Load(systemId); pending {
			waiter.(chan packets.HsmsPacket) <- *pkt
		}
	}
	// fire triggers a transition on the connection state machine.
	fire := func(event string) {
		hh.connectionState.FSM.Event(context.Background(), event)
	}

	switch pkt.Header.SType {
	case 0x01:
		// select.req: must be answered.
		hh.sendSelectRsp(pkt.Header.System)
		fire("select")
	case 0x02:
		// select.rsp: the reply to a select.req sent earlier.
		fire("select")
		deliver()
	case 0x03:
		// deselect.req: gives an entity a graceful end of HSMS communication
		// before the TCP/IP connection is dropped. HSMS requires the
		// connection to be in the Selected state for this procedure; that is
		// not strictly enforced here.
		hh.sendDeselectRsp(pkt.Header.System)
		fire("deselect")
	case 0x04:
		fire("deselect")
		deliver()
	case 0x05:
		hh.sendLinktestRsp(pkt.Header.System)
	default:
		deliver()
	}
}

// OnConnectionBeforeClosed is the connection callback invoked before the
// connection is closed; it logs the fact and sends separate.req.
func (hh *HsmsHandler) OnConnectionBeforeClosed() {
	const notice = "Connection to be close"
	hh.log.Info(notice)

	hh.sendSeparateReq()
}

// OnConnectionClosed is the connection callback invoked once the connection
// is closed: it fires the "disconnect" event on the state machine and then
// clears the connected flag.
func (hh *HsmsHandler) OnConnectionClosed() {
	hh.log.Info("Connection closed")

	bg := context.Background()
	hh.connectionState.FSM.Event(bg, "disconnect")

	hh.connected = false
}

// _OnSecsPacketReceived dispatches a received data message that is not the
// reply to a pending request.
//
// When the message is S9F1 the handler adopts the session id carried in its
// header (automatic device-id correction). The packet is then passed to the
// listener, if one is set: odd function numbers go to PrimaryIn, even ones to
// SecondaryIn.
//
// A panic raised while handling the packet is recovered, logged together with
// its stack trace, converted with secs_errors.CovertError and returned as
// err. Otherwise err is nil.
func (hh *HsmsHandler) _OnSecsPacketReceived(packet *packets.HsmsPacket) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = secs_errors.CovertError(r)
			// Log the panic value as well as the stack: callers in this file
			// discard the returned error, so the cause would otherwise be lost.
			hh.log.Warnf("panic while handling received packet: %v\n%s", r, debug.Stack())
		}
	}()

	// Automatic device-id correction.
	if packet.Header.Stream == 9 && packet.Header.Function == 1 {
		hh.sessionId = packet.Header.SessionID
	}

	if hh.listener != nil {
		// How a received message is processed is deliberately not decided in
		// this package. The package covers the shared concerns (encoding and
		// decoding, connection handling, receiving packets); what to do with
		// a packet is left to the user of the package, which keeps it focused
		// and extensible.
		if packet.Header.Function%2 == 1 {
			hh.listener.PrimaryIn(hh, packet)
		} else {
			hh.listener.SecondaryIn(hh, packet)
		}
	}
	return nil
}

// getQueueForSystem creates a buffered channel of HsmsPacket (capacity 10),
// stores it in the concurrency-safe systemQueues map under systemId and
// returns it.
//
// Storing replaces any channel already registered for the same id, so careless
// use can lose packets that were queued on the previous channel.
func (hh *HsmsHandler) getQueueForSystem(systemId uint32) chan packets.HsmsPacket {
	const queueDepth = 10

	replies := make(chan packets.HsmsPacket, queueDepth)
	hh.systemQueues.Store(systemId, replies)
	return replies
}

// removeQueue removes the queue registered for systemId from systemQueues.
// It does nothing when no queue is registered for that id.
func (hh *HsmsHandler) removeQueue(systemId uint32) {
	// Delete is already a no-op for a missing key, so no prior lookup is
	// needed.
	hh.systemQueues.Delete(systemId)
}

// SendResponse sends the reply to a received message.
//
// packet is either a *functions.SecsStreamFunction or any value accepted by
// secsgem.CastPacket; system is the system id of the message being answered
// and is copied into the reply header. The outgoing packet is published as a
// SecsMessageSend event and then sent with response priority.
//
// Expected callers are the PrimaryIn implementations of the flow process and
// of StandardEquipment.
//
// It returns the conversion error, or the error from sending the packet.
func (hh *HsmsHandler) SendResponse(packet interface{}, system uint32) error {
	var (
		pkt *secsgem.Packet
		err error
	)

	// Convert the input into a generic SECS packet.
	switch v := packet.(type) {
	case *functions.SecsStreamFunction:
		pkt, err = v.CastPacket()
	default:
		pkt, err = secsgem.CastPacket(packet)
	}
	if err != nil {
		return err
	}

	// Build the reply; a reply never asks for a response itself.
	outPacket := &packets.HsmsPacket{
		Header: &packets.HsmsHeader{
			SessionID:       hh.sessionId,
			RequireResponse: false,
			Stream:          pkt.Stream,
			Function:        pkt.Function,
			System:          system,
		},
		Data: pkt.Data,
	}

	// Publish the send event (rxgo stream).
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   outPacket.Header.System,
		Stream:   outPacket.Header.Stream,
		Function: outPacket.Header.Function,
		Packet:   outPacket,
	})

	return hh.IConnection.SendPacket(outPacket, secsgem.PriorityRESPONSE)
}

// SendAndWaitForResponse sends a message that requires a reply and blocks
// until the reply arrives.
//
// packet is either a *functions.SecsStreamFunction or any value accepted by
// secsgem.CastPacket. priority is OR-ed with secsgem.PriorityNORMAL when the
// packet is sent.
//
// If the packet carries no system id (0), or carries one that already has a
// pending request, a fresh id is allocated. The outgoing packet is published
// as a SecsMessageSend event before it is sent.
//
// It returns the reply (*packets.HsmsPacket) on success. It returns an error
// when the conversion fails, when sending fails, or when no reply arrives
// within the configured T3 (or the connection context ends). The wait error
// is wrapped with %w, so callers can inspect it with errors.Is / errors.As.
func (hh *HsmsHandler) SendAndWaitForResponse(packet interface{}, priority int) (interface{}, error) {
	var err error
	var pkt *secsgem.Packet
	if sf, ok := packet.(*functions.SecsStreamFunction); ok {
		if pkt, err = sf.CastPacket(); err != nil {
			return nil, err
		}
	} else {
		if pkt, err = secsgem.CastPacket(packet); err != nil {
			return nil, err
		}
	}

	if _, ok := hh.systemQueues.Load(pkt.System); pkt.System != 0 && ok {
		// The requested system id is already in use by a pending request.
		// Reusing it would mix up the replies, so fall back to an automatic
		// id.
		pkt.System = 0
	}

	if pkt.System == 0 {
		// Automatic id.
		pkt.System = hh.GetNextSystemCounter()
	}
	systemId := pkt.System

	// Register the reply queue before sending so the reply cannot be missed.
	queueChan := hh.getQueueForSystem(systemId)
	defer hh.removeQueue(systemId)

	header := &packets.HsmsHeader{
		SessionID:       hh.sessionId,
		RequireResponse: true,
		Stream:          pkt.Stream,
		Function:        pkt.Function,
		System:          systemId,
	}
	outPacket := &packets.HsmsPacket{
		Header: header,
		Data:   pkt.Data,
	}

	// Event log.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   header.System,
		Stream:   header.Stream,
		Function: header.Function,
		Packet:   outPacket,
	})

	if err1 := hh.IConnection.SendPacket(outPacket, priority|secsgem.PriorityNORMAL); err1 != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err1)
		return nil, err1
	}

	hh.log.Debugf("[system=0x%x] start t3 timer", systemId)
	response, err2 := hh.waitResponse(&queueChan, hh.GetConfig().T3)
	if err2 != nil {
		hh.log.Debugf("[system=0x%x] %s", systemId, err2)
		// %w keeps the message text identical while preserving the chain.
		return nil, fmt.Errorf("system=0x%x %w", systemId, err2)
	}
	hh.log.Debugf("[system=0x%x] end t3 timer", systemId)

	return response, nil
}

// SendStreamFunction sends a message without waiting for any reply.
//
// packet is either a *functions.SecsStreamFunction or any value accepted by
// secsgem.CastPacket. If the packet carries no system id (0), a fresh one is
// allocated. The built HSMS packet is published as a SecsMessageSend event
// and then sent with normal priority.
//
// It returns the conversion error, or the error from sending the packet.
func (hh *HsmsHandler) SendStreamFunction(packet interface{}) error {
	var err error
	var pkt *secsgem.Packet
	if sf, ok := packet.(*functions.SecsStreamFunction); ok {
		if pkt, err = sf.CastPacket(); err != nil {
			return err
		}
	} else {
		if pkt, err = secsgem.CastPacket(packet); err != nil {
			return err
		}
	}

	if pkt.System == 0 {
		pkt.System = hh.GetNextSystemCounter()
	}

	header := &packets.HsmsHeader{
		SessionID:       hh.sessionId,
		RequireResponse: pkt.RequireResponse,
		Stream:          pkt.Stream,
		Function:        pkt.Function,
		System:          pkt.System,
	}
	outPacket := &packets.HsmsPacket{
		Header: header,
		Data:   pkt.Data,
	}

	// Event log. Publish the HSMS packet that is actually sent, as every
	// other send path in this file does, rather than the caller's raw input.
	secsgem.PublishEvent(&secsgem.SecsMessageSend{
		PortID:   hh.GetConfig().ID,
		System:   header.System,
		Stream:   header.Stream,
		Function: header.Function,
		Packet:   outPacket,
	})

	return hh.IConnection.SendPacket(outPacket, secsgem.PriorityNORMAL)
}

// Connected reports the handler's connected flag, which is set in
// OnConnectionEstablished and cleared in OnConnectionClosed.
func (hh *HsmsHandler) Connected() bool {
	up := hh.connected
	return up
}

// SetBridge installs the bridge that SupportBridgeRecv forwards to. A nil
// bridge turns forwarding off.
func (hh *HsmsHandler) SetBridge(bridge secsgem.ISecsBridge) {
	target := bridge
	hh.bridge = target
}

// SupportBridgeRecv forwards the call to the installed bridge and returns its
// result. When no bridge is installed it returns originalResponseUnused
// unchanged with a nil error.
func (hh *HsmsHandler) SupportBridgeRecv(fromHandler secsgem.ISecsHandler, packet interface{}, originalResponseUnused interface{}) (interface{}, error) {
	if hh.bridge == nil {
		return originalResponseUnused, nil
	}
	return hh.bridge.SupportBridgeRecv(fromHandler, packet, originalResponseUnused)
}

// GetView returns the view of the underlying connection unchanged.
func (hh *HsmsHandler) GetView() interface{} {
	view := hh.IConnection.GetView()
	return view
}

// SetConfig replaces the handler's connection with a new one built from cfg.
// The config can only be changed while the current connection's view reports
// it as not enabled; otherwise the call does nothing.
//
// The current view is type-asserted to *secsgem.ConnView and the call panics
// if it has a different type.
func (hh *HsmsHandler) SetConfig(cfg *secsgem.ConnectionCfg) {
	view := hh.IConnection.GetView().(*secsgem.ConnView)
	if view.Enabled.Load() {
		return
	}

	// New connection with the handler as its event receiver.
	hh.IConnection = connection.NewConnection(cfg, hh)
	hh.IConnection.SetConfig(cfg)
}

2、handler 的单元测试

package handler

import (
	"fmt"
	"runtime"
	"secs-gem/secsgem"
	"testing"
)

// TestHsmsHandlerForActive enables an active-mode handler twice and checks
// that the second Enable call leaves the goroutine count unchanged.
func TestHsmsHandlerForActive(t *testing.T) {
	fmt.Println("-------------------- active 启动两次 --------------------")
	activeConf := secsgem.ConnectionCfg{
		Active:     true,
		DriverType: "hsms",
		Host:       "localhost",
		Port:       5000,
	}

	handler := NewHsmsHandler(&activeConf, nil)
	if err := handler.Enable(); err != nil {
		t.Fatalf("enable active failed, err:%s", err)
	}
	goroutine1 := runtime.NumGoroutine()

	// Report the error of the second call itself; the first call's error is
	// always nil at this point.
	if err := handler.Enable(); err != nil {
		t.Fatalf("enable active failed, err:%s", err)
	}
	goroutine2 := runtime.NumGoroutine()

	if goroutine1 != goroutine2 {
		t.Fatalf("active enable goroutine not equal, goroutine1=%d,goroutine2=%d", goroutine1, goroutine2)
	}
}

// TestHsmsHandlerForPassive enables a passive-mode handler twice and checks
// that the second Enable call leaves the goroutine count unchanged.
func TestHsmsHandlerForPassive(t *testing.T) {
	fmt.Println("-------------------- passive 启动两次 --------------------")
	passiveConf := secsgem.ConnectionCfg{
		Active:     false,
		DriverType: "hsms",
		Host:       "localhost",
		Port:       5001,
	}

	handler := NewHsmsHandler(&passiveConf, nil)
	if err := handler.Enable(); err != nil {
		t.Fatalf("enable passive failed, err:%s", err)
	}
	goroutine1 := runtime.NumGoroutine()

	// Report the error of the second call itself; the first call's error is
	// always nil at this point.
	if err := handler.Enable(); err != nil {
		t.Fatalf("enable passive failed, err:%s", err)
	}
	goroutine2 := runtime.NumGoroutine()

	if goroutine1 != goroutine2 {
		t.Fatalf("passive enable goroutine not equal, goroutine1=%d,goroutine2=%d", goroutine1, goroutine2)
	}
}