Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpfli committed Jul 7, 2021
1 parent 961f126 commit 6252252
Show file tree
Hide file tree
Showing 5 changed files with 438 additions and 0 deletions.
67 changes: 67 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,69 @@
# go-pool
golang连接池
A golang universal network connection pool.

## Feature:

- More versatile, The connection type in the connection pool is `interface{}`, making it more versatile
- More configurable, The connection supports setting the maximum idle time, the timeout connection will be closed and discarded, which can avoid the problem of automatic connection failure when idle
- Support user setting `ping` method, used to check the connectivity of connection, invalid connection will be discarded
- Support connection waiting, When the connection pool is full, support for connection waiting (like the go db connection pool)

## Basic Usage:

```go

//factory Specify the method to create the connection
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") }

//close Specify the method to close the connection
close := func(v interface{}) error { return v.(net.Conn).Close() }

//ping Specify the method to detect whether the connection is invalid
//ping := func(v interface{}) error { return nil }

//Create a connection pool: Initialize the number of connections to 5, the maximum idle connection is 20, and the maximum concurrent connection is 30
poolConfig := &pool.Config{
InitialCap: 5,
MaxIdle: 20,
MaxCap: 30,
Factory: factory,
Close: close,
//Ping: ping,
//The maximum idle time of the connection, the connection exceeding this time will be closed, which can avoid the problem of automatic failure when connecting to EOF when idle
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}

//Get a connection from the connection pool
v, err := p.Get()

//do something
//conn=v.(net.Conn)

//Put the connection back into the connection pool, when the connection is no longer in use
p.Put(v)

//Release all connections in the connection pool, when resources need to be destroyed
p.Release()

//View the number of connections in the current connection pool
current := p.Len()

//create new connection
p.Connect()


```


#### Remarks:
The connection pool implementation refers to pool [https://github.com/fatih/pool](https://github.com/fatih/pool) , thanks.


## License

The MIT License (MIT) - see LICENSE for more details
259 changes: 259 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package pool

import (
"errors"
"fmt"
"sync"
"time"
)

var (
//ErrMaxActiveConnReached 连接池超限
ErrMaxActiveConnReached = errors.New("MaxActiveConnReached")
)

// Config 连接池相关配置
type Config struct {
//连接池中拥有的最小连接数
InitialCap int
//最大并发存活连接数
MaxCap int
//最大空闲连接
MaxIdle int
//生成连接的方法
Factory func() (interface{}, error)
//关闭连接的方法
Close func(interface{}) error
//检查连接是否有效的方法
Ping func(interface{}) error
//连接最大空闲时间,超过该事件则将失效
IdleTimeout time.Duration
}

type connReq struct {
idleConn *idleConn
}

// channelPool 存放连接信息
type channelPool struct {
mu sync.RWMutex
conns chan *idleConn
factory func() (interface{}, error)
close func(interface{}) error
ping func(interface{}) error
idleTimeout, waitTimeOut time.Duration
maxActive int
openingConns int
connQueue []chan connReq
}

type idleConn struct {
conn interface{}
t time.Time
}

// NewChannelPool 初始化连接
func NewChannelPool(poolConfig *Config) (Pool, error) {
if !(poolConfig.InitialCap <= poolConfig.MaxIdle && poolConfig.MaxCap >= poolConfig.MaxIdle && poolConfig.InitialCap >= 0) {
return nil, errors.New("invalid capacity settings")
}
if poolConfig.Factory == nil {
return nil, errors.New("invalid factory func settings")
}
if poolConfig.Close == nil {
return nil, errors.New("invalid close func settings")
}

c := &channelPool{
conns: make(chan *idleConn, poolConfig.MaxIdle),
factory: poolConfig.Factory,
close: poolConfig.Close,
idleTimeout: poolConfig.IdleTimeout,
maxActive: poolConfig.MaxCap,
openingConns: poolConfig.InitialCap,
}

if poolConfig.Ping != nil {
c.ping = poolConfig.Ping
}

for i := 0; i < poolConfig.InitialCap; i++ {
conn, err := c.factory()
if err != nil {
c.Release()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- &idleConn{conn: conn, t: time.Now()}
}

return c, nil
}

// getConns 获取所有连接
func (c *channelPool) getConns() chan *idleConn {
c.mu.Lock()
conns := c.conns
c.mu.Unlock()
return conns
}

// Get 从pool中取一个连接
func (c *channelPool) Get() (interface{}, error) {
conns := c.getConns()
if conns == nil {
return nil, ErrClosed
}
for {
select {
case wrapConn := <-conns:
if wrapConn == nil {
return nil, ErrClosed
}
//判断是否超时,超时则丢弃
if timeout := c.idleTimeout; timeout > 0 {
if wrapConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该连接
c.Close(wrapConn.conn)
continue
}
}
//判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
if c.ping != nil {
if err := c.Ping(wrapConn.conn); err != nil {
c.Close(wrapConn.conn)
continue
}
}
return wrapConn.conn, nil
default:
c.mu.Lock()
if c.openingConns >= c.maxActive {
req := make(chan connReq, 1)
c.connQueue = append(c.connQueue, req)
c.mu.Unlock()
ret, ok := <-req
if !ok {
return nil, ErrMaxActiveConnReached
}
if timeout := c.idleTimeout; timeout > 0 {
if ret.idleConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该连接
c.Close(ret.idleConn.conn)
continue
}
}
return ret.idleConn.conn, nil
}
if c.factory == nil {
c.mu.Unlock()
return nil, ErrClosed
}
conn, err := c.factory()
if err != nil {
c.mu.Unlock()
return nil, err
}
c.openingConns++
c.mu.Unlock()
return conn, nil
}
}
}

func (c *channelPool) Connect() (interface{}, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.factory == nil {
return nil, ErrClosed
}
conn, err := c.factory()
if err != nil {
return nil, err
}
c.openingConns++
return conn, nil
}

// Put 将连接放回pool中
func (c *channelPool) Put(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}

c.mu.Lock()

if c.conns == nil {
c.mu.Unlock()
return c.Close(conn)
}

if l := len(c.connQueue); l > 0 {
req := c.connQueue[0]
copy(c.connQueue, c.connQueue[1:])
c.connQueue = c.connQueue[:l-1]
req <- connReq{
idleConn: &idleConn{conn: conn, t: time.Now()},
}
c.mu.Unlock()
return nil
} else {
select {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
c.mu.Unlock()
return nil
default:
c.mu.Unlock()
//连接池已满,直接关闭该连接
return c.Close(conn)
}
}
}

// Close 关闭单条连接
func (c *channelPool) Close(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.close == nil {
return nil
}
c.openingConns--
return c.close(conn)
}

// Ping 检查单条连接是否有效
func (c *channelPool) Ping(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
return c.ping(conn)
}

// Release 释放连接池中所有连接
func (c *channelPool) Release() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.ping = nil
closeFun := c.close
c.close = nil
c.mu.Unlock()

if conns == nil {
return
}

close(conns)
for wrapConn := range conns {
closeFun(wrapConn.conn)
}
}

// Len 连接池中已有的连接
func (c *channelPool) Len() int {
return len(c.getConns())
}
Loading

0 comments on commit 6252252

Please sign in to comment.