ReDisaster 2019-11-19
大家都知道go语言中的goroutine虽然消耗资源很小,并且是一个用户线程。但是goroutine也不是无限开的,所以我们会有很多关于协程池的库,当然啊我们自己也可以完成一些简单的携程池。redis也是相同的,redis的链接也是不推荐无限制的打开,否则会造成redis负荷加重。
先看一下Redigo 中的连接池的使用
package main import ( "fmt" "github.com/panlei/redigo/redis" "time" ) func main() { pool := &redis.Pool{ MaxIdle: 4, MaxActive: 4, Dial: func() (redis.Conn, error) { rc, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { return nil, err } return rc, nil }, IdleTimeout: time.Second, Wait: true, } con := pool.Get() str, err := redis.String(con.Do("get", "aaa")) con.Close() fmt.Println("value: ", str, " err:", err) }
我们可以看到Redigo使用连接池还是很简单的步骤:
type Pool struct { // 拨号函数 从外部注入 Dial func() (Conn, error) // DialContext is an application supplied function for creating and configuring a DialContext func(ctx context.Context) (Conn, error) // 检测连接的可用性,从外部注入。如果返回error 则直接关闭连接 TestOnBorrow func(c Conn, t time.Time) error // 最大闲置连接数量 MaxIdle int // 最大活动连接数 MaxActive int // 闲置过期时间 在get函数中会有逻辑 删除过期的连接 IdleTimeout time.Duration // 设置如果活动连接达到上限 再获取时候是等待还是返回错误 // 如果是false 系统会返回redigo: connection pool exhausted // 如果是true 会利用p 的ch 属性让线程等待 知道有连接释放出来 Wait bool // 连接最长生存时间 如果超过时间会被从链表中删除 MaxConnLifetime time.Duration // 判断ch 是否被初始化了 chInitialized uint32 // set to 1 when field ch is initialized // 锁 mu sync.Mutex // mu protects the following fields closed bool // set to true when the pool is closed. active int // the number of open connections in the pool ch chan struct{} // limits open connections when p.Wait is true // 存放闲置连接的链表 idle idleList // idle connections // 等待获取连接的数量 waitCount int64 // total number of connections waited for. waitDuration time.Duration // total time waited for new connections. } // 连接池中的具体连接对象 type conn struct { // 锁 mu sync.Mutex pending int err error // http 包中的conn对象 conn net.Conn // 读入过期时间 readTimeout time.Duration // bufio reader对象 用于读取redis服务返回的结果 br *bufio.Reader // 写入过期时间 writeTimeout time.Duration // bufio writer对象 带buf 用于往服务端写命令 bw *bufio.Writer // Scratch space for formatting argument length. // '*' or '$', length, "\r\n" lenScratch [32]byte // Scratch space for formatting integers and floats. numScratch [40]byte }
我们可以看到,其中有几个关键性的字段比如最大活动连接数、最大闲置连接数、闲置链接过期时间、连接生存时间等。
我们知道 连接池最重要的就是两个方法,一个是获取连接,一个是关闭连接。这个跟sync.Pool。我们来看一下代码:
GET:
func (p *Pool) get(ctx context.Context) (*poolConn, error) { // 处理是否需要等待 pool Wait如果是true 则等待连接释放 var waited time.Duration if p.Wait && p.MaxActive > 0 { // 重新初始化pool的ch channel p.lazyInit() // wait indicates if we believe it will block so its not 100% accurate // however for stats it should be good enough. wait := len(p.ch) == 0 var start time.Time if wait { start = time.Now() } // 获取pool 的ch通道,一旦有连接被close 则可以继续返回连接 if ctx == nil { <-p.ch } else { select { case <-p.ch: case <-ctx.Done(): return nil, ctx.Err() } } if wait { waited = time.Since(start) } } p.mu.Lock() // 等待数量加1 增加等待时间 if waited > 0 { p.waitCount++ p.waitDuration += waited } // Prune stale connections at the back of the idle list. // 删除链表尾部的陈旧连接,删除超时的连接 // 连接close之后,连接会回到pool的idle(闲置)链表中 if p.IdleTimeout > 0 { n := p.idle.count for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { pc := p.idle.back p.idle.popBack() p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } } // Get idle connection from the front of idle list. // 获取链表空闲连接 拿链表第一个 for p.idle.front != nil { pc := p.idle.front p.idle.popFront() p.mu.Unlock() // 调用验证函数如果返回错误不为nil 关闭连接拿下一个 // 判断连接生存时间 大于生存时间则关闭拿下一个 if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) && (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) { return pc, nil } pc.c.Close() p.mu.Lock() p.active-- } // Check for pool closed before dialing a new connection. // 判断连接池是否被关闭 如果关闭则解锁报错 if p.closed { p.mu.Unlock() return nil, errors.New("redigo: get on closed pool") } // Handle limit for p.Wait == false. // 如果活动连接大于最大连接解锁 返回错误 if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive { p.mu.Unlock() return nil, ErrPoolExhausted } // 如果在链表中没有获取到可用的连接 并添加active数量添加 p.active++ p.mu.Unlock() c, err := p.dial(ctx) // 如果调用失败 则减少active数量 if err != nil { c = nil p.mu.Lock() p.active-- if p.ch != nil && !p.closed { p.ch <- struct{}{} } p.mu.Unlock() } // 创建连接 设置创建时间 return &poolConn{c: c, created: nowFunc()}, err }
Put:
// 关闭方法 func (ac *activeConn) Close() error { pc := ac.pc if pc == nil { return nil } ac.pc = nil // 判断连接的状态 发送取消事务 取消watch if ac.state&connectionMultiState != 0 { pc.c.Send("DISCARD") ac.state &^= (connectionMultiState | connectionWatchState) } else if ac.state&connectionWatchState != 0 { pc.c.Send("UNWATCH") ac.state &^= connectionWatchState } if ac.state&connectionSubscribeState != 0 { pc.c.Send("UNSUBSCRIBE") pc.c.Send("PUNSUBSCRIBE") // To detect the end of the message stream, ask the server to echo // a sentinel value and read until we see that value. sentinelOnce.Do(initSentinel) pc.c.Send("ECHO", sentinel) pc.c.Flush() for { p, err := pc.c.Receive() if err != nil { break } if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { ac.state &^= connectionSubscribeState break } } } pc.c.Do("") // 把连接放入链表 ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil) return nil } // 将连接 重新放入限制链表 func (p *Pool) put(pc *poolConn, forceClose bool) error { p.mu.Lock() if !p.closed && !forceClose { pc.t = nowFunc() p.idle.pushFront(pc) if p.idle.count > p.MaxIdle { pc = p.idle.back p.idle.popBack() } else { pc = nil } } if pc != nil { p.mu.Unlock() pc.c.Close() p.mu.Lock() p.active-- } // 如果连接的ch 不为空 并且连接池没有关闭 则给channel中输入一个struct{}{} // 如果在连接打到最大活动数量之后 再获取连接并且pool的Wait为ture 会阻塞线程等待返回连接 if p.ch != nil && !p.closed { p.ch <- struct{}{} } p.mu.Unlock() return nil }
整个Pool整体流程,我大概画了一个图。
从初始化 =》获取 -》创建连接 =》返回连接 =》关闭连接 =》
其中还有一条线是Pool.Wait = true 会一直阻塞 一直到有连接Close 释放活动连接数 线程被唤醒返回闲置的连接
其实大部分的连接池都是类似的流程,比如goroutine,redis。