Go并发编程--扩展并发原语SingleFlight

发布于 2020-11-18 · 本文总共 1950 字 · 阅读大约需要 6 分钟

请求合并 SingleFlight

SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。

sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并并发请求的场景中,尤其是缓存场景。

实现原理


  // 代表一个正在处理的请求,或者已经处理完的请求
  type call struct {
    wg sync.WaitGroup
  

    // 这个字段代表处理完的值,在waitgroup完成之前只会写一次
        // waitgroup完成之后就读取这个值
    val interface{}
    err error
  
        // 指示当call在处理时是否要忘掉这个key
    forgotten bool
    dups  int
    chans []chan<- Result
  }
  
    // group代表一个singleflight对象
  type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
  }

Do 方法


  func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
      g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {//如果已经存在相同的key
      c.dups++
      g.mu.Unlock()
      c.wg.Wait() //等待这个key的第一个请求完成
      return c.val, c.err, true //使用第一个key的请求结果
    }
    c := new(call) // 第一个请求,创建一个call
    c.wg.Add(1)
    g.m[key] = c //加入到key map中
    g.mu.Unlock()
  

    g.doCall(c, key, fn) // 调用方法
    return c.val, c.err, c.dups > 0
  }

  func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()
  

    g.mu.Lock()
    if !c.forgotten { // 已调用完,删除这个key
      delete(g.m, key)
    }
    for _, ch := range c.chans {
      ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
  }

应用场景

使用 SingleFlight 时,可以通过合并请求的方式降低对下游服务的并发压力,从而提高系统的性能,常常用于缓存系统中。


func metaImportsForPrefix(importPrefix string, mod ModuleMode, security web.SecurityMode) (*urlpkg.URL, []metaImport, error) {
        // 使用缓存保存请求结果
    setCache := func(res fetchResult) (fetchResult, error) {
      fetchCacheMu.Lock()
      defer fetchCacheMu.Unlock()
      fetchCache[importPrefix] = res
      return res, nil
    
        // 使用 SingleFlight请求
    resi, _, _ := fetchGroup.Do(importPrefix, func() (resi interface{}, err error) {
      fetchCacheMu.Lock()
            // 如果缓存中有数据,那么直接从缓存中取
      if res, ok := fetchCache[importPrefix]; ok {
        fetchCacheMu.Unlock()
        return res, nil
      }
      fetchCacheMu.Unlock()
            ......

refs

https://github.com/golang/go/blob/b1b67841d1e229b483b0c9dd50ddcd1795b0f90f/src/net/lookup.go

https://github.com/golang/go/blob/50bd1c4d4eb4fac8ddeb5f063c099daccfb71b26/src/internal/singleflight/singleflight.go

https://time.geekbang.org/column/article/309098




本博客所有文章采用的授权方式为 自由转载-非商用-非衍生-保持署名 ,转载请务必注明出处,谢谢。
声明:
本博客欢迎转发,但请保留原作者信息!
博客地址:邱文奇(qiuwenqi)的博客;
内容系本人学习、研究和总结,如有雷同,实属荣幸!
阅读次数:

文章评论

comments powered by Disqus


章节列表