Go-MySQL(二)Go实现MySQL连接池
连接池数据结构
利用channel来存储数据库连接,消费channel中的消息获取连接,连接池未满时则新建连接后将连接放入channel,采用的带缓冲区的channel,缓冲区大小就是连接池的最大容纳的连接数,如果缓冲区还有空间,那么获取和释放连接都不会阻塞,如果缓冲区为空,那么就是阻塞连接获取,从而走新建连接的逻辑;同理,缓冲区满了,就阻塞向channel放入连接的过程,需要先消费.
var (
PoolUnInvaildSizeError = errors.New("pool size is unvaild")
PoolIsClosedError = errors.New("pool had closed")
)
// 连接池定义
type Pool struct {
sync.Mutex // 保证连接池线程安全
Size int // 连接池连接数量
ConnChan chan io.Closer // 存储连接的管道
IsClose bool
ctx context.Context
}
初始化连接池:
// 初始化
func New(size int) (*Pool,error){
if size <= 0{
return nil,PoolUnInvaildSizeError
}
return &Pool{
ConnChan: make(chan io.Closer,size),
ctx: context.Background(),
},nil
}
获取连接
获取连接:
- 从管道中消费,如果没有连接则新建连接
- 新建连接后放入连接池管道中
// 获取连接
func(pool *Pool) GetConnFromPool() (io.Closer,error){
if pool.IsClose == true{
return nil,PoolIsClosedError
}
select {
// 从管道中消费
case conn,ok := <- pool.ConnChan:
if !ok{
return nil,PoolIsClosedError
}
fmt.Println("获取到连接:",conn)
return conn,nil
default:
// 连接池中没有连接,新建连接
return pool.getNewConn(pool.ctx)
}
}
构造新连接,这里使用的是go自带的sql.open获取到的DB对象的conn作为连接,其实DB对象自己也是由维护了一个连接池的,这里只是作为练习,获取其中的连接
// 构造新连接
func (pool *Pool) getNewConn(ctx context.Context) (io.Closer, error) {
db,err := sql.Open("mysql","root:123@tcp(127.0.0.1:3306)/test?charset=utf8&parseTime=True")
if err != nil{
log.Fatal("数据库连接失败",err)
return nil, err
}
conn,_ := db.Conn(ctx)
select {
case pool.ConnChan <- conn:
fmt.Println("连接放入连接池")
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return conn,nil
}
释放连接
释放连接,如果连接池没满则直接放入原来的channel中,满了则直接close丢弃
// 释放连接
func(pool *Pool) ReleaseConn(conn io.Closer) error{
pool.Lock()
defer pool.Unlock()
if pool.IsClose == true{
return PoolIsClosedError
}
select {
case pool.ConnChan <- conn:
fmt.Println("连接已放回",conn)
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return nil
}
关闭连接池
关闭连接池除了需要修改连接池状态,关闭管道,还需要遍历管道中还存在的连接,将其一一关闭。
// 关闭连接池
func (pool *Pool) ClosePool() error{
pool.Lock()
defer pool.Unlock()
if pool.IsClose == true{
return PoolIsClosedError
}
pool.IsClose = true
close(pool.ConnChan)
for conn := range pool.ConnChan{
conn.Close()
}
return nil
}
测试
开启20个协程模拟从连接池获取连接和释放连接的过程:
func Test() {
wg := sync.WaitGroup{}
pool, err := New(10)
if err != nil {
log.Fatal(err)
}
wg.Add(20)
fmt.Println("开启20个协程获取连接")
for i := 0; i < 20; i++ {
go TestReleaseAndGetConn(pool,wg)
}
wg.Wait()
fmt.Println("main end")
}
func TestReleaseAndGetConn(pool *Pool,wg sync.WaitGroup) {
s := rand.Int63n(2)
time.Sleep(time.Duration(s) * time.Second)
conn,err := pool.GetConnFromPool()
if err != nil{
log.Fatal(err)
}
fmt.Println("连接池连接数:",len(pool.ConnChan))
time.Sleep(time.Duration(s) * time.Second)
pool.ReleaseConn(conn)
wg.Done()
}
结果:
完整代码
package go_mysql
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"log"
"math/rand"
"sync"
"time"
)
var (
PoolUnInvaildSizeError = errors.New("pool size is unvaild")
PoolIsClosedError = errors.New("pool had closed")
)
// 连接池定义
type Pool struct {
sync.Mutex // 保证连接池线程安全
Size int // 连接池连接数量
ConnChan chan io.Closer // 存储连接的管道
IsClose bool
ctx context.Context
}
// 初始化
func New(size int) (*Pool,error){
if size <= 0{
return nil,PoolUnInvaildSizeError
}
return &Pool{
ConnChan: make(chan io.Closer,size),
ctx: context.Background(),
},nil
}
// 获取连接
func(pool *Pool) GetConnFromPool() (io.Closer,error){
if pool.IsClose == true{
return nil,PoolIsClosedError
}
select {
case conn,ok := <- pool.ConnChan:
if !ok{
return nil,PoolIsClosedError
}
fmt.Println("获取到连接:",conn)
return conn,nil
default:
return pool.getNewConn(pool.ctx)
}
}
// 关闭连接池
func (pool *Pool) ClosePool() error{
pool.Lock()
defer pool.Unlock()
if pool.IsClose == true{
return PoolIsClosedError
}
pool.IsClose = true
close(pool.ConnChan)
for conn := range pool.ConnChan{
conn.Close()
}
return nil
}
// 释放连接
func(pool *Pool) ReleaseConn(conn io.Closer) error{
pool.Lock()
defer pool.Unlock()
if pool.IsClose == true{
return PoolIsClosedError
}
select {
case pool.ConnChan <- conn:
fmt.Println("连接已放回",conn)
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return nil
}
// 构造新连接
func (pool *Pool) getNewConn(ctx context.Context) (io.Closer, error) {
db,err := sql.Open("mysql","root:123@tcp(127.0.0.1:3306)/test?charset=utf8&parseTime=True")
if err != nil{
log.Fatal("数据库连接失败",err)
return nil, err
}
conn,_ := db.Conn(ctx)
select {
case pool.ConnChan <- conn:
fmt.Println("连接放入连接池")
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return conn,nil
}
func Test() {
wg := sync.WaitGroup{}
pool, err := New(10)
if err != nil {
log.Fatal(err)
}
wg.Add(20)
fmt.Println("开启20个协程获取连接")
for i := 0; i < 20; i++ {
go TestReleaseAndGetConn(pool,wg)
}
wg.Wait()
fmt.Println("main end")
}
func TestReleaseAndGetConn(pool *Pool,wg sync.WaitGroup) {
s := rand.Int63n(2)
time.Sleep(time.Duration(s) * time.Second)
conn,err := pool.GetConnFromPool()
if err != nil{
log.Fatal(err)
}
fmt.Println("连接池连接数:",len(pool.ConnChan))
time.Sleep(time.Duration(s) * time.Second)
pool.ReleaseConn(conn)
wg.Done()
}