生产消费模型-使用channel控制生产速度

发布时间 2023-08-01 10:37:50作者: 知道了呀~
package main

import (
    "fmt"
    "sync"
)

type Producer struct {
    ID         int
    DataStream chan int
    WaitGroup  *sync.WaitGroup
}

func (p *Producer) Produce(concurrency <-chan struct{}) {
    defer p.WaitGroup.Done()
    for i := 0; i < 10; i++ {
        <-concurrency // 允许堆积的计数-1
        p.DataStream <- i
        fmt.Printf("Producer %d produced data: %d\n", p.ID, i)
    }//生产结束要关闭DataStream,否则consumer会一直阻塞
    close(p.DataStream)
}

type Consumer struct {
    ID         int
    DataStream chan int
    WaitGroup  *sync.WaitGroup
}

func (c *Consumer) Consume(concurrency chan<- struct{}) {
    defer c.WaitGroup.Done()
    for data := range c.DataStream {
        fmt.Printf("Consumer %d consumed data: %d\n", c.ID, data)
        concurrency <- struct{}{} // 允许堆积的计数+1
    }
}

func main() {
    //保存消费数据的chan
    dataStream := make(chan int, 10)
    //控制生产速度的chan
    concurrency := make(chan struct{}, 5) // 设置生产者允许堆积的最大消息数
    for i := 0; i < 5; i++ {
        concurrency <- struct{}{}
    }

    var wg sync.WaitGroup
    producer := &Producer{
        ID:         1,
        DataStream: dataStream,
        WaitGroup:  &wg,
    }

    consumer := &Consumer{
        ID:         1,
        DataStream: dataStream,
        WaitGroup:  &wg,
    }

    wg.Add(2)
    go producer.Produce(concurrency)
    go consumer.Consume(concurrency)

    wg.Wait()
    close(concurrency) // 关闭并发信号通道
}