cleanerxiaoqiang 2019-11-05
前几天看了epoll 使用,今天写了一个测试脚本,测试一下epoll加连接池的性能
50万个请求,连接池使用2000连接,发送 "test" 服务端接受后 转成大写返回,处理完所有的请求耗时3.731506996s,性能很强大(注意:需要在linux环境下测试)
拿数据库举例,频繁的建立、关闭连接,会极大的降低mysql的性能,因为建立连接,释放连接引起的大量性能开销。
连接池技术带来的优势:
1、资源重用
由于tcp得到重用,避免了频繁创建、释放连接引起的大量性能开销。在减少系统消耗的基础上,另一方面也增进了系统运行环境的平稳性(减少内存碎片以及数据库临时进程/线程的数量)。
2、更快的系统响应速度
连接池在初始化后运行中。对于业务请求处理而言,大部分请求可以直接利用现有可用连接,避免了数据库连接初始化和释放过程的时间开销,从而缩减了系统整体响应时间。
3、连接数量的控制
过多的连接数量会拖垮整个服务,连接池可以设定activeConne连接数量,从客户端阻塞过多的连接,保证系统服务的平稳。
4、统一的连接管理,避免数据库连接泄漏
根据预先的连接占用超时设定,强制收回被占用连接。从而避免了常规数据库连接操作中可能出现的资源泄漏。
首先对于一个tcp连接,操作系统会为每一个连接分配一定的内存空间外(主要是内部网络数据结构sk_buff的大小、连接的读写缓存,sof),虽然这些可以进行调优,但是如果想使用正常的操作系统的TCP/IP栈的话,这些是硬性的需求。刨去这些,不同的编程语言不同的框架的设计,甚至是不同的需求场景,都会极大的影响TCP服务器内存的占用和处理。
一般Go语言的TCP(和HTTP)的处理都是每一个连接启动一个goroutine去处理,因为我们被教导goroutine的不像thread, 它是很便宜的,可以在服务器上启动成千上万的goroutine。但是对于一百万的连接,这种goroutine-per-connection的模式就至少要启动一百万个goroutine,这对资源的消耗也是极大的。针对不同的操作系统和不同的Go版本,一个goroutine锁使用的最小的栈大小是2KB ~ 8 KB (go stack),如果在每个goroutine中在分配byte buffer用以从连接中读写数据,几十G的内存轻轻松松就分配出去了。
func main() { connections:=50000 addr:="127.0.0.1:8972" var conns []net.Conn for i := 0; i < connections; i++ { c, err := net.DialTimeout("tcp", addr, 10*time.Second) if err != nil { fmt.Println("failed to connect", i, err) i-- continue } conns = append(conns, c) time.Sleep(time.Millisecond) } defer func() { for _, c := range conns { c.Close() } }() log.Printf("完成初始化 %d 连接", len(conns)) tts := time.Millisecond * 5 for { for i := 0; i < len(conns); i++ { time.Sleep(tts) conn := conns[i] conn.Write([]byte("hello world\r\n")) } } }
server.go
func main() { ln, err := net.Listen("tcp", "127.0.0.1:8972") if err != nil { panic(err) } var connections []net.Conn defer func() { for _, conn := range connections { conn.Close() } }() for { conn, e := ln.Accept() if e != nil { if ne, ok := e.(net.Error); ok && ne.Temporary() { log.Printf("accept temp err: %v", ne) continue } log.Printf("accept err: %v", e) return } go handleConn(conn) connections = append(connections, conn) if len(connections)%100 == 0 { log.Printf("total number of connections: %v", len(connections)) } } } func handleConn(conn net.Conn) { io.Copy(ioutil.Discard, conn) }
5万个tcp消耗的内存情况
5万个tcp消耗的内存情况
使用连接池,server使用epoll,使用2000个连接,处理完50万个请求,发送test ,返回TEST大写,耗时3.7s,处理完所有的请求,qps 15万+
github上有详细代码,地址:https://github.com/shanlongpa...
testPoll.go 是连接池的使用
pool := &pools.Pool{ MaxIdle: 100, MaxActive: 2000, IdleTimeout: 20 * time.Second, MaxConnLifetime: 100 * time.Second, Dial: func() (net.Conn, error) { c, err := net.Dial("tcp", "127.0.0.1:8972") if err != nil { return nil, err } return c, err }, } defer pool.Close() t := time.Now() worklist := make(chan int) var wg sync.WaitGroup for i := 0; i < 2000; i++ { go func() { for range worklist { wg.Done() cli,err:=pool.Get() if err!=nil{ log.Println(err) return } str:="test" err=pools.Write(cli.C,[]byte(str)) if err!=nil{ log.Println(err) pool.Put(cli,true) return } _,err=pools.Read(cli.C) if err!=nil{ log.Println(err) }else{ //if i%500==0{ // fmt.Println(string(receByte)) //} } pool.Put(cli,false) } }() } for i := 0; i < 500000; i++ { wg.Add(1) worklist <- i } fmt.Println("pool建立,连接数:",pool.Active) close(worklist) wg.Wait() // 调用服务 fmt.Println(time.Since(t))
-连接池结构
type Pool struct { // 建立tcp连接 Dial func() (net.Conn, error) // 健康检测,判断连接是否断开 TestOnBorrow func(c net.Conn, t time.Time) error // 连接池中最大空闲连接数 MaxIdle int // 打开最大的连接数 MaxActive int // Idle多久断开连接,小于服务器超时时间 IdleTimeout time.Duration // 配置最大连接数的时候,并且wait是true的时候,超过最大的连接,get的时候会阻塞,知道有连接放回到连接池 Wait bool // 超过多久时间 链接关闭 MaxConnLifetime time.Duration chInitialized uint32 // set to 1 when field ch is initialized 原子锁ch初始化一次 mu sync.Mutex // 锁 closed bool // set to true when the pool is closed. Active int // 连接池中打开的连接数 ch chan struct{} // limits open connections when p.Wait is true Idle idleList // idle 连接 } // 空闲连,记录poolConn的头和尾 type idleList struct { count int front, back *poolConn } // 连接的双向链表 type poolConn struct { C net.Conn t time.Time // idle 时间,即放会pool的时间 created time.Time //创建时间 next, prev *poolConn }
主要有两个方法Get(),获取一个可用的连接。 Put() 把连接放回到连接池
func (p *Pool) Get() (*poolConn, error) { // p.Wait == true. 的时候限制最大连接数 if p.Wait && p.MaxActive > 0 { p.lazyInit() <-p.ch } p.mu.Lock() // 删除idle超时的连接,删除掉 if p.IdleTimeout > 0 { n := p.Idle.count for i := 0; i < n && p.Idle.back != nil && p.Idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { pc := p.Idle.back p.Idle.popBack() p.mu.Unlock() pc.C.Close() p.mu.Lock() p.Active-- } } //从Idle list 获取一个可用的空闲链接. for p.Idle.front != nil { pc := p.Idle.front p.Idle.popFront() p.mu.Unlock() if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.C, pc.t) == nil) && (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) { return pc, nil } pc.C.Close() p.mu.Lock() p.Active-- } //pool关闭后直接return error if p.closed { p.mu.Unlock() return nil, errors.New("get on closed pool") } // Handle limit for p.Wait == false. if !p.Wait && p.MaxActive > 0 && p.Active >= p.MaxActive { p.mu.Unlock() return nil, errors.New("pool 耗尽了") } p.Active++ p.mu.Unlock() c, err := p.Dial() if err != nil { c = nil p.mu.Lock() p.Active-- if p.ch != nil && !p.closed { p.ch <- struct{}{} } p.mu.Unlock() } return &poolConn{C: c, created: nowFunc()}, err } func (p *Pool) Put(pc *poolConn, forceClose bool) error { p.mu.Lock() if !p.closed && !forceClose { pc.t = nowFunc() p.Idle.pushFront(pc) if p.Idle.count > p.MaxIdle { pc = p.Idle.back p.Idle.popBack() } else { pc = nil } } if pc != nil { p.mu.Unlock() pc.C.Close() p.mu.Lock() p.Active-- } if p.ch != nil && !p.closed { p.ch <- struct{}{} } p.mu.Unlock() return nil }
epoll 使用主要分为三部,第一步创建epoll,第二部,添加事件 EPOLL_CTL_ADD,第三步,等待EpollEvent.
func main() { setLimit() ln, err := net.Listen("tcp", "127.0.0.1:8972") if err != nil { panic(err) } epoller, err = MkEpoll() if err != nil { panic(err) } go start() for { conn, e := ln.Accept() if e != nil { if ne, ok := e.(net.Error); ok && ne.Temporary() { log.Printf("accept temp err: %v", ne) continue } log.Printf("accept err: %v", e) return } if err := epoller.Add(conn); err != nil { log.Printf("failed to add connection %v", err) conn.Close() } } } //返回接受的信息,小写转成大写字母 func replyConn(c net.Conn) error { data,err:= pools.Read(c) if err!=nil{ return err } err= pools.Write(c,[]byte(strings.ToUpper(string(data)))) return err }