Go并发编程--基本并发原语Mutex
1.Mutex:如何解决资源并发访问问题
见golang-concurrency总结1
2.Mutex:庖丁解牛看实现
初版的互斥锁
Russ Cox 在 2008 年提交的第一版 Mutex:
// CAS操作,当时还没有抽象出atomic包
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
// 互斥锁的结构,包含两个字段
type Mutex struct {
key int32 // 锁是否被持有的标识
sema int32 // 信号量专用,用以阻塞/唤醒goroutine
}
// 保证成功在val上增加delta的值
func xadd(val *int32, delta int32) (new int32) {
for {
v := *val
if cas(val, v, v+delta) {
return v + delta
}
}
panic("unreached")
}
// 请求锁
func (m *Mutex) Lock() {
if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
return
}
semacquire(&m.sema) // 否则阻塞等待
}
func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
return
}
semrelease(&m.sema) // 唤醒其它阻塞的goroutine
}
CAS:
CAS 指令将给定的值和一个内存地址中的值进行比较,如果它们是同一个值,就使用新值替换内存地址中的值,这个操作是原子性的。
原子性保证这个指令总是基于最新的值进行计算,如果同时有其它线程已经修改了这个值,那么,CAS 会返回失败。
type Foo struct {
mu sync.Mutex
count int
}
func (f *Foo) Bar() {
f.mu.Lock()
if f.count < 1000 {
f.count += 3
f.mu.Unlock() // 此处释放锁
return
}
f.count++
f.mu.Unlock() // 此处释放锁
return
}
从 1.14 版本起,Go 对 defer 做了优化,采用更有效的内联方式,取代之前的生成 defer 对象到 defer chain 中,defer 对耗时的影响微乎其微了,所以基本上修改成下面简洁的写法也没问题:
func (f *Foo) Bar() {
f.mu.Lock()
defer f.mu.Unlock()
if f.count < 1000 {
f.count += 3
return
}
f.count++
return
}
初版的 Mutex 实现有一个问题:请求锁的 goroutine 会排队等待获取互斥锁。 虽然这貌似很公平,但是从性能上来看,却不是最优的。 如果能够把锁交给正在占用 CPU 时间片的 goroutine 的话,那就不需要做上下文的切换, 在高并发的情况下,可能会有更好的性能。
cas和非公平锁的目的都是为了减少线程的上下文的切换
给新人机会
Mutex结构:
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
Lock实现:
func (m *Mutex) Lock() {
// Fast path: 幸运case,能够直接获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
for {
old := m.state
new := old | mutexLocked // 新状态加锁
if old&mutexLocked != 0 {
new = old + 1<<mutexWaiterShift //等待者数量加一
}
if awoke {
// goroutine是被唤醒的,
// 新状态清除唤醒标志
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
if old&mutexLocked == 0 { // 锁原状态未加锁
break
}
runtime.Semacquire(&m.sema) // 请求信号量
awoke = true
}
}
}
Unlock实现:
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
panic("sync: unlock of unlocked mutex")
}
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime.Semrelease(&m.sema)
return
}
old = m.state
}
}
多给些机会
func (m *Mutex) Lock() {
// Fast path: 幸运之路,正好获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
iter := 0
for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
old := m.state // 先保存当前锁的状态
new := old | mutexLocked // 新状态设置加锁标志
if old&mutexLocked != 0 { // 锁还没被释放
if runtime_canSpin(iter) { // 还可以自旋
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue // 自旋,再次尝试请求锁
}
new = old + 1<<mutexWaiterShift
}
if awoke { // 唤醒状态
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
break
}
runtime_Semacquire(&m.sema) // 阻塞等待
awoke = true // 被唤醒
iter = 0
}
}
}
解决饥饿
Mutex 绝不容忍一个 goroutine 被落下,永远没有机会获取锁。不抛弃不放弃是它的宗旨,而且它也尽可能地让等待较长的 goroutine 更有机会获取到锁。
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving // 从state字段中分出一个饥饿标记
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
func (m *Mutex) Lock() {
// Fast path: 幸运之路,一下就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 此goroutine的饥饿标记
awoke := false // 唤醒标记
iter := 0 // 自旋次数
old := m.state // 当前的锁的状态
for {
// 锁是非饥饿状态,锁还没被释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 非饥饿状态,加锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // waiter数量加1
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 设置饥饿状态
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
// 成功设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 处理饥饿状态
// 如果以前就在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 有点绕,加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
3.Mutex:4种易错场景
3.1 Lock/Unlock不是成对出现
1.代码中有太多的if-else分支,可能在某个分支中漏写了Unlock
2.在重构时把Unlock给删除了;
3.Unlock误写成了Lock;
3.2 Copy已使用的Mutex
1.死锁的检查机制checkdead()方法;
2.vet工具;
3.3 重入
Mutex是不可重入的锁
两个实现可重入锁的方案
1.goroutine方案
2.token方案
3.4 死锁
死锁产生的4个条件
互斥–持有和等待–不可剥夺–环路等待
如何检测
1.死锁探测功能
2.引入一个第三方的锁
4.Mutex:如何扩展额外功能
实现TryLock
获取等待者的数量等指标
使用Mutex实现一个线程安全的队列
5.RWMutex:读写锁的实现原理及避坑指南
refs
https://time.geekbang.org/column/article/296541