使用方法:
在 VS Code 中会有代码提示,很容易看懂。
import mysqldb

# Open a connection using the sample settings shipped with the module.
db = mysqldb.connect_db(mysqldb.conn_confing)

# Run a SQL statement (the editor offers completion hints here).
db.query("update table1 set col='' ")

# Fetch multiple rows; each row comes back as a dict.
rows = db.fetch_rows(table='table')
源代码 mysqldb.py
from pymysql import (connect, cursors, err, escape_sequence)

# Sample connection settings; replace with real credentials before use.
conn_confing = {
    'host': 'localhost',
    'port': '3306',
    'user': 'root',
    'password': 'root',
    'db': 'your_db',
    'charset': 'utf-8',
}


def _normalize_charset(charset):
    """Map the Python-style spelling 'utf-8' to MySQL's 'utf8'.

    Any other value (including None) is returned unchanged.
    """
    if isinstance(charset, str) and charset.lower().replace('_', '-') == 'utf-8':
        return 'utf8'
    return charset


def connect_db(conn_confing):
    """Build a MYSQL wrapper from a settings dict.

    Reads the keys 'host', 'port', 'user', 'password', 'db' and 'charset'.
    A missing or empty 'port' falls back to 3306.
    """
    return MYSQL(
        dbhost=conn_confing.get('host'),
        dbport=conn_confing.get('port') or 3306,
        dbuser=conn_confing.get('user'),
        dbpwd=conn_confing.get('password'),
        dbname=conn_confing.get('db'),
        dbcharset=conn_confing.get('charset'),
    )


def connect_ssdc(conn_confing):
    """Open a raw pymysql connection whose cursors are SSDictCursor.

    Reads the same keys as connect_db(). A missing or empty 'port' falls
    back to 3306, and a charset spelled 'utf-8' is passed on as 'utf8'.
    """
    return connect(
        host=conn_confing.get('host'),
        # Apply the fallback before int(): int(None) would raise TypeError.
        port=int(conn_confing.get('port') or 3306),
        user=conn_confing.get('user'),
        password=conn_confing.get('password'),
        db=conn_confing.get('db'),
        charset=_normalize_charset(conn_confing.get('charset')),
        cursorclass=cursors.SSDictCursor,
    )


class MYSQL:
    """A friendly pymysql wrapper providing CRUD helpers.

    Table and column names are interpolated into the SQL text as-is; only
    values are sent as query parameters. Callers must not pass untrusted
    identifiers, and string conditions are inserted verbatim.
    """

    def __init__(self, dbhost, dbuser, dbpwd, dbname, dbcharset='utf-8', dbport=3306,):
        """Store the connection settings and connect immediately."""
        self.dbhost = dbhost
        self.dbport = int(dbport)
        self.dbuser = dbuser
        self.dbpwd = dbpwd
        self.dbname = dbname
        self.dbcharset = dbcharset
        self.connection = self.session()

    def session(self):
        """Open and return a pymysql connection that uses DictCursor.

        NOTE: dbcharset is stored on the instance but is not passed to
        pymysql here, so the driver's default charset applies.
        """
        return connect(
            host=self.dbhost,
            port=self.dbport,
            user=self.dbuser,
            password=self.dbpwd,
            db=self.dbname,
            cursorclass=cursors.DictCursor,
        )

    def _build_where(self, condition):
        """Return (where_sql, args_tuple) for a dict, string or empty condition.

        An empty condition yields the always-true clause '1'. A dict becomes
        AND-joined "`col` = %s" terms with its values as args. Anything else
        is used verbatim as the WHERE text with no args.
        """
        if not condition:
            return '1', ()
        if isinstance(condition, dict):
            return (self.join_field_value(condition, ' AND '),
                    tuple(condition.values()))
        return condition, ()

    def insert(self, table, data):
        """INSERT IGNORE one row given as a {column: value} dict.

        Returns cursor.lastrowid, or None when the statement failed (the
        error is printed, not raised).
        """
        params = self.join_field_value(data)
        sql = "INSERT IGNORE INTO {table} SET {params}".format(table=table, params=params)
        # Pre-set so a failed execute() returns None instead of raising
        # UnboundLocalError at the return statement.
        last_id = None
        with self.connection.cursor() as cursor:
            try:
                cursor.execute(sql, tuple(data.values()))
                last_id = cursor.lastrowid
                self.connection.commit()
            except Exception as exc:  # best-effort: report and carry on
                print(exc)
        return last_id

    def bulk_insert(self, table, data):
        """INSERT IGNORE several rows in one statement.

        data must be a non-empty list of {column: value} dicts. The column
        list is taken from the first row and every row's values are read in
        that same order, so rows may list their keys in any order. A row
        missing one of those columns raises KeyError. Values are escaped
        client-side with pymysql's escape_sequence and inlined into the SQL.
        Returns cursor.lastrowid.
        """
        assert isinstance(data, list) and data != [], "data_format_error"
        columns = list(data[0].keys())
        values = ', '.join(
            escape_sequence([row[column] for column in columns], 'utf-8')
            for row in data
        )
        fields = ', '.join('`{}`'.format(column) for column in columns)
        sql = u"INSERT IGNORE INTO {table} ({fields}) VALUES {values}".format(
            fields=fields, table=table, values=values)
        with self.connection.cursor() as cursor:
            # No args are passed, so pymysql does no %-formatting on the
            # already-escaped SQL text.
            cursor.execute(sql)
            last_id = cursor.lastrowid
            self.connection.commit()
        return last_id

    def delete(self, table, condition=None, limit=None):
        """DELETE rows matching condition, optionally capped by LIMIT.

        With no condition every row in the table is deleted. Returns the
        value of cursor.execute().
        """
        where, args = self._build_where(condition)
        limits = "LIMIT {limit}".format(limit=limit) if limit else ""
        sql = "DELETE FROM {table} WHERE {where} {limits}".format(
            table=table, where=where, limits=limits)
        with self.connection.cursor() as cursor:
            # Pass None rather than () so a literal '%' in a string
            # condition is not treated as a format placeholder.
            result = cursor.execute(sql, args or None)
            self.connection.commit()
        return result

    def update(self, table, data, condition=None):
        """UPDATE IGNORE rows matching condition with the {column: value} data.

        With no condition every row in the table is updated. Returns the
        value of cursor.execute().
        """
        params = self.join_field_value(data)
        where, where_args = self._build_where(condition)
        args = tuple(data.values()) + where_args
        sql = "UPDATE IGNORE {table} SET {params} WHERE {where}".format(
            table=table, params=params, where=where)
        with self.connection.cursor() as cursor:
            result = cursor.execute(sql, args or None)
            self.connection.commit()
        return result

    def count(self, table, condition=None):
        """Return COUNT(*) of the rows matching condition."""
        where, args = self._build_where(condition)
        sql = "SELECT COUNT(*) as cnt FROM {table} WHERE {where}".format(
            table=table, where=where)
        with self.connection.cursor() as cursor:
            cursor.execute(sql, args or None)
            self.connection.commit()
            return cursor.fetchone().get('cnt')

    def fetch_rows(self, table, fields=None, condition=None, order=None, limit=None, fetchone=False):
        """SELECT rows from table.

        fields: None for '*', a list/tuple of column names (each is
            backtick-quoted), or a raw string used verbatim.
        condition: dict, raw string, or None (see _build_where).
        order / limit: raw text placed after ORDER BY / LIMIT.
        Returns one row when fetchone is true, otherwise all rows.
        """
        if not fields:
            fields = '*'
        elif isinstance(fields, (tuple, list)):
            fields = '`{0}`'.format('`, `'.join(fields))
        where, args = self._build_where(condition)
        orderby = 'ORDER BY {order}'.format(order=order) if order else ''
        limits = "LIMIT {limit}".format(limit=limit) if limit else ""
        sql = "SELECT {fields} FROM {table} WHERE {where} {orderby} {limits}".format(
            fields=fields, table=table, where=where, orderby=orderby, limits=limits)
        with self.connection.cursor() as cursor:
            cursor.execute(sql, args or None)
            self.connection.commit()
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def query(self, sql, fetchone=False, execute=False, params=None):
        """Run custom SQL and commit.

        params: optional values for %s placeholders in sql. When omitted,
            the SQL text is sent as written.
        execute: when true, return None without fetching.
        Otherwise returns one row (fetchone) or all rows.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql, params)
            self.connection.commit()
            if execute:
                return None
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def join_field_value(self, data, glue=', '):
        """Return "`key` = %s" for every key of data, joined by glue."""
        return glue.join("`{}` = %s".format(key) for key in data)

    def close(self):
        """Close the connection if one exists and is still open."""
        connection = getattr(self, 'connection', None)
        # Skip connections that are already closed: pymysql raises on a
        # second close(), which would otherwise surface from __del__.
        if connection is not None and connection.open:
            return connection.close()
        return None

    def __del__(self):
        """Close the database connection when the wrapper is collected."""
        self.close()
