import time
from datetime import datetime

from nc_http.tools.helpers import camelize
from sqlalchemy.orm.exc import NoResultFound

from commons.models.atomic import Atomic


class BaseModel:
    """Shared query and persistence helpers for model classes.

    ``query``, ``_db`` and ``__table__`` are placeholders here; the helpers
    below read them from the concrete class, which is expected to supply
    real values.
    """

    query = None
    _unique_fields = []
    _db = None
    __table__ = None

    @classmethod
    def atomic(cls):
        """Return an ``Atomic`` constructed with this model's ``_db``."""
        return Atomic(db=cls._db)

    @classmethod
    def get_by_id(cls, id_, session=None):
        """Return the row whose primary key equals ``id_``, or ``None``.

        :param id_: value to match. When the model has no ``id`` attribute
            and the table has a composite primary key, pass a tuple/list
            with one value per key column.
        :param session: optional session to query through; ``cls.query`` is
            used when omitted.
        :return: the matching instance, or ``None`` when nothing matches.
        :raises ValueError: if the number of supplied values differs from
            the number of primary-key columns (fallback path only).
        """
        query = session.query(cls) if session else cls.query

        id_column = getattr(cls, 'id', None)
        if id_column is not None:
            return query.filter(id_column == id_).one_or_none()

        # No ``id`` attribute: build the filter from the table's primary-key
        # column(s). The constraint object itself cannot be compared to a
        # value, so each of its columns is compared individually.
        pk_columns = list(cls.__table__.primary_key.columns)
        values = tuple(id_) if isinstance(id_, (tuple, list)) else (id_,)
        if len(pk_columns) != len(values):
            raise ValueError(
                'expected %d primary key value(s), got %d'
                % (len(pk_columns), len(values))
            )
        criteria = [
            column == value for column, value in zip(pk_columns, values)
        ]
        return query.filter(*criteria).one_or_none()

    @classmethod
    def insert_many(cls, dicts, commit=True, session=None):
        """Bulk-insert ``dicts`` through ``session.bulk_insert_mappings``.

        :param dicts: list of mappings, one per row.
        :param commit: commit the session after inserting when true.
        :param session: session to use; defaults to ``cls._db.session()``.
        """
        session = session or cls._db.session()
        session.bulk_insert_mappings(cls, dicts)
        if commit:
            session.commit()

    @classmethod
    def insert_one(cls, values, commit=True, session=None, return_obj=False):
        """Insert a single row described by ``values``.

        :param values: mapping of attribute name to value.
        :param commit: commit the session after inserting when true.
        :param session: session to use; defaults to ``cls._db.session()``.
        :param return_obj: when true, build an instance, add it to the
            session and return it; otherwise use a bulk insert and return
            ``None``.
        :return: the new instance if ``return_obj`` is true, else ``None``.
        """
        session = session or cls._db.session()
        if return_obj:
            obj = cls()
            for key, value in values.items():
                setattr(obj, key, value)
            session.add(obj)
        else:
            obj = None
            session.bulk_insert_mappings(cls, [values])
        if commit:
            session.commit()
        return obj

    @classmethod
    def update_one(cls, values, commit=True, session=None):
        """Update one row through ``session.bulk_update_mappings``.

        :param values: mapping describing the row to update.
        :param commit: commit the session after updating when true.
        :param session: session to use; defaults to ``cls._db.session()``.
        """
        session = session or cls._db.session()
        session.bulk_update_mappings(cls, [values])
        if commit:
            session.commit()

    @classmethod
    def update_many(cls, dicts, commit=True, session=None):
        """Update several rows through ``session.bulk_update_mappings``.

        :param dicts: list of mappings, one per row to update.
        :param commit: commit the session after updating when true.
        :param session: session to use; defaults to ``cls._db.session()``.
        """
        session = session or cls._db.session()
        session.bulk_update_mappings(cls, dicts)
        if commit:
            session.commit()

    def to_dict(self, is_camelized=True):
        """Return this instance's column values as a dict.

        :param is_camelized: when true, keys are passed through
            ``camelize``; otherwise the raw key is used.
        :return: dict with one entry per column of ``__table__``; attributes
            missing on the instance map to ``None``.
        """
        data = {}
        for column in self.__table__.columns:
            # NOTE(review): a truthy ``column.quote`` takes precedence over
            # ``column.name`` as the attribute key -- confirm what the
            # models store in ``quote``.
            key = getattr(column, 'quote', None) or column.name
            value = getattr(self, key, None)
            data[camelize(key) if is_camelized else key] = value
        return data

    @classmethod
    def select_one(cls, query, fields=None):
        """Run ``query.one()`` and return the result as a dict.

        :param query: query expected to yield exactly one result.
        :param fields: iterable of objects exposing ``.key``; their keys are
            zipped with the result row. When ``None`` the result's own
            ``to_dict()`` is returned instead.
        :return: dict for the result, or ``{}`` when no row is found or the
            result is falsy.
        """
        try:
            result = query.one()
        except NoResultFound:
            return {}
        if not result:
            return {}
        if fields is None:
            # Without explicit fields there is nothing to zip against, so
            # fall back to the result's own serialisation, as ``get_list``
            # does for the same case.
            return result.to_dict()
        return dict(zip([field.key for field in fields], result))

    @classmethod
    def test_one(cls):
        """Return the first row of ``cls.query`` as a dict.

        :return: ``to_dict()`` of the first row, or ``{}`` when the query
            yields no rows.
        """
        result = cls.query.first()
        if result is None:
            return {}
        return result.to_dict()

    @classmethod
    def now(cls):
        """Return the current Unix timestamp truncated to whole seconds."""
        return int(time.time())

    @classmethod
    def now_datetime(cls):
        """Return the current time as a ``datetime`` with whole seconds."""
        return datetime.fromtimestamp(int(time.time()))

    @classmethod
    def get_all(cls):
        """Yield ``to_dict()`` for every row returned by ``cls.query.all()``."""
        for obj in cls.query.all():
            yield obj.to_dict()

    def update(self, commit=True, session=None, **kwargs):
        """Assign ``kwargs`` onto this instance and optionally commit.

        Keys for which the instance has no attribute are ignored.

        :param commit: commit the session after assigning when true.
        :param session: session to commit; defaults to
            ``self._db.session()``.
        :param kwargs: attribute names and their new values.
        """
        for key, value in kwargs.items():
            if hasattr(self, key):
                setattr(self, key, value)
        if commit:
            session = session or self._db.session()
            session.commit()

    def delete(self, session=None):
        """Pass this instance to ``session.delete``; does not commit.

        :param session: session to use; defaults to ``self._db.session()``.
        """
        session = session or self._db.session()
        session.delete(self)

    @classmethod
    def get_list(cls, query, paging=None, fields=None):
        """Run ``query`` and return its rows as a list of dicts.

        :param query: query to execute.
        :param paging: optional dict with ``page`` and ``limit`` keys. When
            given, the query is paginated and ``paging['total']`` is set on
            the passed-in dict.
        :param fields: optional mapping of output key to query entity. When
            given, only those entities are selected and each row is zipped
            with the mapping's keys; otherwise each row's ``to_dict()`` is
            used.
        :return: ``(rows, paging)`` when ``paging`` is given, else ``rows``.
        """
        if fields:
            query = query.with_entities(*fields.values())

        if paging:
            page = query.paginate(paging['page'], paging['limit'])
            paging['total'] = page.total
            rows = page.items
        else:
            rows = query.all()

        if fields:
            columns = fields.keys()
            result = [dict(zip(columns, row)) for row in rows]
        else:
            result = [row.to_dict() for row in rows]

        if paging:
            return result, paging
        return result

    @classmethod
    def set_order_by(cls, query, sorting=None):
        """Apply result ordering to ``query``.

        :param query: query to order.
        :param sorting: dict with ``order_field`` (attribute name on the
            model) and optional ``order``; ``'desc'`` sorts descending, any
            other value sorts with the attribute's default ordering.
        :return: the ordered query, or ``query`` unchanged when ``sorting``
            is missing, not a dict, or has no ``order_field``.
        """
        if not sorting or not isinstance(sorting, dict):
            return query

        order_field = sorting.get('order_field')
        if not order_field:
            return query

        column = getattr(cls, order_field)
        if sorting.get('order') == 'desc':
            column = column.desc()
        return query.order_by(column)