Go并发编程--性能提升大杀器sync.Pool

发布于 2020-12-04 · 本文总共 8102 字 · 阅读大约需要 24 分钟

虽然Go自带垃圾回收,但如果想使用Go开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响;

1.Go的自动垃圾回收机制还是有一个STW(stop-the-world,程序暂停)的时间;

2.大量地创建在堆上的对象,也会影响垃圾回收标记的时间;

一般做性能优化的时候,会采用对象池的方式,把不用的对象回收起来,避免被垃圾回收掉,这样使用的时候就不必在堆上重新创建了;

像数据库连接、TCP 的长连接,这些连接在创建的时候是一个非常耗时的操作。如果每次都创建一个新的连接对象,耗时较长,很可能整个业务的大部分耗时都花在了创建连接上; 如果能把这些连接保存下来,避免每次使用的时候都重新创建,不仅可以大大减少业务的耗时,还能提高应用程序的整体性能;

这个类型也有一些使用起来不太方便的地方,就是它池化的对象可能会被垃圾回收掉,这对于数据库长连接等场景是不合适的

sync.Pool

sync.Pool 本身就是线程安全的,多个 goroutine 可以并发地调用它的方法存取对象;

sync.Pool 不可在使用之后再复制使用。

使用方法

New

当调用 Pool 的 Get 方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新的元素;

Get

从 Pool取走一个元素,这也就意味着,这个元素会从 Pool 中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get 方法的返回值还可能会是一个 nil(Pool.New 字段没有设置,又没有空闲元素可以返回);

Put

将一个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如果 Put 一个 nil 值,Pool 就会忽略这个值;

var buffers = sync.Pool{
  New: func() interface{} { 
    return new(bytes.Buffer)
  },
}

func GetBuffer() *bytes.Buffer {
  return buffers.Get().(*bytes.Buffer)
}

func PutBuffer(buf *bytes.Buffer) {
  buf.Reset()
  buffers.Put(buf)
}

示例

type Node struct {
	Name string
}

var nodePool = sync.Pool{
	New : func() interface{}{
		return new(Node)
	},
}

func BenchmarkWithoutSyncPool1(b *testing.B) {
	var node *Node
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i ++{
		for j := 0; j < 10000; j ++{
			node = new(Node)
			node.Name = "test"
		}
	}
}

// goos: darwin
// goarch: amd64
// pkg: go_practice/go_concurrency
// BenchmarkWithoutSyncPool1-16    	    4324	    246215 ns/op	  160001 B/op	   10000 allocs/op
// PASS

func BenchmarkWithSyncPool1(b *testing.B) {
	var node *Node
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i ++{
		for j := 0; j < 10000; j ++{
			node = nodePool.Get().(*Node)
			node.Name = "test"
			nodePool.Put(node)
		}
	}
}

// goos: darwin
// goarch: amd64
// pkg: go_practice/go_concurrency
// BenchmarkWithSyncPool1-16    	    8138	    157292 ns/op	       0 B/op	       0 allocs/op
// PASS

使用sync.Pool比简单的初始化要慢得多

实现原理

Go 1.13 之前的 sync.Pool 的实现有 2 大问题:

  1. 每次 GC 都会回收创建的对象。

  2. 底层实现使用了 Mutex,对这个锁并发请求竞争激烈的时候,会导致性能的下降。

在 Go 1.13 中,sync.Pool 做了大量的优化。前几讲中我提到过,提高并发程序性能的优化点是尽量不要使用锁,如果不得已使用了锁,就把锁 Go 的粒度降到最低; Go 对 Pool 的优化就是避免使用锁,同时将加锁的 queue 改成 lock-free 的 queue 的实现,给即将移除的元素再多一次“复活”的机会

Pool 最重要的两个字段是 local 和 victim,因为它们两个主要用来存储空闲的元素。弄清楚这两个字段的处理逻辑,你就能完全掌握 sync.Pool 的实现了;


func poolCleanup() {
    // 丢弃当前victim, STW所以不用加锁
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }

    // 将local复制给victim, 并将原local置为nil
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    oldPools, allPools = allPools, nil
}

Get

如果调用这个方法,就会从Pool取走一个元素;

func (p *Pool) Get() interface{} {
    // 把当前goroutine固定在当前的P上
    l, pid := p.pin()
    x := l.private // 优先从local的private字段取,快速
    l.private = nil
    if x == nil {
        // 从当前的local.shared弹出一个,注意是从head读取并移除
        x, _ = l.shared.popHead()
        if x == nil { // 如果没有,则去偷一个
            x = p.getSlow(pid) 
        }
    }
    runtime_procUnpin()
    // 如果没有获取到,尝试使用New函数生成一个新的
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

getSlow 方法的主要逻辑:


func (p *Pool) getSlow(pid int) interface{} {

    size := atomic.LoadUintptr(&p.localSize)
    locals := p.local                       
    // 从其它proc中尝试偷取一个元素
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // 如果其它proc也没有可用元素,那么尝试从vintim中获取
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil { // 同样的逻辑,先从vintim中的local private获取
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ { // 从vintim其它proc尝试偷取
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // 如果victim中都没有,则把这个victim标记为空,以后的查找可以快速跳过了
    atomic.StoreUintptr(&p.victimSize, 0)

    return nil
}

Put

用于将一个元素返还给Pool,Pool会把这个元素保存到池中,并且可以复用;

func (p *Pool) Put(x interface{}) {
    if x == nil { // nil值直接丢弃
        return
    }
    l, _ := p.pin()
    if l.private == nil { // 如果本地private没有值,直接设置这个值即可
        l.private = x
        x = nil
    }
    if x != nil { // 否则加入到本地队列中
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
}

sync.Pool 的坑

内存泄漏

取出来的 bytes.Buffer 在使用的时候,我们可以往这个元素中增加大量的 byte 数据,这会导致底层的 byte slice 的容量可能会变得很大。这个时候,即使 Reset 再放回到池子中,这些 byte slice 的容量不会改变,所占的空间依然很大。而且,因为 Pool 回收的机制,这些大的 Buffer 可能不被回收,而是会一直占用很大的空间,这属于内存泄漏的问题

在使用 sync.Pool 回收 buffer 的时候,一定要检查回收的对象的大小。如果 buffer 太大,就不要回收了,否则就太浪费了;

内存浪费

除了内存泄漏以外,还有一种浪费的情况,就是池子中的 buffer 都比较大,但在实际使用的时候,很多时候只需要一个小的 buffer,这也是一种浪费现象;

要做到物尽其用,尽可能不浪费的话,我们可以将 buffer 池分成几层。首先,小于 512 byte 的元素的 buffer 占一个池子;其次,小于 1K byte 大小的元素占一个池子;再次,小于 4K byte 大小的元素占一个池子。这样分成几个池子以后,就可以根据需要,到所需大小的池子中获取 buffer 了;


第三方库

bytebufferpool

fasthttp 作者 valyala 提供的一个 buffer 池,基本功能和 sync.Pool 相同。它的底层也是使用 sync.Pool 实现的,包括会检测最大的 buffer,超过最大尺寸的 buffer,就会被丢弃

oxtoacart/bpool

提供了以下几种类型的 buffer。

bpool.BufferPool: 提供一个固定元素数量的 buffer 池,元素类型是 bytes.Buffer,如果超过这个数量,Put 的时候就丢弃,如果池中的元素都被取光了,会新建一个返回。Put 回去的时候,不会检测 buffer 的大小。

bpool.BytesPool:提供一个固定元素数量的 byte slice 池,元素类型是 byte slice。Put 回去的时候不检测 slice 的大小。

bpool.SizedBufferPool: 提供一个固定元素数量的 buffer 池,如果超过这个数量,Put 的时候就丢弃,如果池中的元素都被取光了,会新建一个返回。Put 回去的时候,会检测 buffer 的大小,超过指定的大小的话,就会创建一个新的满足条件的 buffer 放回去。

bpool 最大的特色就是能够保持池子中元素的数量,一旦 Put 的数量多于它的阈值,就会自动丢弃,而 sync.Pool 是一个没有限制的池子,只要 Put 就会收进去。

bpool 是基于 Channel 实现的,不像 sync.Pool 为了提高性能而做了很多优化,所以,在性能上比不过 sync.Pool。 不过,它提供了限制 Pool 容量的功能,所以,如果你想控制 Pool 的容量的话,可以考虑这个库;

连接池

TCP 连接池


// 工厂模式,提供创建连接的工厂方法
factory    := func() (net.Conn, error) { return net.Dial("tcp", "127.0.0.1:4000") }

// 创建一个tcp池,提供初始容量和最大容量以及工厂方法
p, err := pool.NewChannelPool(5, 30, factory)

// 获取一个连接
conn, err := p.Get()

// Close并不会真正关闭这个连接,而是把它放回池子,所以你不必显式地Put这个对象到池子中
conn.Close()

// 通过调用MarkUnusable, Close的时候就会真正关闭底层的tcp的连接了
if pc, ok := conn.(*pool.PoolConn); ok {
  pc.MarkUnusable()
  pc.Close()
}

// 关闭池子就会关闭=池子中的所有的tcp连接
p.Close()

// 当前池子中的连接的数量
current := p.Len()

    type PoolConn struct {
    net.Conn
    mu       sync.RWMutex
    c        *channelPool
    unusable bool
  }
  
    //拦截Close
  func (p *PoolConn) Close() error {
    p.mu.RLock()
    defer p.mu.RUnlock()
  
    if p.unusable {
      if p.Conn != nil {
        return p.Conn.Close()
      }
      return nil
    }
    return p.c.put(p.Conn)
  }

    type channelPool struct {
    // 存储连接池的channel
    mu    sync.RWMutex
    conns chan net.Conn
  

    // net.Conn 的产生器
    factory Factory
  }

数据库连接池


Memcached Client 连接池


   // 放回一个待重用的连接
   func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
    c.lk.Lock()
    defer c.lk.Unlock()
    if c.freeconn == nil { // 如果对象为空,创建一个map对象
      c.freeconn = make(map[string][]*conn)
    }
    freelist := c.freeconn[addr.String()] //得到此地址的连接列表
    if len(freelist) >= c.maxIdleConns() {//如果连接已满,关闭,不再放入
      cn.nc.Close()
      return
    }
    c.freeconn[addr.String()] = append(freelist, cn) // 加入到空闲列表中
  }
  
    // 得到一个空闲连接
  func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
    c.lk.Lock()
    defer c.lk.Unlock()
    if c.freeconn == nil { 
      return nil, false
    }
    freelist, ok := c.freeconn[addr.String()]
    if !ok || len(freelist) == 0 { // 没有此地址的空闲列表,或者列表为空
      return nil, false
    }
    cn = freelist[len(freelist)-1] // 取出尾部的空闲连接
    c.freeconn[addr.String()] = freelist[:len(freelist)-1]
    return cn, true
  }

Worker Pool

一个 goroutine 初始的栈大小是 2048 个字节,并且在需要的时候可以扩展到 1GB(具体的内容你可以课下看看代码中的配置:不同的架构最大数会不同),所以,大量的 goroutine 还是很耗资源的。同时,大量的 goroutine 对于调度和垃圾回收的耗时还是会有影响的,因此,goroutine 并不是越多越好;

大部分的 Worker Pool 都是通过 Channel 来缓存任务的,因为 Channel 能够比较方便地实现并发的保护,有的是多个 Worker 共享同一个任务 Channel,有些是每个 Worker 都有一个独立的 Channel;

gammazero/workerpool:gammazero/workerpool 可以无限制地提交任务,提供了更便利的 Submit 和 SubmitWait 方法提交任务,还可以提供当前的 worker 数和任务数以及关闭 Pool 的功能。

ivpusic/grpool:grpool 创建 Pool 的时候需要提供 Worker 的数量和等待执行的任务的最大数量,任务的提交是直接往 Channel 放入任务。

dpaks/goworkers:dpaks/goworkers 提供了更便利的 Submi 方法提交任务以及 Worker 数、任务数等查询方法、关闭 Pool 的方法。它的任务的执行结果需要在 ResultChan 和 ErrChan 中去获取,没有提供阻塞的方法,但是它可以在初始化的时候设置 Worker 的数量和任务数。

总结

如果你发现程序中有一种 GC 耗时特别高,有大量的相同类型的临时对象,不断地被创建销毁,这时,你就可以考虑看看,是不是可以通过池化的手段重用这些对象;

在分布式系统或者微服务框架中,可能会有大量的并发 Client 请求,如果 Client 的耗时占比很大,你也可以考虑池化 Client,以便重用;

如果系统中的 goroutine 数量非常多,程序的内存资源占用比较大,而且整体系统的耗时和 GC 也比较高,我建议你看看,是否能够通过 Worker Pool 解决大量 goroutine 的问题,从而降低这些指标

refs

https://github.com/golang/go/blob/master/src/net/rpc/server.go#L171

https://github.com/alitto/pond

https://github.com/Sherifabdlnaby/gpool

https://github.com/go-playground/pool

https://github.com/benmanns/goworker

https://github.com/Jeffail/tunny

https://github.com/panjf2000/ants

https://godoc.org/github.com/dpaks/goworkers

https://godoc.org/github.com/ivpusic/grpool

https://godoc.org/github.com/gammazero/workerpool

https://github.com/valyala/fasthttp/blob/9f11af296864153ee45341d3f2fe0f5178fd6210/workerpool.go#L16

https://github.com/golang/go/blob/f296b7a6f045325a230f77e9bda1470b1270f817/src/runtime/proc.go#L120

https://github.com/golang/go/blob/f296b7a6f045325a230f77e9bda1470b1270f817/src/runtime/proc.go#L120

https://github.com/bradfitz/gomemcache

https://github.com/golang/go/blob/4fc3896e7933e31822caa50e024d4e139befc75f/src/database/sql/sql.go#L1196

https://github.com/fatih/pool

https://github.com/oxtoacart/bpool

https://github.com/valyala/bytebufferpool

https://github.com/vitessio/vitess/blob/master/go/bucketpool/bucketpool.go

https://github.com/golang/go/blob/617f2c3e35cdc8483b950aa3ef18d92965d63197/src/net/http/server.go

https://github.com/golang/go/issues/23199

https://github.com/gohugoio/hugo/blob/master/bufferpool/bufpool.go




本博客所有文章采用的授权方式为 自由转载-非商用-非衍生-保持署名 ,转载请务必注明出处,谢谢。
声明:
本博客欢迎转发,但请保留原作者信息!
博客地址:邱文奇(qiuwenqi)的博客;
内容系本人学习、研究和总结,如有雷同,实属荣幸!
阅读次数:

文章评论

comments powered by Disqus


章节列表