当先锋百科网

首页 1 2 3 4 5 6 7

Go-MySQL(二)Go实现MySQL连接池

连接池数据结构

利用channel来存储数据库连接,消费channel中的消息获取连接,连接池未满时则新建连接后将连接放入channel,采用的带缓冲区的channel,缓冲区大小就是连接池的最大容纳的连接数,如果缓冲区还有空间,那么获取和释放连接都不会阻塞,如果缓冲区为空,那么就是阻塞连接获取,从而走新建连接的逻辑;同理,缓冲区满了,就阻塞向channel放入连接的过程,需要先消费.

var (
	PoolUnInvaildSizeError = errors.New("pool size is unvaild")
	PoolIsClosedError = errors.New("pool had closed")
)

// 连接池定义
type Pool struct {
	sync.Mutex 				// 保证连接池线程安全
	Size int				// 连接池连接数量
	ConnChan chan io.Closer // 存储连接的管道
	IsClose bool
	ctx context.Context
}

初始化连接池:

// 初始化
func New(size int) (*Pool,error){
	if size <= 0{
		return nil,PoolUnInvaildSizeError
	}
	return &Pool{
		ConnChan: make(chan io.Closer,size),
		ctx: context.Background(),
	},nil
}

获取连接

获取连接:

  • 从管道中消费,如果没有连接则新建连接
  • 新建连接后放入连接池管道中
// 获取连接
func(pool *Pool) GetConnFromPool() (io.Closer,error){
	if pool.IsClose == true{
		return nil,PoolIsClosedError
	}
	select {
	// 从管道中消费
	case conn,ok := <- pool.ConnChan:
		if !ok{
			return nil,PoolIsClosedError
		}
		fmt.Println("获取到连接:",conn)
		return conn,nil
	default:
		// 连接池中没有连接,新建连接
		return pool.getNewConn(pool.ctx)
	}
}

构造新连接,这里使用的是go自带的sql.open获取到的DB对象的conn作为连接,其实DB对象自己也是由维护了一个连接池的,这里只是作为练习,获取其中的连接

// 构造新连接
func (pool *Pool) getNewConn(ctx context.Context) (io.Closer, error) {
	db,err := sql.Open("mysql","root:123@tcp(127.0.0.1:3306)/test?charset=utf8&parseTime=True")
	if err != nil{
		log.Fatal("数据库连接失败",err)
		return nil, err
	}
	conn,_ := db.Conn(ctx)
	select {
	case pool.ConnChan <- conn:
		fmt.Println("连接放入连接池")
	default:
		fmt.Println("连接池满了,连接丢弃")
		conn.Close()
	}
	return conn,nil
}

释放连接

释放连接,如果连接池没满则直接放入原来的channel中,满了则直接close丢弃

// 释放连接
func(pool *Pool) ReleaseConn(conn io.Closer) error{
	pool.Lock()
	defer pool.Unlock()

	if pool.IsClose == true{
		return PoolIsClosedError
	}

	select {
		case pool.ConnChan <- conn:
			fmt.Println("连接已放回",conn)
		default:
			fmt.Println("连接池满了,连接丢弃")
			conn.Close()
	}
	return nil
}

关闭连接池

关闭连接池除了需要修改连接池状态,关闭管道,还需要遍历管道中还存在的连接,将其一一关闭。

// 关闭连接池
func (pool *Pool) ClosePool() error{
	pool.Lock()
	defer pool.Unlock()

	if pool.IsClose == true{
		return PoolIsClosedError
	}

	pool.IsClose = true
	close(pool.ConnChan)

	for conn := range pool.ConnChan{
		conn.Close()
	}
	return nil
}

测试

开启20个协程模拟从连接池获取连接和释放连接的过程:

func Test() {
	wg := sync.WaitGroup{}
	pool, err := New(10)
	if err != nil {
		log.Fatal(err)
	}
	wg.Add(20)
	fmt.Println("开启20个协程获取连接")
	for i := 0; i < 20; i++ {
		go TestReleaseAndGetConn(pool,wg)
	}
	wg.Wait()
	fmt.Println("main end")
}

func TestReleaseAndGetConn(pool *Pool,wg sync.WaitGroup)  {
	s := rand.Int63n(2)
	time.Sleep(time.Duration(s) * time.Second)
	conn,err := pool.GetConnFromPool()
	if err != nil{
		log.Fatal(err)
	}
	fmt.Println("连接池连接数:",len(pool.ConnChan))
	time.Sleep(time.Duration(s) * time.Second)
	pool.ReleaseConn(conn)
	wg.Done()
}

结果:

在这里插入图片描述

完整代码

package go_mysql

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log"
	"math/rand"
	"sync"
	"time"
)

var (
	PoolUnInvaildSizeError = errors.New("pool size is unvaild")
	PoolIsClosedError = errors.New("pool had closed")
)

// 连接池定义
type Pool struct {
	sync.Mutex 				// 保证连接池线程安全
	Size int				// 连接池连接数量
	ConnChan chan io.Closer // 存储连接的管道
	IsClose bool
	ctx context.Context
}

// 初始化
func New(size int) (*Pool,error){
	if size <= 0{
		return nil,PoolUnInvaildSizeError
	}
	return &Pool{
		ConnChan: make(chan io.Closer,size),
		ctx: context.Background(),
	},nil
}

// 获取连接
func(pool *Pool) GetConnFromPool() (io.Closer,error){
	if pool.IsClose == true{
		return nil,PoolIsClosedError
	}
	select {
	case conn,ok := <- pool.ConnChan:
		if !ok{
			return nil,PoolIsClosedError
		}
		fmt.Println("获取到连接:",conn)
		return conn,nil
	default:
		return pool.getNewConn(pool.ctx)
	}
}

// 关闭连接池
func (pool *Pool) ClosePool() error{
	pool.Lock()
	defer pool.Unlock()

	if pool.IsClose == true{
		return PoolIsClosedError
	}

	pool.IsClose = true
	close(pool.ConnChan)

	for conn := range pool.ConnChan{
		conn.Close()
	}
	return nil
}

// 释放连接
func(pool *Pool) ReleaseConn(conn io.Closer) error{
	pool.Lock()
	defer pool.Unlock()

	if pool.IsClose == true{
		return PoolIsClosedError
	}

	select {
		case pool.ConnChan <- conn:
			fmt.Println("连接已放回",conn)
		default:
			fmt.Println("连接池满了,连接丢弃")
			conn.Close()
	}
	return nil
}

// 构造新连接
func (pool *Pool) getNewConn(ctx context.Context) (io.Closer, error) {
	db,err := sql.Open("mysql","root:123@tcp(127.0.0.1:3306)/test?charset=utf8&parseTime=True")
	if err != nil{
		log.Fatal("数据库连接失败",err)
		return nil, err
	}
	conn,_ := db.Conn(ctx)
	select {
	case pool.ConnChan <- conn:
		fmt.Println("连接放入连接池")
	default:
		fmt.Println("连接池满了,连接丢弃")
		conn.Close()
	}
	return conn,nil
}

func Test() {
	wg := sync.WaitGroup{}
	pool, err := New(10)
	if err != nil {
		log.Fatal(err)
	}
	wg.Add(20)
	fmt.Println("开启20个协程获取连接")
	for i := 0; i < 20; i++ {
		go TestReleaseAndGetConn(pool,wg)
	}
	wg.Wait()
	fmt.Println("main end")
}

func TestReleaseAndGetConn(pool *Pool,wg sync.WaitGroup)  {
	s := rand.Int63n(2)
	time.Sleep(time.Duration(s) * time.Second)
	conn,err := pool.GetConnFromPool()
	if err != nil{
		log.Fatal(err)
	}
	fmt.Println("连接池连接数:",len(pool.ConnChan))
	time.Sleep(time.Duration(s) * time.Second)
	pool.ReleaseConn(conn)
	wg.Done()
}