django中ORM有三个类:
Manager:模型管理器,其中Manager类实现了和数据库之间的操作,Manager类中从QuerySet类中拷贝了许多表级方法,通过这些方法实现数据库的查询,并且将查询到的结果封装成一个Queryset对象返回,然后通过Queryset对象中的表级方法可以获取到我们需要的数据,如果我们不满足于现有的表级方法,可以通过重写Manager实现。
Queryset:查询结果集,同数据库的所有查询以及更新交互都是通过QuerySet来完成的。
Model:模型类,实现与数据库表的字段映射。
自定义Queryset
重写Queryset的update和delete方法可以帮助我们记录数据库的更新和删除
class BaseQuerySet(QuerySet):def update(self, **kwargs):# 重写QuerySet的update方法 记录数据库更新日志log_list = []for model in self:update_log = dict()name = model.__module__.split('.')[-1]update_log['table_name'] = nameupdate_log[name + '_pk'] = model.pkupdate_before = {} # 更新之前update_after = {} # 更新之后for key, value in kwargs.items():if not hasattr(model, key):continuemodel_value = getattr(model, key)if isinstance(model_value, decimal.Decimal): # 取出的类型有可能mongodb无法存储update_before[key] = float(model_value)else:update_before[key] = model_valueif isinstance(value, models.Model): # 判断key是否为外键value = value.pkupdate_after[key] = valueif key == 'update_operator' or key == 'update_operator_id':update_log['update_operator'] = valueif [i for i in log_handler.log.important_table if i.match(name)]:update_log['update_before_all_info'] = str(model_to_dict(model))update_log['update_before_info'] = str(update_before)update_log['update_after_info'] = str(update_after)update_log['operating'] = 'update'log_list.append(update_log)result = super(BaseQuerySet, self).update(**kwargs)log_handler.log.record_business_log(log_list)return resultdef delete(self):log_list = []for info in self:delete_log = dict()name = info.__module__.split('.')[-1]delete_log['table_name'] = namedelete_log[name + '_pk'] = info.pkdelete_info = dict()for k, v in info.__dict__.items():if k == '_state':continuev = mysql_conversion_mongo_utils.conversion(v)delete_info[k] = vdelete_log['delete_info'] = delete_infodelete_log['delete_datetime'] = datetime.now()delete_log['operating'] = 'delete'log_list.append(delete_log)log_handler.log.record_business_log(log_list=log_list)return super(BaseQuerySet, self).delete()
自定义Manager
可以自定义Manager类来实现自己的方法,简化数据库的DML操作
class Manager(DjangoManager.from_queryset(BaseQuerySet)):"""继承 BaseQuerySet将QuerySet的方法当参数传给了 Manager"""passclass BaseManager(Manager):"""通用的orm管理器"""def get_one_object(self, db_select=None, **kwargs):"""base manager 获取一条数据:param db_select: 为数据库分离做准备:param kwargs: 查询条件 k=v:return:"""try:k_obj = self.db_manager(db_select).get(**kwargs)except (ObjectDoesNotExist, MultipleObjectsReturned) as e:k_obj = Noneexcept Exception as e:print(e)raise EtError(type=EtDBError.read_error, value='获取一条数据连接数据库有误')return k_objdef create_one_object(self, create_info=None, db_select=None, **kwargs):"""base manager 添加一个对象 判断传来的参数是否为该管理器对应模型类的字段:param db_select: 为数据库分离做准备:param kwargs: k=v:return:"""create_info = create_info if type(create_info) == dict else dict()res_attr_list = self.__get_all_attr_list()kwg_copy = copy.deepcopy(create_info)# 如果不是管理器对应模型类的字段则删除for kw in create_info:if kw not in res_attr_list:kwg_copy.pop(kw)# 创建对象ftry:k_obj = self.db_manager(db_select).model(**kwg_copy)k_obj.save(force_insert=True)except Exception as e:print(e)k_obj = Nonereturn k_objdef update_object(self, filters=None, excludes=None, updates=None, db_select=None, **kwargs):"""base manager 根据条件更新数据库的参数update info:param filters: 查询条件:param db_select: 为数据库分离做准备:param exclude: 查询条件:param update: 更新的内容:return:"""filters = filters if type(filters) == dict else dict()excludes = excludes if type(excludes) == dict else dict()updates = updates if type(updates) == dict else dict()if len(filters) == 0 and len(excludes) == 0:return Noneif len(updates) == 0:return Nonetry:k_objects = self.db_manager(db_select).filter(**filters).exclude(**excludes).update(**updates)except Exception as e:print(e)k_objects = Nonereturn k_objectsdef get_filter_list(self, filters=None, excludes=None, order_by=('-pk',), db_select=None, **kwargs):"""base manager 批量获取数据:param filters: 查询条件:param excludes: 取反条件:param db_select: 为数据库分离做准备:param order_by: 排序提交:param kwargs::return:"""filters = filters if type(filters) == dict else dict()excludes = excludes if type(excludes) == dict else dict()try:select_related = kwargs.pop('select_related')except KeyError as e:select_related = (None,)try:only = kwargs.pop('only')k_object_list = self.db_manager(db_select).filter(**filters).exclude(**excludes).order_by(*order_by).select_related(*select_related).only(*only)except KeyError as e:k_object_list = self.db_manager(db_select).filter(**filters).exclude(**excludes).order_by(*order_by).select_related(*select_related)except Exception as e:print(e)k_object_list = Nonereturn k_object_listdef get_filter_values(self, q_objs=None, filters=None, e_q_objs=None, excludes=None, order_by=None, values=None, db_select=None):"""wangge只查询某些字段,获取到的是QuerySet,跨表查询用的是 Inner Join:param q_objs: 元组形式的查询条件:param filters: 字典形式的查询条件:param e_q_objs: 元组形式的排除查询条件:param excludes: 排除条件:param order_by: 排序条件:param values: 只查询的某些字段:param db_select::return: 返回值是QuerySet,元素是字典,查询报错返回 None"""filters = filters if isinstance(filters, dict) else dict()excludes = excludes if isinstance(excludes, dict) else dict()q_objs = tuple(q_objs) if isinstance(q_objs, (tuple, list, set)) else tuple()e_q_objs = tuple(e_q_objs) if isinstance(e_q_objs, (tuple, list, set)) else tuple()order_by = tuple(order_by) if isinstance(order_by, (tuple, list, set)) else tuple()values = tuple(values) if isinstance(values, (tuple, list, set)) else tuple()try:dict_list = self.db_manager(db_select).filter(*q_objs, **filters).exclude(*e_q_objs, **excludes).order_by(*order_by).values(*values)except Exception as e:print(e)dict_list = Nonereturn dict_listdef get_filter_and_fuzzy_queries_list(self, filters=None, q_objs=(), e_q_objs=(), excludes=None, order_by=None, db_select=None, **kwargs):"""过滤出查询结果后,对结果进行模糊查询,再将结果返回:param filters: 过滤条件 --> 字典:param excludes: 排除条件 --> 字典:param q_objs: 查询条件 --> Q对象:param e_q_objs: 排除条件 --> Q对象:param order_by: 排序条件:param db_select: 数据库选择:param kwargs: 预留参数:return:"""filters = filters if type(filters) == dict else dict()excludes = excludes if isinstance(excludes, dict) else dict()if not order_by:order_by = ('-pk',)try:k_object_list = self.db_manager(db_select).filter(**filters).filter(*q_objs).exclude(*e_q_objs, **excludes).order_by(*order_by)except Exception as e:print(e)k_object_list = Nonereturn k_object_listdef get_count(self, condition=None, db_select=None):"""manager 获取count:param db_select: 为数据库分离做准备:param condition::return:"""condition = condition if type(condition) == dict else dict()try:count = self.db_manager(db_select).filter(**condition).count()except Exception as e:count = Noneprint(e)return countdef delete_mul_obj(self, filters, excludes, db_select, **kwargs):"""删除数据库:param filters::param excludes::param db_select::param kwargs::return:"""filters = filters if type(filters) == dict else dict()excludes = excludes if type(excludes) == dict else dict()if len(filters) == 0 and len(excludes) == 0:return Nonetry:res = self.db_manager(db_select).filter(**filters).exclude(**excludes).delete()except Exception as e:print(e)res = Nonereturn resdef del_info(self, condition, db_select=None, **kwargs):"""删除一些临时数据:param condition::param db_select::param kwargs::return:"""condition = condition if type(condition) == dict else dict()if len(condition) == 0:return 0try:result = self.db_manager(db_select).filter(**condition).delete()result = result[0]except Exception as e:print(e)result = 0return resultclass BaseModel(models.Model):objects = BaseManager()def __getitem__(self, item):if hasattr(self, item):return getattr(self, item)else:return Noneclass Meta:abstract = True
