建议搭配Go并发编程一起使用。

锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。

目录

  • context
  • channel

上下文Context

context是 goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息。主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。

context.Context 类型的值可以协调多个 groutine 中的代码执行“取消”操作,并且可以存储键值对。最重要的是它是并发安全的。
与它协作的 API 都可以由外部控制执行“取消”操作,例如:取消一个 HTTP 请求的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Context interface {
// 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
Done() <-chan struct{}

// 在 channel Done 关闭后,返回 context 取消原因
Err() error

// 返回 context 是否会被取消以及自动取消时间(即 deadline)
Deadline() (deadline time.Time, ok bool)

// 获取 key 对应的 value
Value(key interface{}) interface{}
}

例如:使用Context实现父进程监视子进程自增到5,并关闭的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func gen(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
var n int
for {
select {
case <-ctx.Done():
return
case ch <- n:
n++
time.Sleep(time.Second)
}
}
}()
return ch
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 避免其他地方忘记 cancel,且重复调用不影响
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
cancel()
break
}
}
}

创建Context的四个方式

1
2
3
4
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

创建context的四点意见

  1. 不要将 Context 塞到结构体里。直接将 Context 类型作为函数的第一参数,而且一般都命名为 ctx。
  2. 不要向函数传入一个 nil 的 context,如果你实在不知道传什么,标准库给你准备好了一个 context:todo。
  3. 不要把本应该作为函数参数的类型塞到 context 中,context 存储的应该是一些共同的数据。例如:登陆的 session、cookie 等。
  4. 同一个 context 可能会被传递到多个 goroutine,别担心,context 是并发安全的。

传递共享数据

通过context设置和获取key Value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
ctx := context.Background()
process(ctx)

ctx = context.WithValue(ctx, "traceId", "qcrao-2019")
process(ctx)
}

func process(ctx context.Context) {
traceId, ok := ctx.Value("traceId").(string)
if ok {
fmt.Printf("process over. trace_id=%s\n", traceId)
} else {
fmt.Printf("process over. no trace_id\n")
}
}

取消 goroutine

假设我们需要让perform进程一直执行某任务,直到主进程关闭为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main(){
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
go Perform(ctx)
// ……
// app 端返回页面,调用cancel 函数
cancel()
}

func Perform(ctx context.Context) {
for {
doWork()
select {
case <-ctx.Done():
// 被取消,直接返回
return
case <-time.After(time.Second):
// block 1 秒钟 继续检查
}
}
}

超时退出

假设我们需要为一个任务设置超时时间,超时就关闭,也可以手动调用cancel()结束进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main(){
ctx, cancel := context.WithTimeout(context.Background(), time.Hour) // 在此处设置超时时间
go Perform(ctx)
}

func Perform(ctx context.Context) {
for {
doWork()
select {
case <-ctx.Done():
// 被取消,直接返回
return
case <-time.After(time.Second):
// block 1 秒钟 继续检查
}
}
}

同步原语与锁

如下会介绍 Go 语言中常见的同步原语 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond 以及扩展原语 golang/sync/errgroup.Groupgolang/sync/semaphore.Weightedgolang/sync/singleflight.Group 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级更高的 Channel 实现同步。

Mutex

Go 语言中的互斥锁。 sync.Mutex 由两个字段 statesema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

Mutex拥有四种状态,Mutex会在饥饿模式和正常模式间切换,综合效率和公平:

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被从唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;

与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。

  • 加锁与解锁

    1
    2
    3
    4
    5
    var mu sync.Mutex

    mu.Lock()
    xxxx
    mu.Unlock

RWMutex

读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

Y N
N N
1
2
3
4
5
6
7
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
  • w — 复用互斥锁提供的能力;
  • writerSemreaderSem — 分别用于写等待读和读等待写:
  • readerCount 存储了当前正在执行的读操作数量;
  • readerWait 表示当写操作被阻塞时等待的读操作个数;

我们会依次分析获取写锁和读锁的实现原理,其中:

调用sync.RWMutex.Lock尝试获取写锁时;

  • 每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 会获得写锁;
  • readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作;

调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;

WaitGroup

sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:

1
2
3
4
5
6
7
8
9
10
11
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()

Once

sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果:

1
2
3
4
5
6
7
8
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}

Cond

sync.Cond,它可以让一组的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond 结构体在初始化时都需要传入一个互斥锁

总结一下sync.Mutex的大致用法

  1. 首先声明一个mutex,这里sync.Mutex/sync.RWMutex可根据实际情况选用
  2. 调用sync.NewCond(l Locker) *Cond 使用1中的mutex作为入参 注意 这里传入的是指针 为了避免c.L.Lock()c.L.Unlock()调用频繁复制锁 导致死锁
  3. 根据业务条件 满足则调用cond.Wait()挂起goroutine
  4. cond.Broadcast()唤起所有挂起的gorotune 另一个方法cond.Signal()唤醒一个最先挂起的goroutine

需要注意的是cond.wait()的使用需要参照如下模版 具体为啥我们后续分析

1
2
3
4
5
6
 c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()

如:唤醒等待队列中的所有goroutine,Singal一个一个唤醒,Broadcast广播全体唤醒。此案例中未涉及wait的条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

func main() {
locker := new(sync.Mutex)
cond := sync.NewCond(locker)

for i := 0 ; i < 30 ; i++ {
go func(x int) {
cond.L.Lock()
fmt.Println(x," 获取锁")
defer cond.L.Unlock()
cond.Wait()
fmt.Println(x," 被唤醒")
time.Sleep(time.Second)
}(i)
}


time.Sleep(time.Second)
fmt.Println("Signal...")
cond.Signal()
time.Sleep(time.Second)
cond.Signal()
time.Sleep(time.Second*3)
cond.Broadcast()

fmt.Println("Broadcast...")
time.Sleep(time.Minute)
}

Channel

线程间用于通讯的管道,利用channel+select类似Cond,同理也可以实现像WaitGroup的功能…

详情参见Go并发一文。

总结:

本文未对并发做更深刻的了解,只是初略了解其用法。虽然Go提供了许多的并发管理工具,但是如果细心思考,都是想通的在很多情况下,用各种工具都能实现相似的功能,所以这要求我们要熟悉这些工具在各个场景下的应用。至于原理只能深挖的时候再说了。。。

参考:

Go 语言设计与实现