import datetime
import decimal
from decimal import Decimal

from sqlalchemy import func

from calculators import Helper
from commons.models.price_publish import PricePublish
from commons.models.price_result import PriceResult
from commons.models.material import Material


class Collector:
    """Build the PricePublish rows for one (year, month).

    Two kinds of rows are upserted:

    * ``type=1`` rows, derived from the month's PriceResult rows
      (see ``get_from_result``);
    * ``type=2`` rows, holding per-material averages of earlier published
      prices (see ``get_avg``).
    """

    # Price columns that ``get_avg`` averages, in the order they are selected.
    _AVERAGED_PRICE_FIELDS = (
        'price',
        'price_fuzhou',
        'price_xiamen',
        'price_putian',
        'price_sanming',
        'price_quanzhou',
        'price_zhangzhou',
        'price_nanpin',
        'price_longyan',
        'price_ningde',
        'price_pintan',
        'price_zhangzhoukfq',
    )

    # Price columns that ``get_from_result`` derives from last month's
    # published value plus the recommended fluctuation.  ``price`` and
    # ``price_fuzhou`` are excluded: they always take the recommended price.
    _CARRIED_PRICE_FIELDS = (
        'price_xiamen',
        'price_putian',
        'price_sanming',
        'price_quanzhou',
        'price_zhangzhou',
        'price_nanpin',
        'price_longyan',
        'price_ningde',
        'price_pintan',
        'price_zhangzhoukfq',
    )

    # Decimal places used when a material has no usable ``round_bit``.
    _DEFAULT_ROUND_BIT = 2

    def __init__(self, year, month, force=True):
        """Load material metadata and remember the target period.

        Args:
            year: Target year.
            month: Target month (1-12).
            force: Stored on the instance as ``self.force``.  No method in
                this class reads it yet.
        """
        self.year = year
        self.month = month
        # Previously hard-coded to True, which silently ignored the argument.
        # TODO-2: already-published prices should not be overwritten by a
        # recalculation.
        self.force = force

        # material_id -> display_digit; filled by get_from_result().
        self.digit_map = {}

        # Fetch all materials, keeping only those that have a code.
        self.materials = Material.list(type=1)
        coded = [m for m in self.materials if m.code]
        self.material_codes = [m.code for m in coded]

        # Cache per-material attributes keyed by material code.
        self.tax_map = {m.code: m.tax for m in coded}
        self.unit_map = {m.code: m.unit for m in coded}
        self.name_map = {m.code: m.name for m in coded}
        self.spec_map = {m.code: m.spec for m in coded}
        self.round_bit_map = {m.code: m.round_bit for m in coded}

    @staticmethod
    def _round_price(value, quantum):
        """Round ``value`` half-up to ``quantum``; falsy values become 0."""
        if not value:
            return 0
        if not isinstance(value, Decimal):
            # Some drivers return float for AVG(); go through str() so the
            # binary representation error is not carried into the Decimal.
            value = Decimal(str(value))
        return value.quantize(quantum, decimal.ROUND_HALF_UP)

    def get_avg(self):
        """Upsert ``type=2`` rows holding averaged published prices.

        Averages every price column of the ``type == '1'`` PricePublish rows
        selected by ``PricePublish.query_previous_month`` (``count=6``,
        starting from the first day of the target month), skipping rows whose
        ``price`` is 0, grouped by material.

        ``display_digit`` is read from ``self.digit_map``, which is only
        populated by ``get_from_result``; when that has not run, every row
        falls back to 1.
        """
        query = PricePublish.get_query(material_id_in=self.material_codes)
        query = PricePublish.query_previous_month(
            query,
            start_date=datetime.date(self.year, self.month, 1),
            count=6,
        )
        query = query.filter(PricePublish.type == '1')
        query = query.with_entities(
            PricePublish.material_id,
            *(func.avg(getattr(PricePublish, field))
              for field in self._AVERAGED_PRICE_FIELDS),
        )
        query = query.filter(PricePublish.price != 0)
        query = query.group_by(PricePublish.material_id)

        for material_id, *averages in query.all():
            round_bit = self.round_bit_map.get(material_id, self._DEFAULT_ROUND_BIT)
            if round_bit is None:
                # A material without a configured precision would otherwise
                # raise TypeError in the string multiplication below.
                round_bit = self._DEFAULT_ROUND_BIT
            quantum = Decimal('0.' + '0' * round_bit)

            prices = {
                field: self._round_price(value, quantum)
                for field, value in zip(self._AVERAGED_PRICE_FIELDS, averages)
            }

            PricePublish(
                year=self.year,
                month=self.month,
                material_id=material_id,
                name=self.name_map.get(material_id, ''),
                spec=self.spec_map.get(material_id, ''),
                tax=self.tax_map.get(material_id, 0),  # tax rate comes from the material table
                type=2,
                unit=self.unit_map.get(material_id, ''),
                display_digit=self.digit_map.get(material_id, 1),
                **prices,
            ).upsert()

    def get_from_result(self):
        """Upsert ``type=1`` rows derived from this month's PriceResult rows.

        ``price`` and ``price_fuzhou`` take the recommended price.  Every
        other price column takes last month's published value plus the
        recommended fluctuation, or the recommended price when last month has
        no (or a zero) value for that column.

        Also refreshes ``self.digit_map`` from the PriceResult rows.
        """
        # This month's PriceResult rows.
        data = PriceResult.get_query(
            self.year, self.month, material_id_in=self.material_codes
        ).all()
        self.digit_map = {row.material_id: row.display_digit for row in data}

        # Last month's published prices, keyed by material.
        previous = PricePublish.get_query(
            *Helper.get_last_month(self.year, self.month), type=1
        )
        previous_prices_map = {row.material_id: row for row in previous}

        # One shared blank record stands in for materials with no row last
        # month, instead of constructing a new one on every iteration.
        blank = PricePublish()

        for item in data:
            # NOTE(review): a None fluctuating_recommend raises TypeError
            # below when a previous price exists - confirm it is never None.
            fluctuating = item.fluctuating_recommend
            recommended = item.price_recommend
            previous_prices = previous_prices_map.get(item.material_id, blank)

            result = {
                'year': self.year,
                'month': self.month,
                'material_id': item.material_id,
                'name': item.name,
                'spec': item.spec,
                'price': recommended,
                'price_fuzhou': recommended,
                'tax': self.tax_map.get(item.material_id),  # tax rate comes from the material table
                'type': 1,
                'unit': item.unit,
                'display_digit': item.display_digit,
            }
            for field in self._CARRIED_PRICE_FIELDS:
                last_price = getattr(previous_prices, field)
                result[field] = last_price + fluctuating if last_price else recommended

            PricePublish(**result).upsert()

    def run(self):
        """Generate this month's prices, then the averaged prices.

        The order matters: ``get_from_result`` fills ``self.digit_map``,
        which ``get_avg`` reads.
        """
        # Current-month prices.
        # NOTE: a survey-based source (self.get_from_survey()) is disabled here.
        self.get_from_result()
        # Average of the previously published prices.
        self.get_avg()


if __name__ == '__main__':
    from core.factory import ClientApp

    with ClientApp().app_context():
        collector = Collector(2023, 11)
        collector.run()