from sqlalchemy.orm import sessionmaker


class Atomic:
    """Context manager that wraps a block of work in one session transaction.

    Entering the context yields the session. On a clean exit the session is
    committed; if the block raises, or the commit itself fails, the session
    is rolled back. The session is closed on every path.

    Example::

        with Atomic(db) as session:
            session.add(obj)
    """

    def __init__(self, db, session=None):
        """Store the session to use, creating a new one when none is given.

        Args:
            db: Object exposing an ``engine`` attribute. It is only read
                when ``session`` is ``None``, to bind the new session.
            session: Optional existing session to use instead of creating
                one.
        """
        # NOTE: reusing an existing session here can cause transaction
        # anomalies; a new session should be created instead.
        if session is None:
            session = sessionmaker(bind=db.engine)()
        self.session = session

    def __enter__(self):
        """Return the session managed by this context."""
        return self.session

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Commit or roll back, then close the session.

        Returns:
            ``True`` after a clean exit. ``False`` when the block raised, so
            the original exception keeps propagating to the caller.

        Raises:
            Exception: Whatever ``commit()`` raised, after rolling back.
        """
        succeeded = exc_type is None
        try:
            if succeeded:
                try:
                    self.session.commit()
                except Exception:
                    self.session.rollback()
                    # Bare raise keeps the original traceback intact.
                    raise
            else:
                # The block failed: discard its pending changes instead of
                # leaving the transaction open.
                self.session.rollback()
        finally:
            # Close on every path, including a failing rollback, so the
            # session is never left open.
            self.session.close()
        return succeeded