Go 并发编程

并发编程

认识

协程的定义

进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

线程是进程的一个执行实体,是 CPU 调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

协程拥有独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。

一个线程上可以跑多个协程,协程是轻量级的线程。

协程有时也被称为绿色线程。绿色线程是由程序的运行时(runtime)维护的线程。一个绿色线程的内存开销和情景转换(context switching)时耗比一个系统线程常常小得多。 只要内存充足,一个程序可以轻松支持上万个并发协程。

Go 不支持创建系统线程,所以协程是一个 Go 程序内部唯一的并发实现方式。

并发同步

concurrency synchronization

不同的并发计算可能共享一些资源,其中共享内存资源最为常见。 在一个并发程序中,常常会发生下面的情形:

  • 在一个计算向一段内存写数据的时候,另一个计算从此内存段读数据,结果导致读出的数据的完整性得不到保证。
  • 在一个计算向一段内存写数据的时候,另一个计算也向此段内存写数据,结果导致被写入的数据的完整性得不到保证。

这些情形被称为数据竞争(data race)。

并发编程的一大任务就是要调度不同计算,控制它们对资源的访问时段,以使数据竞争的情况不会发生。 此任务常称为并发同步(或者数据同步)。

并发编程中的其它任务包括:

  • 决定需要开启多少计算;
  • 决定何时开启、阻塞、解除阻塞和结束哪些计算;
  • 决定如何在不同的计算中分担工作负载。

协程的状态

当一个新协程被创建的时候,它将自动进入运行状态,一个协程只能从运行状态退出而不能从阻塞状态退出。 如果因为某种原因而导致某个协程一直处于阻塞状态,则此协程将永远不会退出。 除了极个别的应用场景,在编程时我们应该尽量避免出现这样的情形。

一个处于阻塞状态的协程不会自发结束阻塞状态,它必须被另外一个协程通过某种并发同步方法来被动地结束阻塞状态。 如果一个运行中的程序当前所有的协程都出于阻塞状态,则这些协程将永远阻塞下去,程序将被视为死锁了。 当一个程序死锁后,官方标准编译器的处理是让这个程序崩溃。

比如下面这个程序将在运行两秒钟后崩溃。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import (
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	wg.Add(1)
	go func() {
		time.Sleep(time.Second * 2)
		wg.Wait() // 阻塞在此
	}()
	wg.Wait() // 阻塞在此
}

它的输出:

1
2
fatal error: all goroutines are asleep - deadlock!
...

协程的调度

并非所有处于运行状态的协程都在执行。在任一时刻,只能最多有和逻辑 CPU 数目一样多的协程在同时执行。 因为每个逻辑 CPU 在同一时刻只能最多执行一个协程。

我们可以调用 runtime.NumCPU 函数来查询当前程序可利用的逻辑 CPU 数目。

Go 运行时(runtime)必须让逻辑 CPU 频繁地在不同的处于运行状态的协程之间切换,从而每个处于运行状态的协程都有机会得到执行。 这和操作系统执行系统线程的原理是一样的。

下面这张图显示了一个协程的更详细的生命周期。在此图中,运行状态被细分成了多个子状态。 一个处于排队子状态的协程等待着进入执行子状态。一个处于执行子状态的协程在被执行一会儿(非常短的时间片)之后将进入排队子状态。

https://markdown-1303167219.cos.ap-shanghai.myqcloud.com/goroutine-schedule.png

注意,睡眠和等待系统调用返回子状态被认为是运行状态,而不是阻塞状态。

标准编译器采纳了一种被称为 M-P-G 模型 的算法来实现协程调度。 其中,M 表示系统线程,P 表示逻辑处理器(并非上述的逻辑 CPU),G 表示协程。具体原理详见 [Go 专家编程](./进阶/Go 专家编程.md)

Goroutine

goroutine 奉行通过通信来共享内存,而不是共享内存来通信

goroutine 的概念类似于线程,但 goroutine 是由 Go 的运行时(runtime)调度和管理的。Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。

启动

单个 goroutine

1
2
3
4
5
6
7
func hello() {
    fmt.Println("Hello Goroutine!")
}
func main() {
    go hello()
    fmt.Println("main goroutine done!")
}

执行结果:只打印了 main goroutine done!,并没有打印 Hello Goroutine!。

原因

  • 在程序启动时,Go 程序就会为 main() 函数创建一个默认的 goroutine
  • 当 main() 函数返回的时候该 goroutine 就结束了,所有在 main() 函数中启动的 goroutine 会一同结束
  • 创建新的 goroutine 的时候需要花费一些时间,而此时 main 函数所在的 goroutine 是继续执行的

让 main() 等一等他的子协程

1
2
3
4
5
func main() {
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}

多个 goroutine

这里使用了 sync.WaitGroup 来实现 goroutine 的同步

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done() // goroutine结束就登记-1
    fmt.Println("Hello Goroutine!", i)
}
func main() {

    for i := 0; i < 10; i++ {
        wg.Add(1) // 启动一个goroutine就登记+1
        go hello(i)
    }
    wg.Wait() // 等待所有登记的goroutine都结束
}

多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为 10 个 goroutine 是并发执行的,而 goroutine 的调度是随机的。

Channel

通过共享内存来通讯和通过通讯来共享内存是并发编程中的两种编程风格。 当通过共享内存来通讯的时候,我们需要一些传统的并发同步技术(比如互斥锁)来避免数据竞争。

Go 提供了一种独特的并发同步技术来实现通过通讯来共享内存。此技术即为通道

我们可以把一个通道看作是在一个程序内部的一个先进先出(FIFO:first in first out)数据队列。 一些协程可以向此通道发送数据,另外一些协程可以从此通道接收数据。

随着一个数据值的传递(发送和接收),一些数据值的所有权从一个协程转移到了另一个协程。

  • 当一个协程发送一个值到一个通道,我们可以认为此协程释放了一些值的所有权
  • 当一个协程从一个通道接收到一个值,我们可以认为此协程获取了一些值的所有权

当然,在通过通道传递数据的时候,也可能没有任何所有权发生转移。

所有权发生转移的值常常被传递的值所引用着,但有时候也并非如此。 在 Go 中,数据所有权的转移并非体现在语法上,而是体现在逻辑上。 Go 通道可以帮助程序员轻松地避免数据竞争,但不会防止程序员因为犯错而写出错误的并发代码的情况发生。

尽管 Go 也支持几种传统的数据同步技术,但是只有通道为一等公民。 通道是 Go 中的一种类型,所以我们可以无需引进任何代码包就可以使用通道。 几种传统的数据同步技术提供在 syncsync/atomic 标准库包中。

实事求是地说,每种并发同步技术都有它们各自的最佳应用场景,但是通道的 应用范围更广。 使用通道来做同步常常可以使得代码看上去更整洁和易于理解。不过通道的编程体验常常很有趣(?)以至于程序员们经常在并非是通道的最佳应用场景中仍坚持使用通道。

channel 的类型和值

和数组、切片以及映射类型一样,每个通道类型也有一个元素类型。 一个通道只能传送它的(通道类型的)元素类型的值。

通道可以是双向的,也可以是单向的。

  • 字面形式 chan T 表示一个元素类型为 T 的双向通道类型。 编译器允许从此类型的值中接收和向此类型的值中发送数据。
  • 字面形式 chan<- T 表示一个元素类型为 T 的单向发送通道类型。 编译器不允许从此类型的值中接收数据。
  • 字面形式 <-chan T 表示一个元素类型为 T 的单向接收通道类型。 编译器不允许向此类型的值中发送数据。

双向通道 chan T 的值可以被隐式转换为单向通道类型 chan<- T<-chan T,但反之不行(即使显式也不行)。 类型 chan<- T<-chan T 的值也不能相互转换。

每个通道值有一个容量属性。此属性的意义将在下一节中得到解释。

  • 一个容量为 0 的通道值称为一个非缓冲通道(unbuffered channel)
  • 一个容量不为 0 的通道值称为一个缓冲通道(buffered channel)。

通道类型的零值也使用预声明的 nil 来表示。 一个非零通道值必须通过内置的 make 函数来创建。 比如 make(chan int, 10) 将创建一个元素类型为 int 的通道值。 第二个参数指定了欲创建的通道的容量(可选的,默认值为 0)。

channel 值的比较

所有通道类型均为可比较类型。

一个通道值可能含有底层部分。 当一个通道值被赋给另一个通道值后,这两个通道值将共享相同的底层部分

换句话说,这两个通道引用着同一个底层的内部通道对象。 比较这两个通道的结果为 true

channel 操作

Go 中有五种通道相关的操作。假设一个通道(值)为 ch,下面列出了这五种操作的语法或者函数调用。

  1. 调用内置函数 close 来关闭一个通道:

    1
    
    close(ch)
    

    传给 close 函数调用的实参必须为一个通道值,并且此通道值不能为单向接收的

  2. 使用下面的语法向通道 ch 发送一个值 v

    1
    
    ch <- v
    

    v 必须能够赋值给通道 ch 的元素类型。 ch 不能为单向接收通道。 <- 称为数据发送操作符。

  3. 使用下面的语法从通道 ch 接收一个值:

    1
    
    <-ch
    

    如果一个通道操作不永久阻塞,它总会返回至少一个值,此值的类型为通道 ch 的元素类型。

    ch 不能为单向发送通道。<- 称为数据接收操作符,它和数据发送操作符的表示形式一样。

    在大多数场合下,一个数据接收操作可以被认为是一个单值表达式。

    但是,当一个数据接收操作被用做一个赋值语句中的唯一的源值的时候,它可以返回第二个可选的 " 类型不确定的布尔值 " 返回值从而成为一个多值表达式。;这个 " 类型不确定的布尔值 " 表示第一个接收到的值是否是在通道被关闭前发送的。

    从后面的章节,我们将得知我们可以从一个已关闭的通道中接收到无穷个值

    数据接收操作在赋值中被用做源值的例子:

    1
    2
    
    v = <-ch
    v, sentBeforeClosed = <-ch
    
  4. 查询一个通道的容量:

    1
    
    cap(ch)
    

    cap() 是一个内置函数。其返回值的类型为内置类型 int

  5. 查询一个通道的长度:

    1
    
    len(ch)
    

    len() 是一个内置函数。其返回值的类型也为内置类型 int。 一个通道的长度是指当前有多少个已被发送到此通道但还未被接收出去的元素值。

Go 中大多数的基本操作都是未同步的。换句话说,它们都不是并发安全的。 这些操作包括赋值、传参、和各种容器值操作等。 但是,上面列出的五种通道相关的操作都已经同步过了,因此它们可以在并发协程中安全运行而无需其它同步操作。

注意:通道的赋值和其它类型值的赋值一样,是未同步的。 同样,将刚从一个通道接收出来的值赋给另一个值也是未同步的。

如果被查询的通道为一个 nil 零值通道,则 caplen 函数调用都返回 0。 这两个操作是如此简单,所以后面将不再对它们进行详解。 事实上,这两个操作在实践中很少使用。

⭐ channel 操作详解

为了让解释简单清楚,在本文后续部分,通道将被归为三类:

  1. 零值(nil)通道;
  2. 非零值但已关闭的通道;
  3. 非零值并且尚未关闭的通道。

下表简单地描述了三种通道操作施加到三类通道的结果。

操作一个零值 nil 通道一个非零值但已关闭的通道一个非零值且尚未关闭的通道
关闭产生恐慌产生恐慌成功关闭 (C)
发送数据永久阻塞产生恐慌阻塞或者成功发送 (B)
接收数据永久阻塞永不阻塞 (D)阻塞或者成功接收 (A)

对于上表中的五种未打上标的情形,规则很简单:

  • 关闭一个 nil 通道或者一个已经关闭的通道将产生一个恐慌。
  • 向一个已关闭的通道发送数据也将导致一个恐慌。
  • 向一个 nil 通道发送数据或者从一个 nil 通道接收数据将使当前协程永久阻塞。

下面将详细解释其它四种被打了上标(A/B/C/D)的情形。

channel 大致的内部实现

为了更好地理解通道和为了后续讲解方便,先了解一下通道类型的大致内部实现是很有帮助的。

我们可以认为一个通道内部维护了三个队列(均可被视为先进先出队列):

  1. 接收数据协程队列(可以看做是先进先出队列但其实并不完全是,见下面解释)。此队列是一个没有长度限制的链表。 此队列中的协程均处于阻塞状态,它们正等待着从此通道接收数据。
  2. 发送数据协程队列(可以看做是先进先出队列但其实并不完全是,见下面解释)。此队列也是一个没有长度限制的链表。 此队列中的协程亦均处于阻塞状态,它们正等待着向此通道发送数据。 此队列中的每个协程将要发送的值(或者此值的指针,取决于具体编译器实现)和此协程一起存储在此队列中。
  3. 数据缓冲队列。这是一个循环队列(绝对先进先出),它的长度为此通道的容量。此队列中存放的值的类型都为此通道的元素类型。 如果此队列中当前存放的值的个数已经达到此通道的容量,则我们说此通道已经处于满槽状态。 如果此队列中当前存放的值的个数为零,则我们说此通道处于空槽状态。 对于一个非缓冲通道(容量为零),它总是同时处于满槽状态和空槽状态。

每个通道内部维护着一个互斥锁用来在各种通道操作中防止数据竞争。

channel 操作情形

通道操作情形 A: 当一个协程 R 尝试从一个非零且尚未关闭的通道接收数据的时候,此协程 R 将首先尝试获取此通道的锁,成功之后将执行下列步骤,直到其中一个步骤的条件得到满足。

  1. 如果此通道的缓冲队列不为空(这种情况下,接收数据协程队列必为空),此协程 R 将从缓冲队列取出(接收)一个值。 如果发送数据协程队列不为空,一个发送协程将从此队列中弹出,此协程欲发送的值将被推入缓冲队列。此发送协程将恢复至运行状态。 接收数据协程 R 继续运行,不会阻塞。对于这种情况,此数据接收操作为一个非阻塞操作
  2. 否则(即此通道的缓冲队列为空),如果发送数据协程队列不为空(这种情况下,此通道必为一个非缓冲通道), 一个发送数据协程将从此队列中弹出,此协程欲发送的值将被接收数据协程 R 接收。此发送协程将恢复至运行状态。 接收数据协程 R 继续运行,不会阻塞。对于这种情况,此数据接收操作为一个非阻塞操作
  3. 对于剩下的情况(即此通道的缓冲队列和发送数据协程队列均为空),此接收数据协程 R 将被推入接收数据协程队列,并进入阻塞状态。 它以后可能会被另一个发送数据协程唤醒而恢复运行。 对于这种情况,此数据接收操作为一个阻塞操作

通道操作情形 B: 当一个协程 S 尝试向一个非零且尚未关闭的通道发送数据的时候,此协程 S 将首先尝试获取此通道的锁,成功之后将执行下列步骤,直到其中一个步骤的条件得到满足。

  1. 如果此通道的接收数据协程队列不为空(这种情况下,缓冲队列必为空), 一个接收数据协程将从此队列中弹出,此协程将接收到发送协程 S 发送的值。此接收协程将恢复至运行状态。 发送数据协程 S 继续运行,不会阻塞。对于这种情况,此数据发送操作为一个非阻塞操作
  2. 否则(接收数据协程队列为空),如果缓冲队列未满(这种情况下,发送数据协程队列必为空), 发送协程 S 欲发送的值将被推入缓冲队列,发送数据协程 S 继续运行,不会阻塞。 对于这种情况,此数据发送操作为一个非阻塞操作
  3. 对于剩下的情况(接收数据协程队列为空,并且缓冲队列已满),此发送协程 S 将被推入发送数据协程队列,并进入阻塞状态。 它以后可能会被另一个接收数据协程唤醒而恢复运行。 对于这种情况,此数据发送操作为一个阻塞操作

上面已经提到过,一旦一个非零通道被关闭,继续向此通道发送数据将产生一个恐慌。 注意,向关闭的通道发送数据属于一个非阻塞操作

通道操作情形 C: 当一个协程成功获取到一个非零且尚未关闭的通道的锁并且准备关闭此通道时,下面两步将依次执行:

  1. 如果此通道的接收数据协程队列不为空(这种情况下,缓冲队列必为空),此队列中的所有协程将被依个弹出,并且每个协程将接收到此通道的元素类型的一个零值,然后恢复至运行状态。
  2. 如果此通道的发送数据协程队列不为空,此队列中的所有协程将被依个弹出,并且每个协程中都将产生一个恐慌(因为向已关闭的通道发送数据)。 这就是我们在上面说并发地关闭一个通道和向此通道发送数据这种情形属于不良设计的原因。 事实上,在数据竞争侦测编译选项(-race)打开时,Go 官方标准运行时将很可能会对并发地关闭一个通道和向此通道发送数据这种情形报告成数据竞争。

注意:当一个缓冲队列不为空的通道被关闭之后,它的缓冲队列不会被清空,其中的数据仍然可以被后续的数据接收操作所接收到。详见下面的对情形 D 的解释。

通道操作情形 D: 一个非零通道被关闭之后,此通道上的后续数据接收操作将永不会阻塞。

  • 此通道的缓冲队列中存储数据仍然可以被接收出来。 伴随着这些接收出来的缓冲数据的第二个可选返回(类型不确定布尔)值仍然是 true
  • 一旦此缓冲队列变为空,后续的数据接收操作将永不阻塞并且总会返回此通道的元素类型的零值和值为 false 的第二个可选返回结果。

上面已经提到了,一个接收操作的第二个可选返回(类型不确定布尔)结果表示一个接收到的值是否是在此通道被关闭之前发送的。 如果此返回值为 false,则第一个返回值必然是一个此通道的元素类型的零值。

知道哪些通道操作是阻塞的和哪些是非阻塞的对正确理解后面将要介绍的 select 流程控制机制非常重要。

如果一个协程被从一个通道的某个队列中(不论发送数据协程队列还是接收数据协程队列)弹出,并且此协程是在一个 select 控制流程中推入到此队列的,那么此协程将在下面将要讲解的 select 控制流程的执行步骤 中的第 9 步中恢复至运行状态,并且同时它会被从相应的 select 控制流程中的相关的若干通道的协程队列中移除掉。

一些结论

根据上面的解释,我们可以得出如下的关于一个通道的内部的三个队列的各种事实

  • 如果一个通道已经关闭了,则它的发送数据协程队列和接收数据协程队列肯定都为空,但是它的缓冲队列可能不为空。
  • 在任何时刻,如果缓冲队列不为空,则接收数据协程队列必为空。
  • 在任何时刻,如果缓冲队列未满,则发送数据协程队列必为空。
  • 如果一个通道是缓冲的,则在任何时刻,它的发送数据协程队列和接收数据协程队列之一必为空。
  • 如果一个通道是非缓冲的,则在任何时刻,一般说来,它的发送数据协程队列和接收数据协程队列之一必为空, 但是有一个例外:一个协程可能在一个 select 控制流程中同时被推入到此通道的发送数据协程队列和接收数据协程队列中。

一些 channel 的使用例子

来看一些通道的使用例子来加深一下对上一节中的解释的理解。

一个简单的通过一个非缓冲通道实现的请求/响应的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan int) // 一个非缓冲通道
	go func(ch chan<- int, x int) {
		time.Sleep(time.Second)
		// <-ch    // 此操作编译不通过
		ch <- x*x  // 阻塞在此,直到发送的值被接收
	}(c, 3)
	done := make(chan struct{})
	go func(ch <-chan int) {
		n := <-ch      // 阻塞在此,直到有值发送到c
		fmt.Println(n) // 9
		// ch <- 123   // 此操作编译不通过
		time.Sleep(time.Second)
		done <- struct{}{}
	}(c)
	<-done // 阻塞在此,直到有值发送到done
	fmt.Println("bye")
}

输出结果:

1
2
9
bye

下面的例子使用了一个缓冲通道。此例子程序并非是一个并发程序,它只是为了展示缓冲通道的使用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import "fmt"

func main() {
	c := make(chan int, 2) // 一个容量为2的缓冲通道
	c <- 3
	c <- 5
	close(c)
	fmt.Println(len(c), cap(c)) // 2 2
	x, ok := <-c
	fmt.Println(x, ok) // 3 true
	fmt.Println(len(c), cap(c)) // 1 2
	x, ok = <-c
	fmt.Println(x, ok) // 5 true
	fmt.Println(len(c), cap(c)) // 0 2
	x, ok = <-c
	fmt.Println(x, ok) // 0 false
	x, ok = <-c
	fmt.Println(x, ok) // 0 false
	fmt.Println(len(c), cap(c)) // 0 2
	close(c) // 此行将产生一个恐慌
	c <- 7   // 如果上一行不存在,此行也将产生一个恐慌。
}

一场永不休场的足球比赛:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
	"fmt"
	"time"
)

func main() {
	var ball = make(chan string)
	kickBall := func(playerName string) {
		for {
			fmt.Print(<-ball, "传球", "\n")
			time.Sleep(time.Second)
			ball <- playerName
		}
	}
	go kickBall("张三")
	go kickBall("李四")
	go kickBall("王二麻子")
	go kickBall("刘大")
	ball <- "裁判"   // 开球
	var c chan bool // 一个零值nil通道
	<-c             // 永久阻塞在此
}

channel 的元素值的传递都是复制过程

在一个值被从一个协程传递到另一个协程的过程中,此值将被复制至少一次。

如果此传递值曾经在某个通道的缓冲队列中停留过,则它在此传递过程中将被复制两次。 一次复制发生在从发送协程向缓冲队列推入此值的时候,另一个复制发生在接收协程从缓冲队列取出此值的时候。

和赋值以及函数调用传参一样,当一个值被传递时,只有它的直接部分被复制

对于官方标准编译器,最大支持的通道的元素类型的尺寸为 65535。 但是,一般说来,为了在数据传递过程中避免过大的复制成本,我们不应该使用尺寸很大的通道元素类型。 如果欲传送的值的尺寸较大,应该改用指针类型做为通道的元素类型。

关于 channel 和协程的垃圾回收

注意,一个通道被其发送数据协程队列和接收数据协程队列中的所有协程引用着。因此,如果一个通道的这两个队列只要有一个不为空,则此通道肯定不会被垃圾回收。

另一方面,如果一个协程处于一个通道的某个协程队列之中,则此协程也肯定不会被垃圾回收,即使此通道仅被此协程所引用。

事实上,一个协程只有在退出后才能被垃圾回收。

数据接收和发送操作都属于简单语句

数据接收和发送操作都属于简单语句。 另外一个数据接收操作总是可以被用做一个单值表达式。 简单语句和表达式可以被用在一些控制流程的某些部分。

在下面这个例子中,数据接收和发送操作被用在两个 for 循环的初始化和步尾语句。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
	"fmt"
	"time"
)

func main() {
	fibonacci := func() chan uint64 {
		c := make(chan uint64)
		go func() {
			var x, y uint64 = 0, 1
			for ; y < (1 << 63); c <- y { // 步尾语句
				x, y = y, x+y
			}
			close(c)
		}()
		return c
	}
	c := fibonacci()
	for x, ok := <-c; ok; x, ok = <-c { // 初始化和步尾语句
		time.Sleep(time.Second)
		fmt.Println(x)
	}
}

for-range 应用于 channel

for-range 循环控制流程也适用于通道。 此循环将不断地尝试从一个通道接收数据,直到此通道关闭并且它的缓冲队列为空为止。

和应用于数组/切片/映射的 for-range 语法不同,应用于通道的 for-range 语法中最多只能出现一个循环变量,此循环变量用来存储接收到的值。

1
2
3
for v := range aChannel {
	// 使用v
}

等价于

1
2
3
4
5
6
7
for {
	v, ok = <-aChannel
	if !ok {
		break
	}
	// 使用v
}

当然,这里的通道 aChannel 一定不能为一个单向发送通道。

如果它是一个 nil 零值,则此 for-range 循环将使当前协程永久阻塞。

上一节中的例子中的最后一个 for 循环可以改写为下面这样:

1
2
3
4
for x := range c {
    time.Sleep(time.Second)
    fmt.Println(x)
}

select-case 分支流程控制代码块

Go 中有一个专门为通道设计的 select-case 分支流程控制语法。 此语法和 switch-case 分支流程控制语法很相似。 比如,select-case 流程控制代码块中也可以有若干 case 分支和最多一个 default 分支。 但是,这两种流程控制也有很多不同点。

在一个 select-case 流程控制中:

  • select 关键字和 { 之间不允许存在任何表达式和语句。
  • fallthrough 语句不能被使用.
  • 每个 case 关键字后必须跟随一个通道接收数据操作或者一个通道发送数据操作。 通道接收数据操作可以做为源值出现在一条简单赋值语句中。 以后,一个 case 关键字后跟随的通道操作将被称为一个 case 操作。
  • 所有的非阻塞 case 操作中将有一个被随机选择执行(而不是按照从上到下的顺序),然后执行此操作对应的 case 分支代码块。
  • 所有case 操作均为阻塞的情况下,如果 default 分支存在,则 default 分支代码块将得到执行; 否则,当前协程将被推入所有阻塞操作中相关的通道的发送数据协程队列或者接收数据协程队列中,并进入阻塞状态。

按照上述规则,一个不含任何分支的 select-case 代码块 select{} 将使当前协程处于永久阻塞状态。

一些例子

在下面这个例子中,default 分支将铁定得到执行,因为两个 case 分支后的操作均为阻塞的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
package main

import "fmt"

func main() {
	var c chan struct{} // nil
	select {
	case <-c:             // 阻塞操作
	case c <- struct{}{}: // 阻塞操作
	default:
		fmt.Println("Go here.")
	}
}

下面这个例子中实现了尝试发送(try-send)和尝试接收(try-receive)。 它们都是用含有一个 case 分支和一个 default 分支的 select-case 代码块来实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import "fmt"

func main() {
	c := make(chan string, 2)
	trySend := func(v string) {
		select {
		case c <- v:
		default: // 如果c的缓冲已满,则执行默认分支。
		}
	}
	tryReceive := func() string {
		select {
		case v := <-c: return v
		default: return "-" // 如果c的缓冲为空,则执行默认分支。
		}
	}
	trySend("Hello!") // 发送成功
	trySend("Hi!")    // 发送成功
	trySend("Bye!")   // 发送失败,但不会阻塞。
	// 下面这两行将接收成功。
	fmt.Println(tryReceive()) // Hello!
	fmt.Println(tryReceive()) // Hi!
	// 下面这行将接收失败。
	fmt.Println(tryReceive()) // -
}

下面这个程序有 50% 的几率会因为恐慌而崩溃。 此程序中 select-case 代码块中的两个 case 操作均不阻塞,所以随机一个将被执行。 如果第一个 case 操作(向已关闭的通道发送数据)被执行,则一个恐慌将产生。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
package main

func main() {
	c := make(chan struct{})
	close(c)
	select {
	case c <- struct{}{}: // 若此分支被选中,则产生一个恐慌
	case <-c:
	}
}

select-case 流程控制的实现机理

select-case 流程控制是 Go 中的一个重要和独特的特性。 下面列出了官方标准运行时中 select-case 流程控制的 实现步骤

  1. 将所有 case 操作中涉及到的通道表达式和发送值表达式按照从上到下,从左到右的顺序一一估值。 在赋值语句中做为源值的数据接收操作对应的目标值在此时刻不需要被估值。

  2. 将所有分支随机排序。default 分支总是排在最后。 所有 case 操作中相关的通道可能会有重复的。

  3. 为了防止在下一步中造成(和其它协程互相)死锁,对所有 case 操作中相关的通道进行排序。(排序依据并不重要,官方 Go 标准编译器使用通道的地址顺序进行排序)。排序结果中前 N 个通道不存在重复的情况。 N 为所有 case 操作中涉及到的不重复的通道的数量。 下面,通道锁顺序是针对此排序结果中的前 N 个通道来说的,通道锁逆序是指此顺序的逆序。

  4. 按照上一步中的生成通道锁顺序获取所有相关的通道的锁。

  5. 按照第 2 步中生成的分支顺序检查相应分支:

    1. 如果这是一个 case 分支并且相应的通道操作是一个向关闭了的通道发送数据操作,则按照通道锁逆序解锁所有的通道并在当前协程中产生一个恐慌。 跳到第 12 步(完毕)。
    2. 如果这是一个 case 分支并且相应的通道操作是非阻塞的,则按照通道锁逆序解锁所有的通道并执行相应的 case 分支代码块。 (此相应的通道操作可能会唤醒另一个处于阻塞状态的协程。) 跳到第 12 步(完毕)。
    3. 如果这是 default 分支,则按照通道锁逆序解锁所有的通道并执行此 default 分支代码块。 跳到第 12 步(完毕)。

    (到这里,default 分支肯定是不存在的,并且所有的 case 操作均为阻塞的。)

  6. 将当前协程(和对应 case 分支信息)推入到每个 case 操作中对应的通道的发送数据协程队列或接收数据协程队列中。 当前协程可能会被多次推入到同一个通道的这两个队列中,因为多个 case 操作中对应的通道可能为同一个。

  7. 使当前协程进入阻塞状态并且按照通道锁逆序解锁所有的通道。

  8. …,当前协程处于阻塞状态,等待其它协程通过通道操作唤醒当前协程,…

  9. 当前协程被另一个协程中的一个通道操作唤醒。 此唤醒通道操作可能是一个通道关闭操作,也可能是一个数据发送/接收操作。 如果它是一个数据发送/接收操作,则(当前正被解释的 select-case 流程中)肯定有一个相应 case 操作与之配合传递数据。 在此配合过程中,当前协程将从相应 case 操作相关的通道的接收/发送数据协程队列中弹出。

  10. 按照第 3 步中的生成的通道锁顺序获取所有相关的通道的锁。

  11. 将当前协程从各个 case 操作中对应的通道的发送数据协程队列或接收数据协程队列中(可能以非弹出的方式)移除。

    1. 如果当前协程是被一个通道关闭操作所唤醒,则跳到第 5 步。
    2. 如果当前协程是被一个数据发送/接收操作所唤醒,则相应的 case 分支已经在第 9 步中知晓。 按照通道锁逆序解锁所有的通道并执行此 case 分支代码块。
  12. 完毕。

从此实现中,我们得知

  • 一个协程可能同时多次处于同一个通道的发送数据协程队列或接收数据协程队列中。
  • 当一个协程被阻塞在一个 select-case 流程控制中并在以后被唤醒时,它可能会从多个通道的发送数据协程队列和接收数据协程队列中被移除。

通道并非在任何场合总是最佳的并发同步方案

sync 标准库包中提供的并发同步技术

通道并不是 Go 支持的唯一的一种并发同步技术。而且对于一些特定的情形,通道并不是最有效和可读性最高的同步技术。 本文下面将介绍 sync 标准库包中提供的各种并发同步技术。相对于通道,这些技术对于某些情形更加适用。

sync 标准库包提供了一些用于实现并发同步的类型。这些类型适用于各种不同的内存顺序需求。 对于这些特定的需求,这些类型使用起来比通道效率更高,代码实现更简洁。

请注意:为了避免各种异常行为,最好不要复制 sync 标准库包中提供的类型的值。

sync.WaitGroup(等待组)类型

每个 sync.WaitGroup 值在内部维护着一个计数,此计数的初始默认值为零。

*sync.WaitGroup 类型有三个方法:Add(delta int)Done()Wait()

对于一个可寻址的 sync.WaitGroupwg

  • 我们可以使用方法调用 wg.Add(delta) 来改变值 wg 维护的计数。
  • 方法调用 wg.Done()wg.Add(-1) 是完全等价的。
  • 如果一个 wg.Add(delta) 或者 wg.Done() 调用将 wg 维护的计数更改成一个负数,一个恐慌将产生。
  • 当一个协程调用了 wg.Wait() 时,
    • 如果此时 wg 维护的计数为零,则此 wg.Wait() 此操作为一个空操作(no-op);
    • 否则(计数为一个正整数),此协程将进入阻塞状态。 当以后其它某个协程将此计数更改至 0 时(一般通过调用 wg.Done()),此协程将重新进入运行状态(即 wg.Wait() 将返回)。

请注意 wg.Add(delta)wg.Done()wg.Wait() 分别是 (&wg).Add(delta)(&wg).Done()(&wg).Wait() 的简写形式(省略了间址运算符)

一般,一个 sync.WaitGroup 值用来让某个协程等待其它若干协程都先完成它们各自的任务。 一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wg sync.WaitGroup
	wg.Add(N)
	for i := 0; i < N; i++ {
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			fmt.Println("Done:", i)
			wg.Done() // <=> wg.Add(-1)
		}()
	}

	wg.Wait()
	// 所有的元素都保证被初始化了。
	fmt.Println("values:", values)
}

在此例中,主协程等待着直到其它 5 个协程已经将各自负责的元素初始化完毕此会打印出各个元素值。 这里是一个可能的程序执行输出结果:

1
2
3
4
5
6
Done: 4
Done: 1
Done: 3
Done: 0
Done: 2
values: [71 89 50 62 60]

我们可以将上例中的 Add 方法调用拆分成多次调用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
...
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1) // 将被执行5次
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			wg.Done()
		}()
	}
...

一个 *sync.WaitGroup 值的 Wait 方法可以在多个协程中调用。 当对应的 sync.WaitGroup 值维护的计数降为 0,这些协程都将得到一个(广播)通知而结束阻塞状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wgA, wgB sync.WaitGroup
	wgA.Add(N)
	wgB.Add(1)

	for i := 0; i < N; i++ {
		i := i
		go func() {
			wgB.Wait() // 等待广播通知
			log.Printf("values[%v]=%v \n", i, values[i])
			wgA.Done()
		}()
	}

	// 下面这个循环保证将在上面的任何一个
	// wg.Wait调用结束之前执行。
	for i := 0; i < N; i++ {
		values[i] = 50 + rand.Int31n(50)
	}
	wgB.Done() // 发出一个广播通知
	wgA.Wait()
}

一个 WaitGroup 可以在它的一个 Wait 方法返回之后被重用。 但是请注意,当一个 WaitGroup 值维护的基数为零时,它的带有正整数实参的 Add 方法调用不能和它的 Wait 方法调用并发运行,否则将可能出现数据竞争。

sync.Once 类型

每个 *sync.Once 值有一个 Do(f func()) 方法。 此方法只有一个类型为 func() 的参数。

对一个可寻址的 sync.Onceoo.Do()(即 (&o).Do() 的简写形式)方法调用可以在多个协程中被多次并发地执行, 这些方法调用的实参应该(但并不强制)为同一个函数值。 在这些方法调用中,有且只有一个调用的实参函数(值)将得到调用。 此被调用的实参函数保证在任何 o.Do() 方法调用返回之前退出。 换句话说,被调用的实参函数内的代码将在任何 o.Do() 方法返回调用之前被执行。

一般来说,一个 sync.Once 值被用来确保一段代码在一个并发程序中被执行且仅被执行一次。

一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
	"log"
	"sync"
)

func main() {
	log.SetFlags(0)

	x := 0
	doSomething := func() {
		x++
		log.Println("Hello")
	}

	var wg sync.WaitGroup
	var once sync.Once
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(doSomething)
			log.Println("world!")
		}()
	}

	wg.Wait()
	log.Println("x =", x) // x = 1
}

在此例中,Hello 将仅被输出一次,而 world! 将被输出 5 次,并且 Hello 肯定在所有的 5 个 world! 之前输出。

sync.Mutex(互斥锁)和 sync.RWMutex(读写锁)类型

*sync.Mutex*sync.RWMutex 类型都实现了 sync.Locker 接口类型。 所以这两个类型都有两个方法:Lock()Unlock(),用来保护一份数据不会被多个使用者同时读取和修改。

除了 Lock()Unlock() 这两个方法,*sync.RWMutex 类型还有两个另外的方法:RLock()RUnlock(),用来支持多个读取者并发读取一份数据但防止此份数据被某个数据写入者和其它数据访问者(包括读取者和写入者)同时使用。

(注意:这里的数据读取者和数据写入者不应该从字面上理解。有时候某些数据读取者可能修改数据,而有些数据写入者可能只读取数据。)

一个 Mutex 值常称为一个互斥锁。 一个 Mutex 零值为一个尚未加锁的互斥锁。 一个(可寻址的)Mutexm 只有在未加锁状态时才能通过 m.Lock() 方法调用被成功加锁。 换句话说,一旦 m 值被加了锁(亦即某个 m.Lock() 方法调用成功返回), 一个新的加锁试图将导致当前协程进入阻塞状态,直到此 Mutex 值被解锁为止(通过 m.Unlock() 方法调用)。

m.Lock()m.Unlock() 分别是 (&m).Lock()(&m).Unlock() 的简写形式。

一个使用 sync.Mutex 的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
	"fmt"
	"runtime"
	"sync"
)

type Counter struct {
	m sync.Mutex
	n uint64
}

func (c *Counter) Value() uint64 {
	c.m.Lock()
	defer c.m.Unlock()
	return c.n
}

func (c *Counter) Increase(delta uint64) {
	c.m.Lock()
	c.n += delta
	c.m.Unlock()
}

func main() {
	var c Counter
	for i := 0; i < 100; i++ {
		go func() {
			for k := 0; k < 100; k++ {
				c.Increase(1)
			}
		}()
	}

	// 此循环仅为演示目的。
	for c.Value() < 10000 {
		runtime.Gosched()
	}
	fmt.Println(c.Value()) // 10000
}

在上面这个例子中,一个 Counter 值使用了一个 Mutex 字段来确保它的字段 n 永远不会被多个协程同时使用。

一个 RWMutex 值常称为一个读写互斥锁,它的内部包含两个锁:一个写锁和一个读锁。 对于一个可寻址的 RWMutexrwm,数据写入者可以通过方法调用 rwm.Lock()rwm 加写锁,或者通过 rwm.RLock() 方法调用对 rwm 加读锁。 方法调用 rwm.Unlock()rwm.RUnlock() 用来解开 rwm 的写锁和读锁。 rwm 的读锁维护着一个计数。当 rwm.RLock() 调用成功时,此计数增 1;当 rwm.Unlock() 调用成功时,此计数减 1; 一个零计数表示 rwm 的读锁处于未加锁状态;反之,一个非零计数(肯定大于零)表示 rwm 的读锁处于加锁状态。

注意 rwm.Lock()rwm.Unlock()rwm.RLock()rwm.RUnlock() 分别是 (&rwm).Lock()(&rwm).Unlock()(&rwm).RLock()(&rwm).RUnlock() 的简写形式。

对于一个可寻址的 RWMutexrwm,下列规则存在:

  • rwm 的写锁只有在它的写锁和读锁都处于未加锁状态时才能被成功加锁。 换句话说,rwm 的写锁在任何时刻最多只能被一个数据写入者成功加锁,并且 rwm 的写锁和读锁不能同时处于加锁状态。
  • rwm 的写锁正处于加锁状态的时候,任何新的对之加写锁或者加读锁的操作试图都将导致当前协程进入阻塞状态,直到此写锁被解锁,这样的操作试图才有机会成功。
  • rwm 的读锁正处于加锁状态的时候,新的加写锁的操作试图将导致当前协程进入阻塞状态。 但是,一个新的加读锁的操作试图将成功,只要此操作试图发生在任何被阻塞的加写锁的操作试图之前(见下一条规则)。 换句话说,一个读写互斥锁的读锁可以同时被多个数据读取者同时加锁而持有。 当 rwm 的读锁维护的计数清零时,读锁将返回未加锁状态。
  • 假设 rwm 的读锁正处于加锁状态的时候,为了防止后续数据写入者没有机会成功加写锁,后续发生在某个被阻塞的加写锁操作试图之后的所有加读锁的试图都将被阻塞。
  • 假设 rwm 的写锁正处于加锁状态的时候,(至少对于标准编译器来说,)为了防止后续数据读取者没有机会成功加读锁,发生在此写锁下一次被解锁之前的所有加读锁的试图都将在此写锁下一次被解锁之后肯定取得成功,即使所有这些加读锁的试图发生在一些仍被阻塞的加写锁的试图之后。

后两条规则是为了确保数据读取者和写入者都有机会执行它们的操作,避免产生饥饿

请注意:一个锁并不会绑定到一个协程上,即一个锁并不记录哪个协程成功地加锁了它。 换句话说,一个锁的加锁者和此锁的解锁者可以不是同一个协程,尽管在实践中这种情况并不多见。

在上一个例子中,如果 Value 方法被十分频繁调用而 Increase 方法并不频繁被调用,则 Counter 类型的 m 字段的类型可以更改为 sync.RWMutex,从而使得执行效率更高,如下面的代码所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
...
type Counter struct {
	//m sync.Mutex
	m sync.RWMutex
	n uint64
}

func (c *Counter) Value() uint64 {
	//c.m.Lock()
	//defer c.m.Unlock()
	c.m.RLock()
	defer c.m.RUnlock()
	return c.n
}
...

sync.RWMutex 值的另一个应用场景是将一个写任务分隔成若干小的写任务。下一节中展示了一个这样的例子。

根据上面列出的后两条规则,下面这个程序最有可能输出 abdc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
	"fmt"
	"time"
	"sync"
)

func main() {
	var m sync.RWMutex
	go func() {
		m.RLock()
		fmt.Print("a")
		time.Sleep(time.Second)
		m.RUnlock()
	}()
	go func() {
		time.Sleep(time.Second * 1 / 4)
		m.Lock()
		fmt.Print("b")
		time.Sleep(time.Second)
		m.Unlock()
	}()
	go func() {
		time.Sleep(time.Second * 2 / 4)
		m.Lock()
		fmt.Print("c")
		m.Unlock()
	}()
	go func () {
		time.Sleep(time.Second * 3 / 4)
		m.RLock()
		fmt.Print("d")
		m.RUnlock()
	}()
	time.Sleep(time.Second * 3)
	fmt.Println()
}

请注意,上例这个程序仅仅是为了解释和验证上面列出的读写锁的后两条加锁规则。 此程序使用了 time.Sleep 调用来做协程间的同步。

sync.Mutexsync.RWMutex 值也可以用来实现通知,尽管这不是 Go 中最优雅的方法来实现通知。 下面是一个使用了 Mutex 值来实现通知的例子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var m sync.Mutex
	m.Lock()
	go func() {
		time.Sleep(time.Second)
		fmt.Println("Hi")
		m.Unlock() // 发出一个通知
	}()
	m.Lock() // 等待通知
	fmt.Println("Bye")
}

在此例中,Hi 将确保在 Bye 之前打印出来。 关于 sync.Mutexsync.RWMutex 值相关的内存顺序保证,请阅读 Go中的内存顺序保证 一文。

sync.Cond 类型

sync.Cond 类型提供了一种有效的方式来实现多个协程间的通知

每个 sync.Cond 值拥有一个 sync.Locker 类型的名为 L 的字段。 此字段的具体值常常为一个 *sync.Mutex 值或者 *sync.RWMutex 值。

*sync.Cond 类型有三个方法:Wait()Signal()Broadcast()

每个 Cond 值维护着一个先进先出等待协程队列。 对于一个可寻址的 Condc

  • c.Wait() 必须在 c.L 字段值的锁处于加锁状态的时候调用;否则,c.Wait() 调用将造成一个恐慌。 一个 c.Wait() 调用将

    1. 首先将当前协程推入到 c 所维护的等待协程队列;
    2. 然后调用 c.L.Unlock()c.L 的锁解锁;
    3. 然后使当前协程进入阻塞状态(当前协程将被另一个协程通过 c.Signal()c.Broadcast() 调用唤醒而重新进入运行状态。)一旦当前协程重新进入运行状态,c.L.Lock() 将被调用以试图重新对 c.L 字段值的锁加锁。 此 c.Wait() 调用将在此试图成功之后退出。
  • 一个 c.Signal() 调用将唤醒并移除 c 所维护的等待协程队列中的第一个协程(如果此队列不为空的话)。

  • 一个 c.Broadcast() 调用将唤醒并移除 c 所维护的等待协程队列中的所有协程(如果此队列不为空的话)。

请注意:c.Wait()c.Signal()c.Broadcast() 分别为 (&c).Wait()(&c).Signal()(&c).Broadcast() 的简写形式。

c.Signal()c.Broadcast() 调用常用来通知某个条件的状态发生了变化。 一般说来,c.Wait() 应该在一个检查某个条件是否已经得到满足的循环中调用。

下面是一个典型的 sync.Cond 用例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const N = 10
    var values [N]string

    cond := sync.NewCond(&sync.Mutex{})

    for i := 0; i < N; i++ {
        d := time.Second * time.Duration(rand.Intn(10)) / 10
        go func(i int) {
            time.Sleep(d) // 模拟一个工作负载
            cond.L.Lock()
            // 下面的修改必须在cond.L被锁定的时候执行
            values[i] = string('a' + i)
            cond.Broadcast() // 可以在cond.L被解锁后发出通知
            cond.L.Unlock()
            // 上面的通知也可以在cond.L未锁定的时候发出。
            //cond.Broadcast() // 上面的调用也可以放在这里
        }(i)
    }

    // 此函数必须在cond.L被锁定的时候调用。
    checkCondition := func() bool {
        fmt.Println(values)
        for i := 0; i < N; i++ {
            if values[i] == "" {
                return false
            }
        }
        return true
    }

    cond.L.Lock()
    defer cond.L.Unlock()
    for !checkCondition() {
        cond.Wait() // 必须在cond.L被锁定的时候调用
    }
}

一个可能的输出:

1
2
3
4
5
6
7
8
9
[         ]
[     f    ]
[  c   f    ]
[  c   f  h  ]
[ b c   f  h  ]
[a b c   f  h  j]
[a b c   f g h i j]
[a b c  e f g h i j]
[a b c d e f g h i j]

因为上例中只有一个协程(主协程)在等待通知,所以其中的 cond.Broadcast() 调用也可以换为 cond.Signal()。 如上例中的注释所示,cond.Broadcast()cond.Signal() 不必在 cond.L 的锁处于加锁状态时调用。

为了防止数据竞争,对自定义条件的修改必须在 cond.L 的锁处于加锁状态时才能执行。 另外,checkCondition 函数和 cond.Wait 方法也必须在 cond.L 的锁处于加锁状态时才可被调用。

事实上,对于上面这个特定的例子,cond.L 字段的也可以为一个 *sync.RWMutex 值。 对自定义条件的十个部分的修改可以在 RWMutex 值的读锁处于加锁状态时执行。这十个修改可以并发进行,因为它们是互不干扰的。 如下面的代码所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
...
cond := sync.NewCond(&sync.RWMutex{})
cond.L.Lock()

for i := 0; i < N; i++ {
    d := time.Second * time.Duration(rand.Intn(10)) / 10
    go func(i int) {
        time.Sleep(d)
        cond.L.(*sync.RWMutex).RLock()
        values[i] = string('a' + i)
        cond.L.(*sync.RWMutex).RUnlock()
        cond.Signal()
    }(i)
}
...

在上面的代码中,此 sync.RWMutex 值的用法有些不符常规。 它的读锁被一些修改数组元素的协程所加锁并持有,而它的写锁被主协程加锁持有用来读取并检查各个数组元素的值。

Cond 值所表示的自定义条件可以是一个虚无。对于这种情况,此 Cond 值纯粹被用来实现通知。 比如,下面这个程序将打印出 abc 或者 bac

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	cond := sync.NewCond(&sync.Mutex{})
	cond.L.Lock()
	go func() {
		cond.L.Lock()
		go func() {
			cond.L.Lock()
			cond.Broadcast()
			cond.L.Unlock()
		}()
		cond.Wait()
		fmt.Print("a")
		cond.L.Unlock()
		wg.Done()
	}()
	cond.Wait()
	fmt.Print("b")
	cond.L.Unlock()
	wg.Wait()
	fmt.Println("c")
}

如果需要,多个 sync.Cond 值可以共享一个 sync.Locker 值。但是这种情形在实践中并不多见。

sync/atomic 标准库包中提供的原子操作

原子操作是比其它同步技术更基础的操作。原子操作是无锁的,常常直接通过 CPU 指令直接实现。 事实上,其它同步技术的实现常常依赖于原子操作。

注意,本文中的很多例子并非并发程序。它们只是用来演示如何使用 sync/atomic 标准库包中提供的原子操作。

Go 支持的原子操作概述

对于一个整数类型 Tsync/atomic 标准库包提供了下列原子操作函数。 其中 T 可以是内置 int32int64uint32uint64uintptr 类型。

1
2
3
4
5
func AddT(addr *T, delta T)(new T)
func LoadT(addr *T) (val T)
func StoreT(addr *T, val T)
func SwapT(addr *T, new T) (old T)
func CompareAndSwapT(addr *T, old, new T) (swapped bool)

比如,下列五个原子操作函数提供给了内置 int32 类型。

1
2
3
4
5
func AddInt32(addr *int32, delta int32)(new int32)
func LoadInt32(addr *int32) (val int32)
func StoreInt32(addr *int32, val int32)
func SwapInt32(addr *int32, new int32) (old int32)
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

下列四个原子操作函数提供给了(安全)指针类型。因为 Go 目前(1.17)并不支持自定义泛型,所以这些函数是通过 非类型安全指针 unsafe.Pointer 来实现的。

1
2
3
4
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

因为 Go 指针不支持算术运算,所以相对于整数类型,指针类型的原子操作少了一个 AddPointer 函数。

sync/atomic 标准库包也提供了一个 Value 类型。以它为基的指针类型 *Value 拥有两个方法:LoadStoreValue 值用来原子读取和修改任何类型的 Go 值。

1
2
func (v *Value) Load() (x interface{})
func (v *Value) Store(x interface{})

本文的余下部分将通过一些示例来展示如何使用这些原子操作函数。

整数原子操作

下面这个例子展示了如何使用 add 原子操作来并发地递增一个 int32 值。 在此例子中,主协程中创建了 1000 个新协程。每个新协程将整数 n 的值增加 1。 原子操作保证这 1000 个新协程之间不会发生数据竞争。此程序肯定打印出 1000

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var n int32
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			atomic.AddInt32(&n, 1)
			wg.Done()
		}()
	}
	wg.Wait()

	fmt.Println(atomic.LoadInt32(&n)) // 1000
}

如果我们将新协程中的语句 atomic.AddInt32(&n, 1) 替换为 n++,则最后的输出结果很可能不是 1000

StoreTLoadT 原子操作函数经常被用来需要并发运行的实现 setter 和 getter 方法。下面是一个这样的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type Page struct {
	views uint32
}

func (page *Page) SetViews(n uint32) {
	atomic.StoreUint32(&page.views, n)
}

func (page *Page) Views() uint32 {
	return atomic.LoadUint32(&page.views)
}
  • 如果 T 是一个有符号整数类型,比如 int32int64,则 AddT 函数调用的第二个实参可以是一个负数,用来实现原子减法操作。

  • 但是如果 T 是一个无符号整数类型,比如 uint32uint64 或者 uintptr,则 AddT 函数调用的第二个实参需要为一个非负数,那么如何实现无符号整数类型 T 值的原子减法操作呢? 毕竟 sync/atomic 标准库包没有提供 SubstractT 函数。 根据欲传递的第二个实参的特点,我们可以把 T 为一个无符号整数类型的情况细分为两类:

    1. 第二个实参为类型为 T 的一个变量值 v。 因为 -v 在 Go 中是合法的,所以 -v 可以直接被用做 AddT 调用的第二个实参。
    2. 第二个实参为一个正整数常量 c,这时 -c 在 Go 中是编译不通过的,所以它不能被用做 AddT 调用的第二个实参。 这时我们可以使用 ^T(c-1)(仍为一个正数)做为 AddT 调用的第二个实参。

^T(v-1) 小技巧对于无符号类型的变量 v 也是适用的,但是 ^T(v-1)T(-v) 的效率要低。

对于这个 ^T(c-1) 小技巧,如果 c 是一个类型确定值并且它的类型确实就是 T,则它的表示形式可以简化为 ^(c-1)

一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var (
		n uint64 = 97
		m uint64 = 1
		k int    = 2
	)
	const (
		a        = 3
		b uint64 = 4
		c uint32 = 5
		d int    = 6
	)

	show := fmt.Println
	atomic.AddUint64(&n, -m)
	show(n) // 96 (97 - 1)
	atomic.AddUint64(&n, -uint64(k))
	show(n) // 94 (95 - 2)
	atomic.AddUint64(&n, ^uint64(a - 1))
	show(n) // 91 (94 - 3)
	atomic.AddUint64(&n, ^(b - 1))
	show(n) // 87 (91 - 4)
	atomic.AddUint64(&n, ^uint64(c - 1))
	show(n) // 82 (87 - 5)
	atomic.AddUint64(&n, ^uint64(d - 1))
	show(n) // 76 (82 - 6)
	x := b; atomic.AddUint64(&n, -x)
	show(n) // 72 (76 - 4)
	atomic.AddUint64(&n, ^(m - 1))
	show(n) // 71 (72 - 1)
	atomic.AddUint64(&n, ^uint64(k - 1))
	show(n) // 69 (71 - 2)
}

SwapT 函数调用和 StoreT 函数调用类似,但是返回修改之前的旧值(因此称为置换操作)。

一个 CompareAndSwapT 函数调用传递的旧值和目标值的当前值匹配的情况下才会将目标值改为新值,并返回 true;否则立即返回 false

一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var n int64 = 123
	var old = atomic.SwapInt64(&n, 789)
	fmt.Println(n, old) // 789 123
	swapped := atomic.CompareAndSwapInt64(&n, 123, 456)
	fmt.Println(swapped) // false
	fmt.Println(n)       // 789
	swapped = atomic.CompareAndSwapInt64(&n, 789, 456)
	fmt.Println(swapped) // true
	fmt.Println(n)       // 456
}

请注意,到目前为止(Go 1.17),一个 64 位字(int64 或 uint64 值)的原子操作要求此 64 位字的内存地址必须是 8 字节对齐的。 请阅读 关于 Go 值的内存布局 一文获取详情。

指针值的原子操作

上面已经提到了 sync/atomic 标准库包为指针值的原子操作提供了四个函数,并且指针值的原子操作是通过非类型安全指针来实现的。

非类型安全指针 一文,我们得知,在 Go 中, 任何指针类型的值可以被显式转换为非类型安全指针类型 unsafe.Pointer,反之亦然。 所以指针类型 *unsafe.Pointer 的值也可以被显式转换为类型 unsafe.Pointer,反之亦然。

下面这个程序不是一个并发程序。它仅仅展示了如何使用指针原子操作。在这个例子中,类型 T 可以为任何类型。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type T struct {x int}
var pT *T

func main() {
	var unsafePPT = (*unsafe.Pointer)(unsafe.Pointer(&pT))
	var ta, tb = T{1}, T{2}
	// 修改
	atomic.StorePointer(
		unsafePPT, unsafe.Pointer(&ta))
	fmt.Println(pT) // &{1}
	// 读取
	pa1 := (*T)(atomic.LoadPointer(unsafePPT))
	fmt.Println(pa1 == &ta) // true
	// 置换
	pa2 := atomic.SwapPointer(
		unsafePPT, unsafe.Pointer(&tb))
	fmt.Println((*T)(pa2) == &ta) // true
	fmt.Println(pT) // &{2}
	// 比较置换
	b := atomic.CompareAndSwapPointer(
		unsafePPT, pa2, unsafe.Pointer(&tb))
	fmt.Println(b) // false
	b = atomic.CompareAndSwapPointer(
		unsafePPT, unsafe.Pointer(&tb), pa2)
	fmt.Println(b) // true
}

是的,目前指针的原子操作使用起来是相当的啰嗦。 事实上,啰嗦还是次要的,更主要的是,因为指针的原子操作需要引入 unsafe 标准库包,所以这些操作函数不在 Go1兼容性保证 之列。

感觉目前支持的这些指针原子操作在今后变为不合法的可能性很小。 即使它们变得不再合法,Go 官方工具链中的 go fix 命令应该会将它们转换为今后的新的合法形式。

如果你确实担忧这些指针原子操作在未来的合法性,你可以使用下一节将要介绍的原子操作。 但是下一节将要介绍的原子操作对于指针值来说比本节介绍的指针原子操作效率要低得多。

任何类型值的原子操作

sync/atomic 标准库包中提供的 Value 类型可以用来读取和修改任何类型的值。

类型 *Value 有几个方法:LoadStoreSwapCompareAndSwap(其中后两个方法是在 Go 1.17 中引入的)。 这些方法均以 interface{} 做为参数类型,所以传递给它们的实参可以是任何类型的值。

但是对于一个可寻址的 Value 类型的值 v,一旦 v.Store 方法((&v).Store 的简写形式)被曾经调用一次,则传递给值 v 的后续方法调用的实参的具体类型必须和传递给它的第一次调用的实参的具体类型一致; 否则,将产生一个恐慌。nil 接口类型实参也将导致 v.Store() 方法调用产生恐慌。

一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	type T struct {a, b, c int}
	var ta = T{1, 2, 3}
	var v atomic.Value
	v.Store(ta)
	var tb = v.Load().(T)
	fmt.Println(tb)       // {1 2 3}
	fmt.Println(ta == tb) // true

	v.Store("hello") // 将导致一个恐慌
}

另一个例子(针对 Go 1.17+):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	type T struct {a, b, c int}
	var x = T{1, 2, 3}
	var y = T{4, 5, 6}
	var z = T{7, 8, 9}
	var v atomic.Value
	v.Store(x)
	fmt.Println(v) // {{1 2 3}}
	old := v.Swap(y)
	fmt.Println(v)       // {{4 5 6}}
	fmt.Println(old.(T)) // {1 2 3}
	swapped := v.CompareAndSwap(x, z)
	fmt.Println(swapped, v) // false {{4 5 6}}
	swapped = v.CompareAndSwap(y, z)
	fmt.Println(swapped, v) // true {{7 8 9}}
}

事实上,我们也可以使用上一节介绍的指针原子操作来对任何类型的值进行原子读取和修改,不过需要多一级指针的间接引用。 两种方法有各自的好处和缺点。在实践中需要根据具体需要选择合适的方法。

原子操作相关的内存顺序保证

为了便于理解和使用简单,Go 值的原子操作被设计的和内存顺序保证无关。 没有任何官方文档规定了原子操作应该保证的内存顺序。 详见 Go 中的内存顺序保证 一文对此情况的说明。

一些常见并发编程错误

Go 并不阻止程序员在并发编程中因为粗心或者经验不足而犯错。 本文的余下部分将展示一些常见的并发错误,来帮助 Go 程序员在实践中避免这些错误。

当需要同步的时候没有同步

我们已经知道,源文件中的代码行在运行时刻 并非总是按照它们的出现次序被执行

下面这个示例程序犯了两个错误:

  • 首先,主协程中对变量 b 的读取和匿名协程中的对变量 b 的写入可能会产生数据竞争;
  • 其次,在主协程中,条件 b == true 成立并不能确保条件 a != nil 也成立。 编译器和 CPU 可能会对 调整此程序中匿名协程中的某些指令的顺序 已获取更快的执行速度。 所以,站在主协程的视角看,对变量 b 的赋值可能会发生在对变量 a 的赋值之前,这将造成在修改 a 的元素时 a 依然为一个 nil 切片。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"time"
	"runtime"
)

func main() {
	var a []int // nil
	var b bool  // false

	// 一个匿名协程。
	go func () {
		a = make([]int, 3)
		b = true // 写入b
	}()

	for !b { // 读取b
		time.Sleep(time.Second)
		runtime.Gosched()
	}
	a[0], a[1], a[2] = 0, 1, 2 // 可能会发生恐慌
}

上面这个程序可能在很多计算机上运行良好,但是可能会在某些计算机上因为恐慌而崩溃退出;或者使用某些编译器编译的时候运行良好,但使用另外的某个编译器编译的时候将造成程序运行时崩溃退出。

我们应该使用通道或者 sync 标准库包中的同步技术来确保内存顺序。比如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package main

func main() {
	var a []int = nil
	c := make(chan struct{})

	go func () {
		a = make([]int, 3)
		c <- struct{}{}
	}()

	<-c
	a[0], a[1], a[2] = 0, 1, 2 // 绝不会造成恐慌
}

使用 time.Sleep 调用来做同步

让我们看一个简单的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import (
	"fmt"
	"time"
)

func main() {
	var x = 123

	go func() {
		x = 789 // 写入x
	}()

	time.Sleep(time.Second)
	fmt.Println(x) // 读取x
}

我们期望着此程序打印出 789。 事实上,则其运行结果常常正如我们所期待的。 但是,此程序中的同步处理实现的正确吗?否!原因很简单,Go 运行时并不能保证对 x 的写入一定发生在对 x 的读取之前。 在某些特定的情形下,比如 CPU 资源被很一些其它计算密集的程序所占用,则对 x 的写入有可能发生在对 x 的读取之后。 因此,我们不应该在正式的项目中使用 time.Sleep 调用来做同步。

让我们看另一个简单的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"fmt"
	"time"
)

var x = 0

func main() {
	var num = 123
	var p = &num

	c := make(chan int)

	go func() {
		c <- *p + x
	}()

	time.Sleep(time.Second)
	num = 789
	fmt.Println(<-c)
}

你觉得此程序会输出什么?123 还是 789? 事实上,它的输出是和具体使用的编译器相关的。 对于标准编译器 1.17 版本来说,它很可能输出 123。 但是从理论上说,它输出 789 或者另外一个预想不到的值也是有可能的。

让我们将此例中的 c <- *p + x 一行换成 c <- *p,然后重新运行它,你将会发现它的输出变成了 789(如果它使用标准编译器 1.17 版本编译的话)。 重申一次,此结果是和具体使用的编译器和编译器的版本相关的。

是的,此程序中存在数据竞争。表达式 *p 的估值可能发生在赋值 num = 789 之前、之后、或者同时。 time.Sleep 调用并不能保证 *p 的估值发生在此赋值之后。

对于这个特定的例子,我们应该将欲发送的值在开启新协程之前存储在一个临时变量中来避免数据竞争。

1
2
3
4
5
6
...
tmp := *p
go func() {
    c <- tmp
}()
...

使一些协程永久处于阻塞状态

有很多原因导致某个协程永久阻塞,比如:

  • 从一个永远不会有其它协程向其发送数据的通道接收数据;
  • 向一个永远不会有其它协程从中读取数据的通道发送数据;
  • 被自己死锁了;
  • 和其它协程相互死锁了;

除了有时我们故意地将主协程永久阻塞以防止程序退出外,其它大多数造成协程永久阻塞的情况都不是我们所期待的。 Go 运行时很难分辨出一个处于阻塞状态的协程是否将永久阻塞下去,所以 Go 运行时不会释放永久处于阻塞状态的协程占用的资源。

采用最快回应 通道用例中,如果被当作 future/promise 来用的通道的容量不足够大,则较慢回应的协程在准备发送回应结果时将永久阻塞。 比如,下面的例子中,每个请求将导致 4 个协程永久阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			c <- i // 4个协程将永久阻塞在这里
		}()
	}
	return <-c
}

为了防止有 4 个协程永久阻塞,被当作 future/promise 使用的通道的容量必须至少为 4.

第二种“采用最快回应”实现方法 中,如果被当作 future/promise 使用的通道是一个非缓冲通道(如下面的代码所示),则有可能导致其通道的接收者可能会错过所有的回应而导致处于永久阻塞状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			select {
			case c <- i:
			default:
			}
		}()
	}
	return <-c // 有可能永久阻塞在此
}

接收者协程可能会永久阻塞的原因是如果 5 个尝试发送操作都发生在接收操作 <-c 准备好之前,亦即 5 个尝试发送操作都失败了,则接收者协程将永远无值可接收(从而将处于永久阻塞状态)。

将通道 c 改为一个缓冲通道,则至少会有一个尝试发送将成功,从而接收者协程肯定不会永久阻塞。

复制 sync 标准库包中的类型的值

在实践中,sync 标准库包中的类型(除了 Locker 接口类型)的值不应该被复制。 我们只应该复制它们的指针值。

下面是一个有问题的并发编程的例子。 在此例子中,当 Counter.Value 方法被调用时,一个 Counter 属主值将被复制,此属主值的字段 Mutex 也将被一同复制。 此复制并没有被同步保护,因此复制结果可能是不完整的,并非被复制的属主值的一个快照。 即使此 Mutex 字段得以侥幸完整复制,它的副本所保护的是对字段 n 的一个副本的访问,因此一般是没有意义的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import "sync"

type Counter struct {
	sync.Mutex
	n int64
}

// 此方法实现是没问题的。
func (c *Counter) Increase(d int64) (r int64) {
	c.Lock()
	c.n += d
	r = c.n
	c.Unlock()
	return
}

// 此方法的实现是有问题的。当它被调用时,
// 一个Counter属主值将被复制。
func (c Counter) Value() (r int64) {
	c.Lock()
	r = c.n
	c.Unlock()
	return
}

我们应该将 Value 方法的属主参数类型更改为指针类型 *Counter 来避免复制 sync.Mutex 值。

Go 官方工具链中提供的 go vet 命令将提示此例中的 Value 方法的声明可能是一个潜在的逻辑错误。

在错误的地方调用 sync.WaitGroup.Add 方法

每个 sync.WaitGroup 值内部维护着一个计数。此计数的初始值为 0。 如果一个 sync.WaitGroup 值的 Wait 方法在此计数为 0 的时候被调用,则此调用不会阻塞,否则此调用将一直阻塞到此计数变为 0 为止。

为了让一个 WaitGroup 值的使用有意义,在此值的计数为 0 的情况下,对它的下一次 Add 方法的调用必须出现在对它的下一次 Wait 方法的调用之前。

比如,在下面的例子中,Add 方法的调用位置是不合适的。 此例子程序的打印结果并不总是 100,而可能是 0100 间的任何一个值。 原因是没有任何一个 Add 方法调用可以确保发生在唯一的 Wait 方法调用之前,结果导致没有任何一个 Done 方法调用可以确保发生在唯一的 Wait 方法调用返回之前。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var x int32 = 0
	for i := 0; i < 100; i++ {
		go func() {
			wg.Add(1)
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}

	fmt.Println("等待片刻...")
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&x))
}

我们应该将对 Add 方法的调用移出匿名协程之外,像下面这样,使得任何一个 Done 方法调用都确保发生在唯一的 Wait 方法调用返回之前。

1
2
3
4
5
6
7
8
9
...
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}
...

不当地使用用做 Future/Promise 的通道

通道用例大全 一文中,我们了解到一些函数可以返回用做 future/promise 的通道结果。 假设 fafb 是这样的两个函数,则下面的调用方式并没有体现出这两个函数的真正价值。

1
doSomethingWithFutureArguments(<-fa(), <-fb())

在上面这行调用中,两个实参值(promise 回应结果)的生成实际上是串行进行的,future/promise 的价值没有体现出来。

我们应该像下面这样调用这两个函数来并发生成两个回应结果:

1
2
ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-ca, <-cb)

没有让最后一个活跃的发送者关闭通道

Go 程序员常犯的一个错误是关闭一个后续可能还会有协程向其发送数据的通道。 当向一个已关闭的通道发送数据的时候,一个恐慌将产生。

这样的错误曾经发生在一些很有名的项目中,比如 Kubernetes 项目中的 这个 bug这个 bug

对地址不保证为 8 字节对齐的值执行 64 位原子操作

截至目前(Go 1.17),64 位原子操作中涉及到的实参地址必须为 8 字节对齐的。不满足此条件的 64 位原子操作将造成一个恐慌。 对于标准编译器,这样的情形只 可能发生在 32 位的架构中。 请阅读 内存布局一文 来获知如何确保让 64 位的整数值的地址在 32 位的架构中 8 字节对齐。

没留意过多的 time.After 函数调用消耗了大量资源

time 标准库包中的 After 函数返回 一个用做延迟通知的通道。 此函数给并发编程带来了很多便利,但是它的每个调用都需要创建一个 time.Timer 值,此新创建的 Timer 值在传递给 After 函数调用的时长(实参)内肯定不会被垃圾回收。 如果此函数在某个时段内被多次频繁调用,则可能导致积累很多尚未过期的 Timer 值从而造成大量的内存和计算消耗。

比如在下面这个例子中,如果 longRunning 函数被调用并且在一分钟内有一百万条消息到达, 那么在某个特定的很小时间段(大概若干秒)内将存在一百万个活跃的 Timer 值,即使其中只有一个是真正有用的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import (
	"fmt"
	"time"
)

// 如果某两个连续的消息的间隔大于一分钟,此函数将返回。
func longRunning(messages <-chan string) {
	for {
		select {
		case <-time.After(time.Minute):
			return
		case msg := <-messages:
			fmt.Println(msg)
		}
	}
}

为了避免太多的 Timer 值被创建,我们应该只使用(并复用)一个 Timer 值,像下面这样:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func longRunning(messages <-chan string) {
	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	for {
		select {
		case <-timer.C: // 过期了
			return
		case msg := <-messages:
			fmt.Println(msg)

			// 此if代码块很重要。
			if !timer.Stop() {
				<-timer.C
			}
		}

		// 必须重置以复用。
		timer.Reset(time.Minute)
	}
}

注意,此示例中的 if 代码块用来舍弃一个可能在执行第二个分支代码块的时候发送过来的超时通知。

不正确地使用 time.Timer

一个典型的 time.Timer 的使用已经在上一节中展示了。一些解释:

  • 如果一个 Timer 值已经过期或者已经被终止(stopped),则相应的 Stop 方法调用返回 false。 在此 Timer 值尚未终止的时候,Stop 方法调用返回 false 只能意味着此 Timer 值已经过期。
  • 一个 Timer 值被终止之后,它的通道字段 C 最多只能含有一个过期的通知。
  • 在一个 Timer 终止(stopped)之后并且在重置和重用此 Timer 值之前,我们应该确保此 Timer 值中肯定不存在过期的通知。 这就是上一节中的例子中的 if 代码块的意义所在。

一个 *Timer 值的 Reset 方法必须在对应 Timer 值过期或者终止之后才能被调用; 否则,此 Reset 方法调用和一个可能的向此 Timer 值的 C 通道字段的发送通知操作产生数据竞争。

如果上一节中的例子中的 select 流程控制代码块中的第一个分支被选中,则这表示相应的 Timer 值已经过期,所以我们不必终止它。 但是我们必须在第二个分支中通过终止此 Timer 以检查此 Timer 中是否存在一个过期的通知。 如果确实有一个过期的通知,我们必须在重用这个 Timer 之前将此过期的通知取出;否则,此过期的通知将下一个循环步导致在第一个分支立即被选中。

比如,下面这个程序将在运行后大概一秒钟(而不是十秒钟)后退出。 而且此程序存在着潜在的数据竞争。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	timer := time.NewTimer(time.Second/2)
	select {
	case <-timer.C:
	default:
		time.Sleep(time.Second) // 此分支被选中的可能性较大
	}
	timer.Reset(time.Second * 10) // 可能数据竞争
	<-timer.C
	fmt.Println(time.Since(start)) // 大约1s
}

当一个 time.Timer 值不再被使用后,我们不必(但是推荐)终止之。

在多个协程中使用同一个 time.Timer 值比较容易写出不当的并发代码,所以尽量不要跨协程使用一个 Timer 值。

我们不应该依赖于 time.TimerReset 方法的返回值。此返回值只要是为了历史兼容性而存在的。