golang 多生产者+多消费者模式

发布时间：2023-09-26 09:08:09　作者：北京涛子

参考

https://gist.github.com/vitan/aedb628a40478cf8b6a33dc87a5ff52f
https://gist.github.com/mochow13/74ee57078d58536929575ab481dd9693

1. 示例一：基于多个带缓冲 channel 的优先级队列（多生产者 + 多消费者）

package main

import (
	"errors"
	"fmt"
	"math"
	"reflect"
	"sync"
)

// Constants shared by the priority-queue demo.
const (
	// ITEM_COUNT is the number of values each producer goroutine enqueues.
	ITEM_COUNT         = 5
	// EMPTY_VAL is the placeholder value Dequeue returns together with an error.
	EMPTY_VAL          = math.MaxInt64
	// ERROR_QUEUE_CLOSED is the text of the error Dequeue returns once no queue
	// can deliver a value any more; consumer compares against it to stop.
	ERROR_QUEUE_CLOSED = "error-closed-queue"
)

// PriorityQueue is a multi-level queue of ints backed by one buffered channel
// per priority level. Initialise it with NewPriorityQueue before use.
type PriorityQueue struct {
	// queues holds one channel per priority level; Enqueue stores priority p
	// in the channel at index len(queues)-p-1.
	queues           []chan int
	// capacity is the buffer size every channel was created with.
	capacity         int
	// opening_q_counts is set by NewPriorityQueue to the number of priority
	// levels.
	opening_q_counts int
	// mutex is allocated by NewPriorityQueue.
	mutex            *sync.Mutex
}

// NewPriorityQueue (re)initialises the receiver in place and returns it.
//
// Any state the receiver held before is overwritten. Afterwards the queue owns
// prioritys buffered channels, each able to hold capacity values, a fresh
// mutex, and opening_q_counts equal to prioritys.
func (pQ *PriorityQueue) NewPriorityQueue(prioritys int, capacity int) *PriorityQueue {
	*pQ = PriorityQueue{
		queues:           []chan int{},
		capacity:         capacity,
		opening_q_counts: prioritys,
		mutex:            &sync.Mutex{},
	}
	// One buffered channel per priority level.
	for level := 0; level < prioritys; level++ {
		pQ.queues = append(pQ.queues, make(chan int, capacity))
	}
	return pQ
}

// Enqueue sends val to the channel that belongs to the given priority level.
//
// Valid priorities are 0 .. len(queues)-1; anything else yields an
// "out of index" error and nothing is sent. Priority p is stored at index
// len(queues)-1-p, so larger priority numbers land on lower indexes. The send
// is a plain channel send, so it blocks while the target channel's buffer is
// full.
func (pQ *PriorityQueue) Enqueue(priority int, val int) error {
	levels := len(pQ.queues)
	if priority < 0 || priority >= levels {
		return errors.New("out of index")
	}

	target := pQ.queues[levels-1-priority]
	target <- val
	return nil
}

// Dequeue returns the next available value, preferring higher priorities.
//
// It first tries a non-blocking receive on every queue in index order (index 0
// holds the highest priority, see Enqueue), so a buffered high-priority value
// always wins over a lower-priority one. If nothing is buffered it blocks until
// any queue that is still open delivers a value or gets closed.
//
// When every queue is closed and drained it returns EMPTY_VAL together with an
// error whose text is ERROR_QUEUE_CLOSED.
//
// The bookkeeping of which queues are closed is local to each call: the
// previous implementation decremented the shared opening_q_counts every time
// any call observed a closed channel (and read it without the lock), which
// made the counter hit zero while values were still buffered and silently
// dropped them. Dequeue is therefore safe to call from several goroutines.
func (pQ *PriorityQueue) Dequeue() (int, error) {
	// Queues that are open but currently empty; candidates for the blocking wait.
	waiting := make([]reflect.SelectCase, 0, len(pQ.queues))

	// Pass 1: strict priority order, never blocks.
	for _, q := range pQ.queues {
		select {
		case val, ok := <-q:
			if ok {
				return val, nil
			}
			// Closed and drained: this queue can never deliver again.
		default:
			waiting = append(waiting, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(q),
			})
		}
	}

	// Pass 2: block until one of the open queues delivers or all are closed.
	for open := len(waiting); open > 0; {
		chosen, value, ok := reflect.Select(waiting)
		if ok {
			return int(value.Int()), nil
		}
		// A zero Value makes reflect.Select ignore this case from now on.
		waiting[chosen].Chan = reflect.Value{}
		open--
	}
	return EMPTY_VAL, errors.New(ERROR_QUEUE_CLOSED)
}

// producer enqueues ITEM_COUNT values on the given priority level and signals
// wg when it is done.
//
// Each successfully enqueued item is reported on stdout. An item that Enqueue
// rejects is reported as an error only; it is no longer announced as
// "Produced", because nothing was put on the queue.
func producer(wg *sync.WaitGroup, priority int, pQ *PriorityQueue) {
	defer wg.Done()
	for i := 0; i < ITEM_COUNT; i++ {
		// priority*10+i puts the priority in the tens digit purely to keep the
		// consumer output readable; change it if needed.
		value := priority*10 + i

		if err := pQ.Enqueue(priority, value); err != nil {
			fmt.Printf("ERROR: %s\n", err.Error())
			continue
		}
		fmt.Printf("Produced item: %d on priority %d\n", i, priority)
	}
}

// consumer prints every value it dequeues until the queue reports that it is
// closed, then signals wg.
//
// An error whose text equals ERROR_QUEUE_CLOSED ends the loop. Any other error
// used to be dropped silently; it is now printed in the same "ERROR: ..."
// format producer uses, and the loop keeps going as before.
func consumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	defer wg.Done()
	for {
		val, err := pQ.Dequeue()
		if err != nil {
			if err.Error() == ERROR_QUEUE_CLOSED {
				return
			}
			fmt.Printf("ERROR: %s\n", err.Error())
			continue
		}
		fmt.Printf("Dequeue value: %d\n", val)
	}
}

// SpawnProducer starts eight producer goroutines, one for each priority level
// 0 through 7, all feeding pQ. Every goroutine is registered on wg, so the
// caller can wg.Wait() for all of them to finish.
func SpawnProducer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	const producers = 8

	wg.Add(producers)
	for priority := 0; priority < producers; priority++ {
		go producer(wg, priority, pQ)
	}
}

// SpawnConsumer starts two consumer goroutines that both read from pQ. Each
// one is registered on wg before it is launched, so the caller can wg.Wait()
// for both to finish.
func SpawnConsumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	const consumers = 2

	for n := 0; n < consumers; n++ {
		wg.Add(1)
		go consumer(wg, pQ)
	}
}

// main runs the priority-queue demo in two phases: first all producers fill
// the queue, then all consumers drain it.
func main() {
	fmt.Println("Starting Producer, Consumer")

	// 10 priority levels, each buffering up to 10 values.
	pQ := (&PriorityQueue{}).NewPriorityQueue(10, 10)

	// Phase 1: produce. No consumer is running yet, so everything the
	// producers send has to fit into the channel buffers.
	var producers sync.WaitGroup
	SpawnProducer(&producers, pQ)
	producers.Wait()

	// All senders are done; close every queue so consumers can detect the end.
	for i := range pQ.queues {
		close(pQ.queues[i])
	}

	// Phase 2: consume whatever was buffered.
	var consumers sync.WaitGroup
	SpawnConsumer(&consumers, pQ)
	consumers.Wait()

	fmt.Println("Exited successfully")
}

2. 示例二：多个生产者与多个消费者共享同一个无缓冲 channel

package main

import (
	"fmt"
	"sync"
)

// messages holds the lines to publish, grouped per producer: produce(id) sends
// the strings of messages[id] in order. main indexes it with 0..producerCount-1,
// so it must contain at least producerCount groups.
var messages = [][]string{
	{
		"The world itself's",
		"just one big hoax.",
		"Spamming each other with our",
		"running commentary of bullshit,",
	},
	{
		"but with our things, our property, our money.",
		"I'm not saying anything new.",
		"We all know why we do this,",
		"not because Hunger Games",
		"books make us happy,",
	},
	{
		"masquerading as insight, our social media",
		"faking as intimacy.",
		"Or is it that we voted for this?",
		"Not with our rigged elections,",
	},
	{
		"but because we wanna be sedated.",
		"Because it's painful not to pretend,",
		"because we're cowards.",
		"- Elliot Alderson",
		"Mr. Robot",
	},
}

// Number of producer and consumer goroutines that main starts.
const (
	producerCount int = 4
	consumerCount int = 3
)

// produce sends every string of messages[id] to link, in order, and marks
// itself done on wg when it returns. Each send is a plain channel send, so it
// blocks until the value is accepted by link.
func produce(link chan<- string, id int, wg *sync.WaitGroup) {
	defer wg.Done()

	batch := messages[id]
	for i := 0; i < len(batch); i++ {
		link <- batch[i]
	}
}

// consume prints every message received from link, tagged with the consumer's
// id, until link is closed and drained; it then marks itself done on wg.
func consume(link <-chan string, id int, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		msg, open := <-link
		if !open {
			return
		}
		fmt.Printf("Message \"%v\" is consumed by consumer %v\n", msg, id)
	}
}

// main wires producerCount producers and consumerCount consumers to a single
// unbuffered channel, waits for the producers, closes the channel, and then
// waits for the consumers to drain it and exit.
func main() {
	var producers, consumers sync.WaitGroup
	link := make(chan string)

	for id := 0; id < producerCount; id++ {
		producers.Add(1)
		go produce(link, id, &producers)
	}
	for id := 0; id < consumerCount; id++ {
		consumers.Add(1)
		go consume(link, id, &consumers)
	}

	// Close only after every sender has finished; the close is what ends the
	// consumers' receive loops.
	producers.Wait()
	close(link)
	consumers.Wait()
}