写在文章开头
我们希望通过go语言实现一个简单的资源池,而这个资源池的资源包括但不限于:
- 数据库连接池
- 线程池
- 协程池
- 网络连接池
只要这些资源实现我们指定的关闭方法,则都可以通过我们封装的资源池进行统一管理,需要简单说明一下这个资源池的要求:
- 需要用户指定资源以及资源的创建方法。
- 当协程通过
Acquire
方法获取资源时,若发现当前池中有资源可以分配则直接返回,若没有足够的资源则基于传入的创建方法创建一个全新的资源分配。 - 支持资源释放和资源池关闭。
听起来很像是Java的无界线程池,接下来我们就基于这个需求实现一个版本。
需求落地
给出资源池结构
我们首先需要给出资源池的结构,很明显作为一个资源池它需要有一个管理资源池的channel,为了保证多协程竞争资源的协程安全,我们还需要通过一把Mutex完成操作互斥,同时给出创建资源的工厂方法要求这个工厂方法创建的资源具备资源关闭能力:
// Pool 定义一个结构体 包含重量级锁 有缓冲区Chanel 工厂方法 连接池关闭状态 type Pool struct { m sync.Mutex resource chan io.Closer factory func() (io.Closer, error) closed bool }
创建资源池
有个上述的定义之后,我们的创建方法就很容易实现了,只需基于外部的size和工厂方法完成Pool成员变量初始化即可:
var ErrPoolClosed = errors.New("连接池已关闭") func New(fn func() (io.Closer, error), size uint) (*Pool, error) { //判断size大小是否合法 if size <= 0 { return nil, errors.New("size不合法") } //基于工厂方法和size创建资源池 return &Pool{ resource: make(chan io.Closer, size), factory: fn, }, nil }
获取资源
当协程需要获取资源时,会查看当前缓冲通道是否有足够的资源,如果有则在正确运行的情况下返回出去,反之基于我们上文传入的工厂方法完成资源创建并返回:
func (p *Pool) Acquire() (io.Closer, error) { select { //如果channel有足够的资源分配则直接返回 case r, ok := <-p.resource: if !ok { log.Println("连接池已关闭") return nil, ErrPoolClosed } log.Println("拿到连接池共享资源") return r, nil //基于工厂方法创建全新的资源返回出去 default: log.Println("资源不足,创建新的连接资源") return p.factory() } }
释放与关闭
这里我们将资源的释放和关闭放在一起说明,在进行资源释放和关闭时我们需要考虑3个问题即:
- 已关闭的资源池无需归还资源。
- 正在关闭资源池时不可归还资源。
- 正在归还资源时不可关闭资源池。
所以进行这两个操作时,我们需要通过互斥锁确保两个操作互斥:
// Release 上锁 设置方法退出后解锁 查看当前连接池是否已关闭,若关闭则直接将资源关闭 ,反之select查看能否将其存入缓冲区,若可以输出入队成功,反之输出队列已满 func (p *Pool) Release(r io.Closer) { //上锁确保关闭和归还资源操作互斥 p.m.Lock() //函数退出时解锁 defer p.m.Unlock() //如果资源池关闭则直接将当前资源关闭销毁 if p.closed { log.Println("连接池已关闭,直接销毁当前资源") r.Close() } //将连接归还,如果满了则直接关闭销毁 select { case p.resource <- r: log.Println("连接归还成功") default: log.Println("连接池已满,资源直接销毁") r.Close() } } // Close 方法 上锁 设置方法退出后解锁 遍历所有资源将其关闭 然后再关闭连接池 func (p *Pool) Close() { p.m.Lock() defer p.m.Unlock() if p.closed { log.Println("连接池已关闭,直接销毁当前资源") return } //设置为关闭 p.closed = true //关闭资源 close(p.resource) //遍历资源池资源 for r := range p.resource { r.Close() } }
测试代码与输出
最后我们给出测试代码,可以看到我们基于资源池工具类模拟数据库连接池的管理:
//设置最大协程数与资源池数为24 const maxGoroutines = 24 const poolResources = 24 //创建可关闭的数据库连接 type dbConnection struct { ID int32 } //对应的关闭方法 func (d *dbConnection) Close() error { log.Println("当前数据库连接", d.ID, "已关闭") return nil } var idCounter int32 func createConnection() (io.Closer, error) { id := atomic.AddInt32(&idCounter, 1) return &dbConnection{ID: id}, nil } func main() { //创建maxGoroutines个WaitGroup var wg sync.WaitGroup wg.Add(maxGoroutines) //传入createConnection方法和连接池大小poolResources创建数据库连接池 p, err := pool.New(createConnection, poolResources) if err != nil { log.Println(err) } //创建24个协程获取资源 for i := 0; i < maxGoroutines; i++ { go func(queryParam int) { queryData(queryParam, p) defer wg.Done() }(i) } //等待操作完成关闭连接池 wg.Wait() log.Println("查询完成") p.Close() } //queryData 基于连接池Acquire获取资源,完成后通过Release归还资源 func queryData(queryParam int, p *pool.Pool) { r, e := p.Acquire() if e != nil { log.Println(e) return } defer p.Release(r) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) log.Println("查询", queryParam, "使用连接", r.(*dbConnection).ID) }
同时我们给出输出结果:
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 资源不足,创建新的连接资源
2024/05/05 23:36:10 查询 17 使用连接 14
2024/05/05 23:36:10 连接归还成功
2024/05/05 23:36:10 查询 5 使用连接 5
2024/05/05 23:36:10 连接归还成功
2024/05/05 23:36:10 查询 3 使用连接 2
2024/05/05 23:36:10 连接归还成功
2024/05/05 23:36:10 查询 19 使用连接 19
2024/05/05 23:36:10 连接归还成功
.......