本文目录
1、pythondbutils数据库连接池(代码片段)
2、连接池
一、连接池的作用
Python可以使用MySQLdb进行数据库的连接及查询/插入/更新等操作,但是每次连接MySQL数据库请求时,都是独立的去请求访问,相当浪费资源而且访问数量达到一定数量时,对mysql的性能会产生较大的影响。因此,实际使用中通常会使用数据库的连接池技术,来访问数据库达到资源复用的目的
二、什么是DBUtils
DBUtils是一套Python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装
DBUtils提供两种外部接口:
- PersistentDB:提供线程专用的数据库连接,并自动管理连接
- PooledDB:提供线程间可共享的数据库连接,并自动管理连接
需要安装的库
1. DBUtils
pip install DBUtils==1.2
2. pymysql
pip install pymysql
三、DBUtils简单实用
3.1 PersistentDB
每个线程创立一个连接(不推荐)
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
closeable=False,
# 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='test',
charset='utf8'
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
if __name__ == '__main__':
func()
POOL = PersistentDB()
- 实例化池对象,写入池的配置信息,为每个线程创立连接conn = POOL.connection()
- 池连接conn.cursor().execute('select * from user')
- 执行SQL语句conn.cursor().fetchall()
- 获取执行SQL后的返回conn.close()
- 未关闭连接,只是将连接放回池子,仅供自己的线程再次使用,当线程终止时连接自动关闭
3.2 PooledDB
创建一批连接,供所有线程共享使用(推荐)
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='test',
charset='utf8'
)
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\\r\\n')
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
conn.close()
if __name__ == '__main__':
func()
POOL = PooledDB()
- 实例化池对象,写入池的配置信息;创建一批连接放入连接池,共享使用conn = POOL.connection()
- 池连接conn.cursor().execute('select * from user')
- 执行SQL语句conn.cursor().fetchall()
- 获取执行SQL后的返回conn.close()
- 未关闭连接,只是将连接放回池子,供所有线程共享使用,当线程终止时连接自动关闭
PersistentDB和PooledDB两个模块的接口和参数非常相似,上面的代码把PersistentDB互换PooledDB完全没有问题,只要别出线程安全的问题就行
三、详细封装
- dbConfig.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: lluozh
# @Date : 2021-07-25
# @Desc : mysql数据库配置
import pymysql
#TEST数据库信息
def dbConfig():
DB_TEST_HOST="127.0.0.1";
DB_TEST_PORT=3306;
DB_TEST_DBNAME="lluozh";
DB_TEST_USER="root";
DB_TEST_PASSWORD="123456";
# 数据库连接编码
DB_CHARSET = "utf8"
# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 10
# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 10
# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 20
# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100
# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True
# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0
# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None
# ping MySQL服务端,检查是否服务可用
DB_PING = 0
# creator : 使用连接数据库的模块
DB_CREATOR = pymysql
- dbPool.py
from DBUtils.PooledDB import PooledDB
from config.dbConfig import DB_CONFIG,dbConfig
'''
数据库连接池
'''
PTConnectionPool = PooledDB(
creator=DB_CONFIG.DB_CREATOR, # 使用链接数据库的模块
maxconnections=DB_CONFIG.DB_MAX_CONNECYIONS, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=DB_CONFIG.DB_MIN_CACHED, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=DB_CONFIG.DB_MIN_CACHED, # 链接池中最多闲置的链接,0和None不限制
maxshared=DB_CONFIG.DB_MAX_SHARED, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的threadsafety都为1,所有值无论设置为多少_maxcached永远为0,所以永远是所有链接都共享
blocking=DB_CONFIG.DB_BLOCKING, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=DB_CONFIG.DB_MAX_USAGE, # 一个链接最多被重复使用的次数,None表示无限制
setsession=DB_CONFIG.DB_SET_SESSION, # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=DB_CONFIG.DB_PING, # ping MySQL服务端,检查是否服务可用 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
**dbConfig()
)
- sqlUtil.py
import os
import pymysql
import traceback
from util.dbPool import PTConnectionPool
from pymysql import OperationalError, InternalError
class SQLUtil(object):
@staticmethod
def open():
# POOL = PTConnectionPool.PYMYSQL_POOL
try:
conn = PTConnectionPool.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn, cursor
except Exception as e:
traceback.print_exc()
@classmethod
def close(cls, conn, cursor):
try:
conn.commit()
cursor.close()
conn.close()
except Exception as e:
traceback.print_exc()
@staticmethod
def close_only(conn, cursor):
try:
cursor.close()
conn.close()
except Exception as e:
traceback.print_exc()
@classmethod
def operate(cls, sql, param, reConn=0):
"""
数据库操作
:param sql:
:param param:
:param reConn:
:return:
"""
try:
conn, cursor = cls.open()
cursor.execute(sql, param)
logger.info(sql + str(param))
cls.close(conn, cursor)
except (OperationalError, InternalError):
conn.rollback()
if reConn <= 2:
reConn+=1
cls.operate(sql, param, reConn=reConn)
except Exception as e:
conn.rollback()
traceback.print_exc()
@classmethod
def query(cls, sql, param, reConn=0):
try:
conn, cursor = cls.open()
cursor.execute(sql, param)
result = cursor.fetchall()
cls.close(conn, cursor)
return result
except (OperationalError, InternalError) as e:
if reConn <= 2:
reConn += 1
cls.query(sql, param, reConn=reConn)
except Exception as e:
traceback.print_exc()
@classmethod
def query_one(cls, sql, param, reConn=0):
try:
conn, cursor = cls.open()
cursor.execute(sql, param)
result = cursor.fetchone()
cls.close(conn, cursor)
return result
except (OperationalError, InternalError) as e:
if reConn <= 2:
reConn += 1
cls.query_one(sql, param, reConn=reConn)
except Exception as e:
traceback.print_exc()
连接池