import json
import traceback

from nc_http.core import Response
from pydantic import BaseModel, Field

from api.blueprint import data
from commons.constants.material_task import MaterialTaskStatus
from commons.models.material_task import MaterialTask
from commons.services.data import DataService
from core.extensions import siwa
from tasks.once.calculate import calculate
from tasks.once.collect import collect


# The URL must carry a ``task_id`` variable, because the view function
# requires that argument.
@data.route('/task/<task_id>/refresh', methods=['GET'])
@siwa.doc(tags=[""], summary='刷新采集任务')
def refresh_task(task_id):
    """Re-run content collection for one material task.

    The task is looked up by ``task_id``, marked ``DOING``, and then its
    content is fetched through ``DataService.get_content``. On success the
    content is stored as JSON and the task is marked ``DONE``; if anything
    raises, the traceback is printed and the task is marked ``FAILED``.

    Args:
        task_id: Identifier passed to ``MaterialTask.get_by_id``.

    Returns:
        An empty ``Response`` in every case.
    """
    # NOTE(review): ``atomic()`` presumably yields a transactional session;
    # confirm against the model base class.
    with MaterialTask.atomic() as session:
        # Named ``task`` so it does not shadow the imported ``data`` blueprint.
        task = MaterialTask.get_by_id(task_id, session)
        if not task:
            # Nothing to refresh. Bail out instead of dereferencing a missing
            # record below.
            # TODO(review): return an explicit "not found" payload once the
            # ``Response`` signature is confirmed.
            return Response()

        task.status = MaterialTaskStatus.DOING
        session.flush()
        try:
            content = DataService.get_content(type=task.type)
            task.content = json.dumps(content, ensure_ascii=False)
            task.status = MaterialTaskStatus.DONE
            session.flush()
        except Exception:
            # Best effort: record the failure on the task rather than
            # propagating the error to the caller.
            traceback.print_exc()
            task.status = MaterialTaskStatus.FAILED
            session.flush()
    return Response()


class CalculateQuery(BaseModel):
    """Query parameters accepted by the ``/calculate`` endpoint."""

    # Year forwarded to the calculate/collect jobs.
    year: str = Field(title='年份')
    # Month forwarded to the calculate/collect jobs.
    month: str = Field(title='月份')


@data.route('/calculate', methods=['GET'])
@siwa.doc(tags=[""], summary='触发计算任务', query=CalculateQuery)
def start_calculate(query: CalculateQuery):
    """Run the calculate job and then the collect job for one period.

    Args:
        query: Parsed query parameters carrying ``year`` and ``month``.

    Returns:
        An empty ``Response``.
    """
    calculate(year=query.year, month=query.month)
    collect(year=query.year, month=query.month)
    return Response()