这里只实现了部分功能,可以基于此工具类,加入更多功能,如事物、锁等。
# db.pyimport pymysqlfrom dbutils.pooled_db import PooledDBclass DBHelper(object):def __init__(self):# TODO 此处配置,可以去配置文件中读取。self.pool = PooledDB(creator=pymysql, # 使用链接数据库的模块maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建maxcached=3, # 链接池中最多闲置的链接,0和None不限制blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayshost='127.0.0.1',port=3306,user='root',password='root',database='db1',charset='utf8')def get_conn_cursor(self):conn = self.pool.connection()cursor = conn.cursor(pymysql.cursors.DictCursor)return conn, cursordef close_conn_cursor(self, *args):for item in args:item.close()def exec(self, sql, **kwargs):conn, cursor = self.get_conn_cursor()cursor.execute(sql, kwargs)conn.commit()self.close_conn_cursor(conn, cursor)def fetch_one(self, sql, **kwargs):conn, cursor = self.get_conn_cursor()cursor.execute(sql, kwargs)result = cursor.fetchone()self.close_conn_cursor(conn, cursor)return resultdef fetch_all(self, sql, **kwargs):conn, cursor = self.get_conn_cursor()cursor.execute(sql, kwargs)result = cursor.fetchall()self.close_conn_cursor(conn, cursor)return resultdb = DBHelper()
from db import dbdb.exec("insert into d1(name) values(%(name)s)", name="武沛齐666")ret = db.fetch_one("select * from d1")print(ret)ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3)print(ret)ret = db.fetch_all("select * from d1")print(ret)ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2)print(ret)
上下文管理
如果你想要让他也支持 with 上下文管理。
好处:多个SQL共用同一个连接,进一步提升性能。
with 获取连接:执行SQL(执行完毕后,自动将连接交还给连接池)
# db_context.pyimport threadingimport pymysqlfrom dbutils.pooled_db import PooledDBPOOL = PooledDB(creator=pymysql, # 使用链接数据库的模块maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建maxcached=3, # 链接池中最多闲置的链接,0和None不限制blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,host='127.0.0.1',port=3306,user='root',password='root',database='db1',charset='utf8')class Connect(object):def __init__(self):self.conn = conn = POOL.connection()self.cursor = conn.cursor(pymysql.cursors.DictCursor)def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.cursor.close()self.conn.close()def exec(self, sql, **kwargs):self.cursor.execute(sql, kwargs)self.conn.commit()def fetch_one(self, sql, **kwargs):self.cursor.execute(sql, kwargs)result = self.cursor.fetchone()return resultdef fetch_all(self, sql, **kwargs):self.cursor.execute(sql, kwargs)result = self.cursor.fetchall()return result
from db_context import Connectwith Connect() as obj:# print(obj.conn)# print(obj.cursor)ret = obj.fetch_one("select * from d1")print(ret)ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3)print(ret)
