go chan阻塞实例

发布时间 2023-08-01 12:14:41作者: 若-飞

以下的代码段在执行写入通道的时候会发生阻塞:

    spaceId2badgeDatasChan := make(chan map[int32][]*badgeV1.BadgeData)
    spaceId2badgeCountChan := make(chan map[int32]int32)

	var getBadgesTasks []func()
	for _, loopSpaceId := range spaceIds {
		task := func(spaceId int32) func() {
			return func() {
				req := &badgeV1.GetRequest{SpaceIds: []int32{spaceId}}
				getRes, err := uc.badgeClt.Get(ctx, req)
				if err != nil {
					return
				}
				spaceId2badgeDatasChan <- map[int32][]*badgeV1.BadgeData{spaceId: getRes.Data.DataList}
				spaceId2badgeCountChan <- map[int32]int32{spaceId: getRes.Data.Count}
			}
		}(loopSpaceId)
		getBadgesTasks = append(getBadgesTasks, task)
	}

	// 执行任务
	utils.RunParallelTasks(getBadgesTasks...)
	
	// 读取数据
	close(spaceId2badgeCountChan)
	close(spaceId2badgeDatasChan)
	spaceId2badgeDatas := make(map[int32][]*badgeV1.BadgeData)
	spaceId2badgeCount := make(map[int32]int32)
	for m := range spaceId2badgeDatasChan {
		for spaceId, datas := range m {
			spaceId2badgeDatas[spaceId] = datas
		}
	}
	for m := range spaceId2badgeCountChan {
		for spaceId, count := range m {
			spaceId2badgeCount[spaceId] = count
		}
	}

这段代码意图先通过并发执行多任务,获取到数据,然后统一采集数据,但是在代码执行到这两行的时候就卡住了:

spaceId2badgeDatasChan <- map[int32][]*badgeV1.BadgeData{spaceId: getRes.Data.DataList}
spaceId2badgeCountChan <- map[int32]int32{spaceId: getRes.Data.Count}

由于没有设置 spaceId2badgeDatasChanspaceId2badgeCountChan 通道的缓冲区大小,这可能会导致通道在发送数据时被阻塞,因为通道的接收方无法及时读取数据。你可以在通道初始化时指定缓冲区大小来解决这个问题,例如: 

	spaceId2badgeDatasChan := make(chan map[int32][]*badgeV1.BadgeData, len(spaceIds))
	spaceId2badgeCountChan := make(chan map[int32]int32, len(spaceIds))
	var getBadgesTasks []func()
	for _, loopSpaceId := range spaceIds {
		task := func(spaceId int32) func() {
			return func() {
				req := &badgeV1.GetRequest{SpaceIds: []int32{spaceId}}
				getRes, err := uc.badgeClt.Get(ctx, req)
				if err != nil {
					return
				}
				spaceId2badgeDatasChan <- map[int32][]*badgeV1.BadgeData{spaceId: getRes.Data.DataList}
				spaceId2badgeCountChan <- map[int32]int32{spaceId: getRes.Data.Count}
			}
		}(loopSpaceId)
		getBadgesTasks = append(getBadgesTasks, task)
	}

	// 执行任务
	utils.RunParallelTasks(getBadgesTasks...)
	
	// 读取数据
	close(spaceId2badgeCountChan)
	close(spaceId2badgeDatasChan)
	spaceId2badgeDatas := make(map[int32][]*badgeV1.BadgeData)
	spaceId2badgeCount := make(map[int32]int32)
	for m := range spaceId2badgeDatasChan {
		for spaceId, datas := range m {
			spaceId2badgeDatas[spaceId] = datas
		}
	}
	for m := range spaceId2badgeCountChan {
		for spaceId, count := range m {
			spaceId2badgeCount[spaceId] = count
		}
	}

chan的接收和发送需要能同时进行,任何一端都不能卡住

 

通道的缓冲区大小对通道的性能和行为有很大的影响,尤其是在处理大量数据时。如果通道的缓冲区大小过小,发送方可能会被阻塞,因为缓冲区已满,而接收方无法及时读取数据。如果缓冲区大小过大,可能会导致内存浪费和性能下降。

在之前代码中,由于 spaceId2badgeCountChan 的缓冲区大小不够大,发送方在发送数据时被阻塞,而接收方无法及时读取数据。这导致发送方一直等待接收方读取数据,而接收方却无法读取数据,从而导致了阻塞。

通过调整缓冲区大小,可以解决这个问题。如果能够确定通道需要缓存多少数据,可以直接设置缓冲区大小,例如:

spaceId2badgeCountChan := make(chan map[int32]int32, len(spaceIds))

在这个例子中,我将缓冲区大小设置为 len(spaceIds),这样通道就可以缓存所有可能的数据,以避免通道被阻塞。

如果无法确定通道需要缓存的数据量,可以使用无缓冲通道或有缓冲通道。无缓冲通道可以确保发送方和接收方同步,即发送方会等待接收方读取数据,而接收方会等待发送方发送数据。这种方式适用于需要确保发送和接收的顺序的情况。例如:

spaceId2badgeCountChan := make(chan map[int32]int32)

在这个例子中,使用了无缓冲通道,这意味着发送方会等待接收方读取数据,而接收方会等待发送方发送数据。这可以确保发送和接收的顺序正确。

另一种方法是使用有缓冲的通道,这可以避免发送方被阻塞,同时也可以确保发送和接收的顺序正确。你可以根据需要设置缓冲区大小,例如:

spaceId2badgeCountChan := make(chan map[int32]int32, 10)

在这个例子中,我将缓冲区大小设置为 10,这意味着通道可以缓存最多 10 个元素,如果缓冲区已满,发送方会被阻塞。同时,由于有缓冲的通道是异步的,因此发送方不必等待接收方读取数据,这可以提高程序的并发性能。