GO并发基础实践

此文只记录GO并发的理论,及对IO的并发实践。同步与锁参见Go同步一文。

基础理论

CSP:通信顺序进程,是 Go 语言采用的并发同步模型,是一种形式语言,用来描述并发系统间进行交互的模式。

Actor 模型:参与者模型采用了 everything is an actor 哲学。所有参与者都是独立的运行单元,参与者可以修改自己的私有状态,参与者之间只能通过发送消息通信(避免任何锁的使用)。

goroutine

一个 goroutine 是一个轻量级的可独立工作的单元。通过 go 语句启动。goroutine 有自己独立的调用栈,会按需伸缩,可以并发调度大量 goroutine,每个 goroutine 自身大约占用 4K 空间。可以类比为非常轻量级的线程,例如协程。一个程序可能只有一个线程,但是会有大量 goroutine 被调度到这个线程上。

操作系统在物理处理器上调度线程来运行,Go 语言为每个物理处理器分配一个逻辑处理器,Go 语言的运行时会在逻辑处理器上调度 goroutine 来运行。

goroutine

阻塞系统调用(如打开文件),goroutine 会从逻辑处理器上分离,线程继续阻塞,等待调用返回。调度器创建一个新的线程,继续绑定到该逻辑处理器上,然后调度器从本地队列中选择另一个 goroutine 来运行。一旦被阻塞的系统调用执行完成并返回,对应的 goroutine 会放回本地本地运行队列,之前的线程被保存好,以便之后可以继续使用。

如果一个 goroutine 需要做网络 I/O 调用,goroutine 会和逻辑处理器分离,并移到集成了网络轮询器的运行时,该轮询器指示某个网络读或者写操作已经就绪,对应的 goroutine 就会重新分配到逻辑处理器上来完成操作。 goroutine 可以被停止并重新调度。

修改逻辑处理器个数,可以通过下面的相关函数设置。但是使用多个逻辑处理器不一定有更好的性能,在修改任何语言运行时配置参数的时候, 都需要配合基准测试来评估程序的运行效果。

1
2
3
runtime.NumCPU()       // 获得 CPU 个数
runtime.GOMAXPROCS(1) // 设置最大可用的 CPU 个数
runtime.NumGoroutine() // 获得当前 goroutine 的个数

基本用法

  • 使用go命令开启一条新的运行时线程, 即 goroutine。 同一个程序中的所有 goroutine 共享同一个地址空间。当该函数执行结束,Goroutine也随之隐式退出。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func say(s string) {
    for i := 0; i < 5; i++ {
    time.Sleep(100 * time.Millisecond)
    fmt.Println(s)
    }
    }

    func main() {
    go say("world")
    say("hello")
    }
  • Channel可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符 <- 用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。

    1
    2
    3
    4
    5
    6
    ch := make(chan int)
    ch <- v // 把 v 发送到通道 ch
    v := <-ch // 从 ch 接收数据
    // 并把值赋给 v

    需要注意的是:chan 后面可以跟任意类型表示一个type类型的通道。

    如,使用channel返回线程运行的结果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func main() {
    ch := make(chan int)
    go say(1,ch)
    go say(2,ch)
    a,b := <- ch,<-ch
    println(a,b)
    }

    func say(n int, ch chan int) {
    ch <- n*5
    }

    Channel可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:

    1
    ch := make(chan int, 100)

    需要注意的是:不带缓冲的channel是同步的channel,如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值!导致陷入死锁。fatal error: all goroutines are asleep - deadlock!

    而带缓冲的channel运行我们发送的数据可以放在缓冲区里面,实现异步操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    func main() {
    // 这里我们定义了一个可以存储整数类型的带缓冲通道
    // 缓冲区大小为2
    ch := make(chan int, 2)

    // 因为 ch 是带缓冲的通道,我们可以同时发送两个数据
    // 而不用立刻需要去同步读取数据
    ch <- 1
    ch <- 2

    // 获取这两个数据
    fmt.Println(<-ch)
    fmt.Println(<-ch)
    }
  • Channel也可以配合range实现遍历,类似于与数组或切片,也可以关闭。

    1
    v, ok := <-ch
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
    c <- x
    x, y = y, x+y
    }
    close(c)
    }

    func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    // range 函数遍历每个从通道接收到的数据,因为 c 在发送完 10 个
    // 数据之后就关闭了通道,所以这里我们 range 函数在接收到 10 个数据
    // 之后就结束了。如果上面的 c 通道不关闭,那么 range 函数就不
    // 会结束,从而在接收第 11 个数据的时候就阻塞了。
    for i := range c {
    fmt.Println(i)
    }
    }
  • 值得注意的是,除了异步传输数据:带缓冲区的channel可以像信号量一样使用,用来完成诸如吞吐率限制等功能。该channel的缓冲区容量决定了并发调用process函数的上限,因此在channel初始化时,需要传入相应的容量参数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    var sem = make(chan int, MaxOutstanding)

    func handle(r *Request) {
    <-sem // 等待分配信号量
    process(r) // 执行task
    sem <- 1 // Done; 返回信号量
    }

    func init() {
    for i := 0; i < MaxOutstanding; i++ {
    sem <- 1 // 初始化分配信号量
    }
    }

    func Serve(queue chan *Request) {
    for {
    req := <-queue
    go handle(req) // Don't wait for handle to finish.
    }
    }
  • 闭包 — 将“函数文本”(function literals)嵌入到一个Goroutine创建之际。保证了在这类函数中被引用的变量在函数结束之前不会被释放。

    1
    2
    3
    4
    5
    6
    func Announce(message string, delay time.Duration) {
    go func() {
    time.Sleep(delay)
    fmt.Println(message)
    }() // Note the parentheses - must call the function.
    }
  • 将上述的信号量模型,改用闭包实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    var sem = make(chan int, MaxOutstanding)
    func init() {
    for i := 0; i < MaxOutstanding; i++ {
    sem <- 1 // 初始化分配信号量
    }
    }
    func Serve(queue chan *Request) {
    for req := range queue {
    <-sem
    go func() {
    process(req) // Buggy; see explanation below.
    sem <- 1
    }()
    }
    }

    需要注意的是,此处的req变量会在所有Goroutine间共享。这样不好,推荐将req设置为私有。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func Serve(queue chan *Request) {
    for req := range queue {
    <-sem
    go func(req *Request) {
    process(req)
    sem <- 1
    }(req)
    }
    }
  • select 是 Go 中的一个控制结构,类似于用于通信的 switch 语句。每个 case 必须是一个通信操作,要么是发送要么是接收。

    select 随机执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行。一个默认的子句应该总是可运行的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    func main() {
    var c1, c2, c3 chan int
    var i1, i2 int
    select {
    case i1 = <-c1:
    fmt.Printf("received ", i1, " from c1\n")
    case c2 <- i2:
    fmt.Printf("sent ", i2, " to c2\n")
    case i3, ok := (<-c3): // same as: i3, ok := <-c3
    if ok {
    fmt.Printf("received ", i3, " from c3\n")
    } else {
    fmt.Printf("c3 is closed\n")
    }
    default:
    fmt.Printf("no communication\n")
    }
    }
  • 除了并发模型外,channel和select还可以结合实现一些非并发的想法,如:一个客户端从某些源接收数据,为了避免频繁的内存分配,实现类似连接池的功能。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    var freeList = make(chan *Buffer, 100)
    var serverChan = make(chan *Buffer)

    func client() {
    for {
    var b *Buffer
    // Grab a buffer if available; allocate if not.
    select {
    case b = <-freeList:
    // Got one; nothing more to do.
    default:
    // None free, so allocate a new one.
    b = new(Buffer)
    }
    load(b) // Read next message from the net.
    serverChan <- b // Send to server.
    }
    }

    服务器端循环从客户端接收并处理每个消息,然后将Buffer对象返回到空闲链表中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func server() {
    for {
    b := <-serverChan // Wait for work.
    process(b)
    // Reuse buffer if there's room.
    select {
    case freeList <- b:
    // Buffer on free list; nothing more to do.
    default:
    // Free list full, just carry on.
    }
    }
    }

    客户端会尝试从空闲链表freeList中获取Buffer对象;如果没有可用对象,则分配一个新的。服务器端会将用完的Buffer对象 b 加入到空闲链表freeList中,如果链表已满,则将b丢弃,垃圾收集器会在未来某个时刻自动回收对应的内存单元。

  • WaitGroup 其实是属于Go并发锁的内容,主进程wait子进程执行完成后再执行!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
    // wg.Add(1) 注意将add放在go协程内,容易造成add未执行,而父进程wait已经成功执行!
    fmt.Println("halo world start")
    time.Sleep(time.Second * 5)
    fmt.Println("halo world end")
    wg.Done()
    }()
    wg.Wait()
    }

并发获取API内容

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestApi(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i <10;i++{
wg.Add(1)
go func(i int) {
defer wg.Done()
res,_ := http.Get("https://api.uomg.com/api/rand.qinghua")
body,_ := ioutil.ReadAll(res.Body)
println(string(body))
}(i)
}
wg.Wait()
}

参考:

菜鸟教程

Effective go