pymongo
from pymongo import *class TestMongo(object): ''' 初始化函数 ''' def __init__(self,ip,port): self.clinet = MongoClient(ip, port) self.db=self.clinet["mytest"].plan pass def add_one(self): import time post={ "title":'标题', "content":'内容描述', "creatTime":time.time() } res=self.db.insert_one(post) return res pass def select_data(self,where): ''' 查询数据 :param where: :return: ''' return self.db.find(where) pass def update(self,where,content): self.db.update(where,{"$set":content})# print(__name__)if __name__=="__main__": obj=TestMongo('192.168.136.134',27017) obj.add_one() print('添加成功....') rs=obj.select_data({}) for item in rs: print(item)# 建立连接对象# try:# clinet=MongoClient('192.168.136.134',27017)# #创建数据库# db=clinet.mytest# colTable=db.plan# # 插入一条数据# # colTable.insert_one({'name':'狄仁杰','age':18,'gender':1})# # 插入多条数据# # colTable.insert_many([# # {'name': '狄仁杰1', 'age': 18, 'gender': 1},# # {'name': '狄仁杰2', 'age': 18, 'gender': 1},# # {'name': '狄仁杰3', 'age': 18, 'gender': 1},# # {'name': '狄仁杰4', 'age': 18, 'gender': 1},# # {'name': '狄仁杰5', 'age': 18, 'gender': 1},# # {'name': '狄仁杰6', 'age': 18, 'gender': 1},# # ])# # 更新一条# # colTable.update_one({'name': '狄仁杰1'},{"$set":{'age':20}})# # //更新多条# # colTable.update_many({'gender': 1}, {"$set": {'age': 20}})# # 删除多条# # colTable.delete_many({'gender': 1})# # 查询的使用# # res=colTable.find({"name":"狄仁杰1"})# res = colTable.find_one({"name": "狄仁杰1"})# print(res)# # for item in res:# # print(item)# # print('sucess')# except Exception as ex:# print(ex)
pymysql
# 安装一个mysql中间模块 pip install pymysqlfrom pymysql import *class TestMmysql(object): ''' 初始化函数 ''' def __init__(self,address,user,pwd,database): res = address.split(":") self.clinet = conn = connect(host=res[0], user=user, password=pwd, database=database, port=res[1], charset='utf8') self.sql=conn.cursor() #创建游标 pass def add_one(self): import time post="insert into tb_student(title,content,creaTime) values('标题','内容描述',{})".format(time.time()) self.sql.execute(post) self.sql.commit() return "success" pass def select_data(self,where): ''' 查询数据 :param where: :return: ''' res=sql.execute('select * from {}'.format(where)) sql.fetchall() return res pass def update(self,where,content): self.sql.execute('update tb_student set content={} where creaTime={}'.format(content,where)) self.sql.commit()# print(__name__)if __name__=="__main__": obj=TestMmysql('192.168.136.134',3389) try: obj.add_one() print('添加成功....') rs=obj.select_data('tb_student') for item in rs: print(item) except Exception as ex: print(ex) finally: sql.close() conn.close()
redis
# 安装一个redis中间模块 pip install redisfrom redis import *import timetry: # 首先创建一个StrictRedis对象,便可以使用这个对象来操作redis数据库了 # 参数 host:redis服务器的ip地址 # port 6379 # password 根据配置的需要 可选的操作 # db:[0-15] redisObj=StrictRedis(host="192.168.136.134",port=6379,db=1,password='111111') print("连接redis服务器成功") redisObj.set('hobby','打篮球') 添加字符串类型的数据 print(redisObj.get('hobby')) # 添加多个键值对 redisObj.mset({'name1':'AA','name2':'BB','addr':'china.shenzhen'}) rs=redisObj.mget('name1','name2','addr') print(rs) # 设定超时时间 redisObj.setex('class',5,'three.class') print(redisObj.get('class')) time.sleep(7) print(redisObj.get('class')) print(redisObj.keys()) redisObj.delete('name1','name2') print(redisObj.keys()) # list 列表的使用 redisObj.lpush('class','liyang','weo','fly') rs=redisObj.rpush('class', 'lifei88888') redisObj.lrem('class',1,'lifei88888') #删除一个数据 redisObj.lset('class',3,555) #对索引对应的值进行修改 result=redisObj.lrange('class',0,-1) print(redisObj.llen('class')) for index in range(3): print(redisObj.lpop('class')) #返回并删除键为 class 的列表中的首元素 print(result) redisObj.lpush('AA', '1', 2) print(redisObj.keys()) redisObj.delete('class')except Exception as ex: print(ex)