import datetime

from sqlalchemy import func

from commons.models.fujian_survey import FujianSurvey
from commons.models.price_publish import PricePublish
from commons.models.price_result import PriceResult
from commons.models.material import Material


class Collector:
    """Collect prices for one (year, month) and upsert them into PricePublish.

    Two kinds of rows are written:

    * ``type=1`` -- the current-month price, copied from ``PriceResult``
      (or from ``FujianSurvey`` via :meth:`get_from_survey`).
    * ``type=2`` -- an average over earlier published prices, computed by
      :meth:`get_avg`.
    """

    # Per-region price columns of PricePublish. Kept in one place so that the
    # aggregate query and the row constructors cannot drift apart.
    _REGION_FIELDS = (
        'price_fuzhou',
        'price_xiamen',
        'price_putian',
        'price_sanming',
        'price_quanzhou',
        'price_zhangzhou',
        'price_nanpin',
        'price_longyan',
        'price_ningde',
        'price_pintan',
        'price_zhangzhoukfq',
    )

    def __init__(self, year, month, force=True):
        """Load the material list and cache the per-material tax rates.

        Args:
            year: Year of the period to publish.
            month: Month of the period to publish.
            force: Stored on the instance as ``self.force``. Nothing in this
                class reads it yet (see the TODO below).
        """
        self.year = year
        self.month = month
        # Previously hard-coded to True, which silently discarded the argument.
        # TODO-2: already-published prices should no longer be overwritten by
        # the calculation (presumably what `force` is meant to control).
        self.force = force
        # material_id -> display_digit; filled by get_from_result().
        self.digit_map = {}

        # Fetch all materials (type=1; meaning of the type value is defined by
        # Material.list -- not visible here).
        self.materials = Material.list(type=1)
        self.material_codes = [m.code for m in self.materials if m.code]
        # Cache of material code -> tax rate.
        self.tax_map = {m.code: m.tax for m in self.materials if m.code}

    def _uniform_prices(self, price):
        """Return kwargs assigning the same ``price`` to every regional column."""
        return dict.fromkeys(self._REGION_FIELDS, price)

    def get_avg(self):
        """Upsert ``type=2`` rows holding averaged historical prices.

        Averages ``price`` and every regional price column over the rows
        selected by ``PricePublish.query_previous_month(..., count=6)``
        (presumably the six months before the first day of the target month --
        confirm against that helper), grouped by material, name, spec, tax and
        unit. All averages are rounded to 2 decimals.

        ``display_digit`` is looked up in ``self.digit_map`` and defaults to 1,
        so :meth:`get_from_result` should run first for that map to be filled.
        """
        query = PricePublish.get_query(material_id_in=self.material_codes)
        query = PricePublish.query_previous_month(
            query,
            start_date=datetime.date(self.year, self.month, 1),
            count=6,
        )
        region_avgs = [
            func.avg(getattr(PricePublish, field))
            for field in self._REGION_FIELDS
        ]
        query = query.with_entities(
            PricePublish.material_id,
            PricePublish.name,
            PricePublish.spec,
            func.avg(PricePublish.price),
            *region_avgs,
            PricePublish.tax,
            PricePublish.unit,
        )
        # Only the overall `price` column is filtered; regional columns equal
        # to 0 in the remaining rows still take part in their averages.
        query = query.filter(PricePublish.price != 0)
        query = query.group_by(
            PricePublish.material_id,
            PricePublish.name,
            PricePublish.spec,
            PricePublish.tax,
            PricePublish.unit,
        )

        for row in query.all():
            # Column order mirrors with_entities() above.
            material_id, name, spec, price, *region_prices, tax, unit = row
            # A regional average may be NULL/None; treat it as 0.
            regional = {
                field: round(value or 0, 2)
                for field, value in zip(self._REGION_FIELDS, region_prices)
            }
            PricePublish(
                year=self.year,
                month=self.month,
                material_id=material_id,
                name=name,
                spec=spec,
                price=round(price, 2),
                # Tax rate comes from the material table; fall back to the
                # value stored on the published rows.
                tax=self.tax_map.get(material_id, tax),
                type=2,
                unit=unit,
                display_digit=self.digit_map.get(material_id, 1),
                **regional
            ).upsert()

    def get_from_survey(self):
        """Upsert ``type=1`` rows from FujianSurvey for the target month.

        The survey price is used both as the overall price and as the price of
        every region.
        """
        query = FujianSurvey.get_query(
            self.year, self.month, material_id_in=self.material_codes)
        for item in query.all():
            PricePublish(
                year=self.year,
                month=self.month,
                material_id=item.material_id,
                name=item.name,
                spec=item.spec,
                price=item.price,
                # Tax rate comes from the material table; fall back to the
                # survey's own value.
                tax=self.tax_map.get(item.material_id, item.tax),
                type=1,
                unit=item.unit,
                **self._uniform_prices(item.price)
            ).upsert()

    def get_from_result(self):
        """Upsert ``type=1`` rows from PriceResult for the target month.

        Also rebuilds ``self.digit_map`` (material_id -> display_digit) from
        the fetched rows. ``price_recommend`` is used both as the overall price
        and as the price of every region.
        """
        query = PriceResult.get_query(
            self.year, self.month, material_id_in=self.material_codes)
        data = query.all()
        self.digit_map = {i.material_id: i.display_digit for i in data}
        for item in data:
            PricePublish(
                year=self.year,
                month=self.month,
                material_id=item.material_id,
                name=item.name,
                spec=item.spec,
                price=item.price_recommend,
                # Tax rate comes from the material table; there is no fallback
                # here, so an id missing from tax_map yields None.
                tax=self.tax_map.get(item.material_id),
                type=1,
                unit=item.unit,
                display_digit=item.display_digit,
                **self._uniform_prices(item.price_recommend)
            ).upsert()

    def run(self):
        """Publish the current-month prices, then the averaged prices."""
        # Current-month price. The survey-based source is currently disabled:
        # self.get_from_survey()
        self.get_from_result()
        # Average price over roughly the last half year.
        self.get_avg()


if __name__ == '__main__':
    from core.factory import ClientApp

    with ClientApp().app_context():
        collector = Collector(2023, 11)
        collector.run()