13、从0到1实现SECS协议之优先级队列(SECS-I)

发布时间:2023-08-27 14:25:26 作者:画个一样的我

13、从0到1实现SECS协议之优先级队列(SECS-I)

逻辑和HSMS协议中的优先级队列一样,只不过存储的数据变了而已。

1、并发安全的优先级队列

package queue

import (
	"secs-gem/common"
	"secs-gem/secs/packets"
	"secs-gem/secsgem"

	"container/heap"
	"context"
	"sync"
	"time"
)

/*---------------------------------------
		优先队列(提供并发安全接口)
---------------------------------------*/

// NewPriorityQueue returns an empty queue with a fresh SQueueView and a
// notification channel that can buffer a single pending signal.
func NewPriorityQueue() *PriorityQueue {
	return &PriorityQueue{
		SQueueView: &secsgem.SQueueView{},
		fch:        make(chan interface{}, 1),
	}
}

// PriorityQueue is a priority queue of *QueueItem that embeds
// *secsgem.SQueueView. It carries its own lock (rmu) and a notification
// channel (fch) alongside the heap storage.
type PriorityQueue struct {
	*secsgem.SQueueView
	// Queue storage (min-heap).
	items []*QueueItem     // heap-ordered backing slice
	fch   chan interface{} // push notification channel
	rmu   sync.RWMutex     // lock taken around updates
}

// QueueItem is a single entry stored in the priority queue.
type QueueItem struct {
	Value    *packets.SecsPacket   // payload packet
	Priority int                   // priority (smaller values sort earlier)
	JoinTime time.Time             // time the item joined the queue
	Blocks   []*packets.SecsPacket // send blocks — presumably Value split for transmission; TODO confirm
	Cursor   int                   // send index
	index    int                   // position of the item in the heap slice
}

/*---------------------------------------
		heap接口
---------------------------------------*/

// Len reports the number of queued items (heap.Interface).
//
// It uses a pointer receiver because PriorityQueue contains a sync.RWMutex;
// a value receiver would copy the lock on every call. Len does not lock.
func (pq *PriorityQueue) Len() int { return len(pq.items) }

// Less orders items for the heap (heap.Interface): a smaller Priority sorts
// first, and items with equal Priority are ordered by earlier JoinTime.
//
// It uses a pointer receiver because PriorityQueue contains a sync.RWMutex;
// a value receiver would copy the lock on every comparison.
func (pq *PriorityQueue) Less(i, j int) bool {
	a, b := pq.items[i], pq.items[j]
	if a.Priority != b.Priority {
		return a.Priority < b.Priority
	}
	return a.JoinTime.Before(b.JoinTime)
}

// Swap exchanges the items at positions i and j and updates the index each
// item records for itself (heap.Interface).
//
// It uses a pointer receiver because PriorityQueue contains a sync.RWMutex;
// a value receiver would copy the lock on every swap.
func (pq *PriorityQueue) Swap(i, j int) {
	pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
	pq.items[i].index = i
	pq.items[j].index = j
}

// Push appends x, which must be a *QueueItem, to the end of the backing slice
// and records that position in the item (heap.Interface). It panics if x has
// any other dynamic type.
func (pq *PriorityQueue) Push(x interface{}) {
	entry := x.(*QueueItem)
	entry.index = len(pq.items)
	pq.items = append(pq.items, entry)
}

// Pop removes and returns the last element of the backing slice
// (heap.Interface). It returns nil when the slice is empty.
//
// The slice is accessed through pq directly; dereferencing *pq into a local
// would copy the whole struct, including its sync.RWMutex.
func (pq *PriorityQueue) Pop() interface{} {
	n := len(pq.items)
	if n == 0 {
		return nil
	}
	last := n - 1
	item := pq.items[last]
	pq.items[last] = nil // drop the reference so the slot does not pin the item
	item.index = -1      // mark the item as no longer in the heap
	pq.items = pq.items[:last]
	return item
}

// update overwrites item's value and priority, then has container/heap
// restore heap order at the index recorded in the item. It does not acquire
// pq.rmu itself.
func (pq *PriorityQueue) update(item *QueueItem, value *packets.SecsPacket, priority int) {
	item.Priority, item.Value = priority, value
	heap.Fix(pq, item.index)
}

/*---------------------------------------
		同步接口
---------------------------------------*/

// Get removes and returns the front item, waiting for one if the queue is
// currently empty.
//
// It returns nil when ctx is done or when the timer obtained from
// common.GlobalTimerPool for timeout fires (presumably after timeout has
// elapsed — confirm against the pool implementation) before an item is
// available.
//
// The notification channel holds at most one pending signal, so several puts
// can collapse into a single wakeup. After a successful pop, Get therefore
// re-signals if items remain, so that another blocked Get call is not left
// waiting on a non-empty queue.
func (pq *PriorityQueue) Get(ctx context.Context, timeout time.Duration) *QueueItem {
	timer := common.GlobalTimerPool.Get(timeout)
	defer common.GlobalTimerPool.Put(timer)

	for {
		// Try the queue first; this also covers the re-check after a wakeup.
		if item := pq.PopItem(); item != nil {
			if pq.QSize() > 0 {
				// Pass the wakeup on: more work is queued than signals were sent.
				pq.kickWaiter()
			}
			return item
		}

		select {
		case <-pq.fch:
			// Something was pushed (or the signal is stale); loop and re-check.
		case <-ctx.Done():
			return nil
		case <-timer.C:
			return nil
		}
	}
}

// PopItem takes the write lock and removes the front item from the heap.
// It returns nil without blocking when the queue is empty.
func (pq *PriorityQueue) PopItem() *QueueItem {
	pq.rmu.Lock()
	defer pq.rmu.Unlock()

	if pq.Len() > 0 {
		if head, ok := heap.Pop(pq).(*QueueItem); ok {
			return head
		}
	}
	return nil
}

// PutNoWait pushes item onto the heap under the write lock and then signals a
// waiting Get without blocking.
//
// Less breaks priority ties by JoinTime, so an item whose JoinTime is still
// the zero value is stamped with the current time here; without it,
// equal-priority items would have no first-in-first-out order. A JoinTime
// already set by the caller is left untouched.
func (pq *PriorityQueue) PutNoWait(item *QueueItem) {
	pq.rmu.Lock()
	if item.JoinTime.IsZero() {
		// Stamp under the lock so timestamps follow insertion order.
		item.JoinTime = time.Now()
	}
	heap.Push(pq, item)
	pq.rmu.Unlock()

	// Notify after releasing the lock.
	pq.kickWaiter()
}

// kickWaiter performs a non-blocking send on the notification channel. If the
// send cannot proceed immediately, the signal is dropped.
func (pq *PriorityQueue) kickWaiter() {
	select {
	case pq.fch <- 1:
		// signal delivered or buffered
	default:
		// channel not ready; nothing to do
	}
}

// QSize returns the current number of queued items, read under the read lock.
func (pq *PriorityQueue) QSize() int {
	pq.rmu.RLock()
	defer pq.rmu.RUnlock()
	return pq.Len()
}

2、测试用例

package queue

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"
)

// TestDataSpace fills a queue's backing slice with ten million items and then
// sleeps, which leaves time to inspect the process's memory use from outside.
// It asserts nothing, so it is skipped under `go test -short`.
func TestDataSpace(t *testing.T) {
	if testing.Short() {
		t.Skip("manual memory-footprint probe; skipped in short mode")
	}

	const n = 10000000

	pq := NewPriorityQueue()
	pq.items = make([]*QueueItem, n)
	for i := range pq.items {
		pq.items[i] = &QueueItem{
			Priority: i,
		}
	}

	time.Sleep(time.Second * 30)

	// Using pq after the sleep keeps the items reachable while the process is
	// being observed; otherwise they could be collected during the sleep.
	t.Logf("held %d items", len(pq.items))
}

// TestSendQueuePutAndGet runs one producer and one consumer concurrently over
// ten million items and finishes once the consumer has received every item.
func TestSendQueuePutAndGet(t *testing.T) {
	const total = 10000000

	pq := NewPriorityQueue()

	var wg sync.WaitGroup

	// Producer.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < total; i++ {
			pq.PutNoWait(&QueueItem{
				Priority: i,
			})
		}
	}()

	// Consumer: a nil result (timeout) is simply retried.
	wg.Add(1)
	go func() {
		defer wg.Done()
		received := 0
		for received < total {
			item := pq.Get(context.Background(), time.Second*5)
			if item == nil {
				continue
			}
			if item.Priority%1000 == 0 {
				// Read the size through QSize: touching pq.items directly here
				// would race with the producer's appends.
				fmt.Println(item.Priority, pq.QSize())
			}
			received++
		}
	}()

	wg.Wait()
	fmt.Println("over")
	// Pause before returning — presumably to allow memory to be inspected
	// after the queue has drained; TODO confirm this is still wanted.
	time.Sleep(time.Second * 10)
}

// TestPriorityQueue pushes 1000 items with distinct priorities from
// concurrent goroutines and checks that they come back in ascending priority
// order.
func TestPriorityQueue(t *testing.T) {
	const n = 1000

	pq := NewPriorityQueue()

	var want [n]int
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		want[i] = i // filled here so no goroutine writes to want
		wg.Add(1)
		go func(x int) {
			defer wg.Done()
			pq.PutNoWait(&QueueItem{
				Priority: x,
			})
		}(i)
	}
	// Every item must be queued before the first Get; otherwise an early Get
	// can return whatever happens to be present and the output is not sorted.
	wg.Wait()

	var got [n]int
	for i := 0; i < n; i++ {
		qItem := pq.Get(context.Background(), time.Second*1)
		if qItem == nil {
			t.Fatalf("Get returned nil at position %d, want priority %d", i, want[i])
		}
		got[i] = qItem.Priority
	}

	if got != want {
		t.Fatalf("PriorityQueue test failed, want != got")
	}
}