golang基于Mutex实现可重入锁

来自:网络
时间:2024-06-07
阅读:

golang基于Mutex实现可重入锁

为什么需要可重入锁

我们平时说的分布式锁,一般指的是在不同服务器上的多个线程中,只有一个线程能抢到一个锁,从而执行一个任务。而我们使用锁就是保证一个任务只能由一个线程来完成。所以我们一般是使用这样的三段式逻辑:

Lock();
DoJob();
Unlock();

但是由于我们的系统都是分布式的,这个锁一般不会只放在某个进程中,我们会借用第三方存储,比如 Redis 来做这种分布式锁。但是一旦借助了第三方存储,我们就必须面对这个问题:Unlock是否能保证一定运行呢?

这个问题,我们面对的除了程序的bug之外,还有网络的不稳定,进程被杀死,服务器被down机等。我们是无法保证Unlock一定被运行的。

那么我们就一般在Lock的时候为这个锁加一个超时时间作为兜底。

LockByExpire(duration);
DoJob();
Unlock();

这个超时时间是为了一旦出现异常情况导致Unlock没有被运行,这个锁在duration时间内也会被自动释放。这个在redis中我们一般就是使用set ex 来进行锁超时的设定。

但是有这个超时时间我们又遇上了问题,超时时间设置多久合适呢?当然要设置的比 DoJob 消耗的时间更长,否则的话,在任务还没结束的时候,锁就被释放了,还是有可能导致并发任务的存在。

但是实际上,同样由于网络超时问题,系统运行状况问题等,我们是无法准确知道DoJob这个函数要执行多久的。那么这时候怎么办呢?

有两个办法:

第一个方法,我们可以对DoJob做一个超时设置。让DoJob最多只能执行n秒,那么我的分布式锁的超时时长设置比n秒长就可以了。为一个任务设置超时时间在很多语言是可以做到的。比如golang 中的 TimeoutContext。

而第二种方法,就是我们先为锁设置一个比较小的超时时长,然后不断续期这个锁。对一个锁的不断需求,也可以理解为重新开始加锁,这种可以不断续期的锁,就叫做可重入锁。

除了主线程之外,可重入锁必然有一个另外的线程(或者携程)可以对这个锁进行续期,我们叫这个额外的程序叫做watchDog(看门狗)。

锁重入的定义

锁可重入也就是当前已经获取到锁的goroutine继续调用Lock方法获取锁,Go标准库中提供了sync.Mutex实现了排他锁,但并不是可重入的,如果在代码中重入锁,也就是Lock之后再次进行Lock获取锁,则会被阻塞到第二次Lock上,锁没有办法得到释放从而影响其它goroutine执行

// 例如
package main;

import "sync"

func ReentryExample() {
	var c int64
	var mu sync.Mutex
	mu.Lock() // 第一次加锁
	// TODO //
	mu.Lock() // 第二次加锁,阻塞
	c++;
	// TODO ...
}

重入锁的简单实现思路

  • 拿到能够识别到当前协程的id,(通过堆栈信息获取到goroutine的id)
  • 写一个结构体,实现Locker接口

首先获取到goroutine的id

func GoID() int {
	var buf [32]byte
	n := runtime.Stack(buf[:],false) // 获取堆栈的信息
	// string(buf[:n] 
	/**
	 goroutine 6 [running]:
	 main.XXX	
	*/
	// 拿到goroutine的id
	goIdStr := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine"))[0]
	goId, err := strconv.Atoi(fieldId)// 转换为int
	return goId
}

然后开始编写可重入锁的结构体

// ReentrantMutex 可重入的互斥锁
type ReentrantMutex struct {
	sync.Mutex       // 互斥锁
	goId       int64 // 用于保存goroutine的id
	recursion  int64 // 锁重入的次数
}

// Lock 实现Locker接口,用于加锁
func (r *ReentrantMutex) Lock() {
	gid := GoID()
	if atomic.LoadInt64(&r.goId) == gid { // 看看是否已经加过锁了?
		atomic.AddInt64(&r.recursion, 1) // 如果之前加过锁,则重入的次数+1
		return
	}
	r.Mutex.Lock() // 使用互斥锁上锁
	atomic.StoreInt64(&r.goId, gid) // 使用原子操作保存goroutine的id
	atomic.StoreInt64(&r.recursion, 1) // 第一次加锁,因此重入的次数为一
}

// Unlock 实现了Locker的接口,用于解锁
func (r *ReentrantMutex) Unlock() {
	gid := GoID()
	if atomic.LoadInt64(&r.goId) != gid { // 看是否加过锁
		panic("未加锁") // 没有加过锁,不存在解锁,直接panic
	}
	recursion := atomic.AddInt64(&r.recursion, -1) // 重入次数-1
	if recursion != 0 { // 如果重入次数没有等于0(意味着还有锁没有释放)
		return
	}
	atomic.StoreInt64(&r.goId, -1) // 重入次数为0,则不存在锁没有释放,解锁
	r.Mutex.Unlock() // 互斥锁解锁
}

测试用例

package main;

func main() {
	var m ReentrantMutex
	m.Lock()
	m.Lock() // 不会阻塞
	fmt.Println("1") // 正常打印1
	m.Unlock()
	m.Unlock()// 解锁
}

其他方法实现Golang可重入锁:

具体实现

在Golang中,语言级别天生支持协程,所以这种可重入锁就非常容易实现:

// DistributeLockRedis 基于redis的分布式可重入锁,自动续租
type DistributeLockRedis struct {
 key       string             // 锁的key
 expire    int64              // 锁超时时间
 status    bool               // 上锁成功标识
 cancelFun context.CancelFunc // 用于取消自动续租携程
 redis     redis.Client       // redis句柄
}
 
// 创建可
func NewDistributeLockRedis(key string, expire int64) *DistributeLockRedis {
 return &DistributeLockRedis{
   key : key,
   expire : expire,
 }
}
 
// TryLock 上锁
func (dl *DistributeLockRedis) TryLock() (err error) {
 if err = dl.lock(); err != nil {
  return err
 }
 ctx, cancelFun := context.WithCancel(context.Background())
 dl.cancelFun = cancelFun
 dl.startWatchDog(ctx) // 创建守护协程,自动对锁进行续期
 dl.status = true
 return nil
}
 
// competition 竞争锁
func (dl *DistributeLockRedis) lock() error {
 if res, err := redis.String(dl.redis.Do(context.Background(), "SET", dl.key, 1, "NX", "EX", dl.expire)); err != nil {
  return err
 } 
 return nil
}
 
 
// guard 创建守护协程,自动续期
func (dl *DistributeLockRedis) startWatchDog(ctx context.Context) {
 safeGo(func() error {
  for {
   select {
   // Unlock通知结束
   case <-ctx.Done():
    return nil
   default:
    // 否则只要开始了,就自动重入(续租锁)
    if dl.status {
     if res, err := redis.Int(dl.redis.Do(context.Background(), "EXPIRE", dl.key, dl.expire)); err != nil {
      return nil
     } 
     // 续租时间为 expire/2 秒
     time.Sleep(time.Duration(dl.expire/2) * time.Second)
    }
   }
  }
 })
}
 
// Unlock 释放锁
func (dl *DistributeLockRedis) Unlock() (err error) {
 // 这个重入锁必须取消,放在第一个地方执行
 if dl.cancelFun != nil {
  dl.cancelFun() // 释放成功,取消重入锁
 }
 var res int
 if dl.status {
  if res, err = redis.Int(dl.redis.Do(context.Background(), "Del", dl.key)); err != nil {
   return fmt.Errorf("释放锁失败")
  }
  if res == 1 {
   dl.status = false
   return nil
  }
 }
 return fmt.Errorf("释放锁失败")
}

这段代码的逻辑基本上都以注释的形式来写了。其中主要就在startWatchDog,对锁进行重新续期

ctx, cancelFun := context.WithCancel(context.Background())
dl.cancelFun = cancelFun
dl.startWatchDog(ctx) // 创建守护协程,自动对锁进行续期
dl.status = true

首先创建一个cancelContext,它的context函数cancelFunc是给Unlock进行调用的。然后启动一个goroutine进程来循环续期。

这个新启动的goroutine在主goroutine处理结束,调用Unlock的时候,才会结束,否则会在 过期时间/2 的时候,调用一次redis的expire命令来进行续期。

至于外部,在使用的时候如下

func Foo() error {
  key := foo
  
  // 创建可重入的分布式锁
 dl := NewDistributeLockRedis(key, 10)
 // 争抢锁
 err := dl.TryLock()
 if err != nil {
  // 没有抢到锁
  return err
 }
 
 // 抢到锁的记得释放锁
 defer func() {
  dl.Unlock()
 }
 
 // 做真正的任务
 DoJob()
}
返回顶部
顶部