Go并发编程--性能提升大杀器sync.Pool
虽然Go自带垃圾回收,但如果想使用Go开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响;
1.Go的自动垃圾回收机制还是有一个STW(stop-the-world,程序暂停)的时间;
2.大量地创建在堆上的对象,也会影响垃圾回收标记的时间;
一般做性能优化的时候,会采用对象池的方式,把不用的对象回收起来,避免被垃圾回收掉,这样使用的时候就不必在堆上重新创建了;
像数据库连接、TCP 的长连接,这些连接在创建的时候是一个非常耗时的操作。如果每次都创建一个新的连接对象,耗时较长,很可能整个业务的大部分耗时都花在了创建连接上; 如果能把这些连接保存下来,避免每次使用的时候都重新创建,不仅可以大大减少业务的耗时,还能提高应用程序的整体性能;
这个类型也有一些使用起来不太方便的地方,就是它池化的对象可能会被垃圾回收掉,这对于数据库长连接等场景是不合适的
sync.Pool
sync.Pool 本身就是线程安全的,多个 goroutine 可以并发地调用它的方法存取对象;
sync.Pool 不可在使用之后再复制使用。
使用方法
New
当调用 Pool 的 Get 方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新的元素;
Get
从 Pool取走一个元素,这也就意味着,这个元素会从 Pool 中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get 方法的返回值还可能会是一个 nil(Pool.New 字段没有设置,又没有空闲元素可以返回);
Put
将一个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如果 Put 一个 nil 值,Pool 就会忽略这个值;
var buffers = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func GetBuffer() *bytes.Buffer {
return buffers.Get().(*bytes.Buffer)
}
func PutBuffer(buf *bytes.Buffer) {
buf.Reset()
buffers.Put(buf)
}
示例
type Node struct {
Name string
}
var nodePool = sync.Pool{
New : func() interface{}{
return new(Node)
},
}
func BenchmarkWithoutSyncPool1(b *testing.B) {
var node *Node
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i ++{
for j := 0; j < 10000; j ++{
node = new(Node)
node.Name = "test"
}
}
}
// goos: darwin
// goarch: amd64
// pkg: go_practice/go_concurrency
// BenchmarkWithoutSyncPool1-16 4324 246215 ns/op 160001 B/op 10000 allocs/op
// PASS
func BenchmarkWithSyncPool1(b *testing.B) {
var node *Node
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i ++{
for j := 0; j < 10000; j ++{
node = nodePool.Get().(*Node)
node.Name = "test"
nodePool.Put(node)
}
}
}
// goos: darwin
// goarch: amd64
// pkg: go_practice/go_concurrency
// BenchmarkWithSyncPool1-16 8138 157292 ns/op 0 B/op 0 allocs/op
// PASS
使用sync.Pool比简单的初始化要慢得多
实现原理
Go 1.13 之前的 sync.Pool 的实现有 2 大问题:
-
每次 GC 都会回收创建的对象。
-
底层实现使用了 Mutex,对这个锁并发请求竞争激烈的时候,会导致性能的下降。
在 Go 1.13 中,sync.Pool 做了大量的优化。前几讲中我提到过,提高并发程序性能的优化点是尽量不要使用锁,如果不得已使用了锁,就把锁 Go 的粒度降到最低; Go 对 Pool 的优化就是避免使用锁,同时将加锁的 queue 改成 lock-free 的 queue 的实现,给即将移除的元素再多一次“复活”的机会
Pool 最重要的两个字段是 local 和 victim,因为它们两个主要用来存储空闲的元素。弄清楚这两个字段的处理逻辑,你就能完全掌握 sync.Pool 的实现了;
func poolCleanup() {
// 丢弃当前victim, STW所以不用加锁
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 将local复制给victim, 并将原local置为nil
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
Get
如果调用这个方法,就会从Pool取走一个元素;
func (p *Pool) Get() interface{} {
// 把当前goroutine固定在当前的P上
l, pid := p.pin()
x := l.private // 优先从local的private字段取,快速
l.private = nil
if x == nil {
// 从当前的local.shared弹出一个,注意是从head读取并移除
x, _ = l.shared.popHead()
if x == nil { // 如果没有,则去偷一个
x = p.getSlow(pid)
}
}
runtime_procUnpin()
// 如果没有获取到,尝试使用New函数生成一个新的
if x == nil && p.New != nil {
x = p.New()
}
return x
}
getSlow 方法的主要逻辑:
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
// 从其它proc中尝试偷取一个元素
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果其它proc也没有可用元素,那么尝试从vintim中获取
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil { // 同样的逻辑,先从vintim中的local private获取
l.private = nil
return x
}
for i := 0; i < int(size); i++ { // 从vintim其它proc尝试偷取
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果victim中都没有,则把这个victim标记为空,以后的查找可以快速跳过了
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
Put
用于将一个元素返还给Pool,Pool会把这个元素保存到池中,并且可以复用;
func (p *Pool) Put(x interface{}) {
if x == nil { // nil值直接丢弃
return
}
l, _ := p.pin()
if l.private == nil { // 如果本地private没有值,直接设置这个值即可
l.private = x
x = nil
}
if x != nil { // 否则加入到本地队列中
l.shared.pushHead(x)
}
runtime_procUnpin()
}
sync.Pool 的坑
内存泄漏
取出来的 bytes.Buffer 在使用的时候,我们可以往这个元素中增加大量的 byte 数据,这会导致底层的 byte slice 的容量可能会变得很大。这个时候,即使 Reset 再放回到池子中,这些 byte slice 的容量不会改变,所占的空间依然很大。而且,因为 Pool 回收的机制,这些大的 Buffer 可能不被回收,而是会一直占用很大的空间,这属于内存泄漏的问题
在使用 sync.Pool 回收 buffer 的时候,一定要检查回收的对象的大小。如果 buffer 太大,就不要回收了,否则就太浪费了;
内存浪费
除了内存泄漏以外,还有一种浪费的情况,就是池子中的 buffer 都比较大,但在实际使用的时候,很多时候只需要一个小的 buffer,这也是一种浪费现象;
要做到物尽其用,尽可能不浪费的话,我们可以将 buffer 池分成几层。首先,小于 512 byte 的元素的 buffer 占一个池子;其次,小于 1K byte 大小的元素占一个池子;再次,小于 4K byte 大小的元素占一个池子。这样分成几个池子以后,就可以根据需要,到所需大小的池子中获取 buffer 了;
第三方库
bytebufferpool
fasthttp 作者 valyala 提供的一个 buffer 池,基本功能和 sync.Pool 相同。它的底层也是使用 sync.Pool 实现的,包括会检测最大的 buffer,超过最大尺寸的 buffer,就会被丢弃
oxtoacart/bpool
提供了以下几种类型的 buffer。
bpool.BufferPool: 提供一个固定元素数量的 buffer 池,元素类型是 bytes.Buffer,如果超过这个数量,Put 的时候就丢弃,如果池中的元素都被取光了,会新建一个返回。Put 回去的时候,不会检测 buffer 的大小。
bpool.BytesPool:提供一个固定元素数量的 byte slice 池,元素类型是 byte slice。Put 回去的时候不检测 slice 的大小。
bpool.SizedBufferPool: 提供一个固定元素数量的 buffer 池,如果超过这个数量,Put 的时候就丢弃,如果池中的元素都被取光了,会新建一个返回。Put 回去的时候,会检测 buffer 的大小,超过指定的大小的话,就会创建一个新的满足条件的 buffer 放回去。
bpool 最大的特色就是能够保持池子中元素的数量,一旦 Put 的数量多于它的阈值,就会自动丢弃,而 sync.Pool 是一个没有限制的池子,只要 Put 就会收进去。
bpool 是基于 Channel 实现的,不像 sync.Pool 为了提高性能而做了很多优化,所以,在性能上比不过 sync.Pool。 不过,它提供了限制 Pool 容量的功能,所以,如果你想控制 Pool 的容量的话,可以考虑这个库;
连接池
TCP 连接池
// 工厂模式,提供创建连接的工厂方法
factory := func() (net.Conn, error) { return net.Dial("tcp", "127.0.0.1:4000") }
// 创建一个tcp池,提供初始容量和最大容量以及工厂方法
p, err := pool.NewChannelPool(5, 30, factory)
// 获取一个连接
conn, err := p.Get()
// Close并不会真正关闭这个连接,而是把它放回池子,所以你不必显式地Put这个对象到池子中
conn.Close()
// 通过调用MarkUnusable, Close的时候就会真正关闭底层的tcp的连接了
if pc, ok := conn.(*pool.PoolConn); ok {
pc.MarkUnusable()
pc.Close()
}
// 关闭池子就会关闭=池子中的所有的tcp连接
p.Close()
// 当前池子中的连接的数量
current := p.Len()
type PoolConn struct {
net.Conn
mu sync.RWMutex
c *channelPool
unusable bool
}
//拦截Close
func (p *PoolConn) Close() error {
p.mu.RLock()
defer p.mu.RUnlock()
if p.unusable {
if p.Conn != nil {
return p.Conn.Close()
}
return nil
}
return p.c.put(p.Conn)
}
type channelPool struct {
// 存储连接池的channel
mu sync.RWMutex
conns chan net.Conn
// net.Conn 的产生器
factory Factory
}
数据库连接池
Memcached Client 连接池
// 放回一个待重用的连接
func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
c.lk.Lock()
defer c.lk.Unlock()
if c.freeconn == nil { // 如果对象为空,创建一个map对象
c.freeconn = make(map[string][]*conn)
}
freelist := c.freeconn[addr.String()] //得到此地址的连接列表
if len(freelist) >= c.maxIdleConns() {//如果连接已满,关闭,不再放入
cn.nc.Close()
return
}
c.freeconn[addr.String()] = append(freelist, cn) // 加入到空闲列表中
}
// 得到一个空闲连接
func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
c.lk.Lock()
defer c.lk.Unlock()
if c.freeconn == nil {
return nil, false
}
freelist, ok := c.freeconn[addr.String()]
if !ok || len(freelist) == 0 { // 没有此地址的空闲列表,或者列表为空
return nil, false
}
cn = freelist[len(freelist)-1] // 取出尾部的空闲连接
c.freeconn[addr.String()] = freelist[:len(freelist)-1]
return cn, true
}
Worker Pool
一个 goroutine 初始的栈大小是 2048 个字节,并且在需要的时候可以扩展到 1GB(具体的内容你可以课下看看代码中的配置:不同的架构最大数会不同),所以,大量的 goroutine 还是很耗资源的。同时,大量的 goroutine 对于调度和垃圾回收的耗时还是会有影响的,因此,goroutine 并不是越多越好;
大部分的 Worker Pool 都是通过 Channel 来缓存任务的,因为 Channel 能够比较方便地实现并发的保护,有的是多个 Worker 共享同一个任务 Channel,有些是每个 Worker 都有一个独立的 Channel;
gammazero/workerpool:gammazero/workerpool 可以无限制地提交任务,提供了更便利的 Submit 和 SubmitWait 方法提交任务,还可以提供当前的 worker 数和任务数以及关闭 Pool 的功能。
ivpusic/grpool:grpool 创建 Pool 的时候需要提供 Worker 的数量和等待执行的任务的最大数量,任务的提交是直接往 Channel 放入任务。
dpaks/goworkers:dpaks/goworkers 提供了更便利的 Submi 方法提交任务以及 Worker 数、任务数等查询方法、关闭 Pool 的方法。它的任务的执行结果需要在 ResultChan 和 ErrChan 中去获取,没有提供阻塞的方法,但是它可以在初始化的时候设置 Worker 的数量和任务数。
总结
如果你发现程序中有一种 GC 耗时特别高,有大量的相同类型的临时对象,不断地被创建销毁,这时,你就可以考虑看看,是不是可以通过池化的手段重用这些对象;
在分布式系统或者微服务框架中,可能会有大量的并发 Client 请求,如果 Client 的耗时占比很大,你也可以考虑池化 Client,以便重用;
如果系统中的 goroutine 数量非常多,程序的内存资源占用比较大,而且整体系统的耗时和 GC 也比较高,我建议你看看,是否能够通过 Worker Pool 解决大量 goroutine 的问题,从而降低这些指标
refs
https://github.com/golang/go/blob/master/src/net/rpc/server.go#L171
https://github.com/alitto/pond
https://github.com/Sherifabdlnaby/gpool
https://github.com/go-playground/pool
https://github.com/benmanns/goworker
https://github.com/Jeffail/tunny
https://github.com/panjf2000/ants
https://godoc.org/github.com/dpaks/goworkers
https://godoc.org/github.com/ivpusic/grpool
https://godoc.org/github.com/gammazero/workerpool
https://github.com/valyala/fasthttp/blob/9f11af296864153ee45341d3f2fe0f5178fd6210/workerpool.go#L16
https://github.com/golang/go/blob/f296b7a6f045325a230f77e9bda1470b1270f817/src/runtime/proc.go#L120
https://github.com/golang/go/blob/f296b7a6f045325a230f77e9bda1470b1270f817/src/runtime/proc.go#L120
https://github.com/bradfitz/gomemcache
https://github.com/golang/go/blob/4fc3896e7933e31822caa50e024d4e139befc75f/src/database/sql/sql.go#L1196
https://github.com/fatih/pool
https://github.com/oxtoacart/bpool
https://github.com/valyala/bytebufferpool
https://github.com/vitessio/vitess/blob/master/go/bucketpool/bucketpool.go
https://github.com/golang/go/blob/617f2c3e35cdc8483b950aa3ef18d92965d63197/src/net/http/server.go
https://github.com/golang/go/issues/23199
https://github.com/gohugoio/hugo/blob/master/bufferpool/bufpool.go