Files
material-api/web/collectors/__init__.py

135 lines
6.3 KiB
Python

import datetime
from sqlalchemy import func
from calculators import Helper
from commons.models.price_publish import PricePublish
from commons.models.price_result import PriceResult
from commons.models.material import Material
class Collector:
def __init__(self, year, month, force=True):
self.year = year
self.month = month
self.force = True # todo-2 已发布的价格不在覆盖计算
self.digit_map = {}
# 获取所有材料信息
self.materials = Material.list(type=1)
self.material_codes = [m.code for m in self.materials if m.code]
# 缓存材料税率信息
self.tax_map = {m.code: m.tax for m in self.materials if m.code}
def get_avg(self):
query = PricePublish.get_query(material_id_in=self.material_codes)
query = PricePublish.query_previous_month(query, start_date=datetime.date(self.year, self.month, 1), count=6)
query = query.filter(PricePublish.type == '1')
query = query.with_entities(
PricePublish.material_id,
PricePublish.name,
PricePublish.spec,
func.avg(PricePublish.price),
func.avg(PricePublish.price_fuzhou),
func.avg(PricePublish.price_xiamen),
func.avg(PricePublish.price_putian),
func.avg(PricePublish.price_sanming),
func.avg(PricePublish.price_quanzhou),
func.avg(PricePublish.price_zhangzhou),
func.avg(PricePublish.price_nanpin),
func.avg(PricePublish.price_longyan),
func.avg(PricePublish.price_ningde),
func.avg(PricePublish.price_pintan),
func.avg(PricePublish.price_zhangzhoukfq),
PricePublish.tax,
PricePublish.unit,
)
query = query.filter(PricePublish.price != 0)
query = query.group_by(
PricePublish.material_id,
PricePublish.name,
PricePublish.spec,
PricePublish.tax,
PricePublish.unit,
)
data = query.all()
for item in data:
material_id, name, spec, price, price_fuzhou, price_xiamen, price_putian, price_sanming, price_quanzhou, \
price_zhangzhou, price_nanpin, price_longyan, price_ningde, price_pintan, price_zhangzhoukfq, tax, unit, = item
display_digit = self.digit_map.get(material_id, 1)
PricePublish(
year=self.year,
month=self.month,
material_id=material_id,
name=name,
spec=spec,
price=round(price, 2),
price_fuzhou=round(price_fuzhou or 0, 2),
price_xiamen=round(price_xiamen or 0, 2),
price_putian=round(price_putian or 0, 2),
price_sanming=round(price_sanming or 0, 2),
price_quanzhou=round(price_quanzhou or 0, 2),
price_zhangzhou=round(price_zhangzhou or 0, 2),
price_nanpin=round(price_nanpin or 0, 2),
price_longyan=round(price_longyan or 0, 2),
price_ningde=round(price_ningde or 0, 2),
price_pintan=round(price_pintan or 0, 2),
price_zhangzhoukfq=round(price_zhangzhoukfq or 0, 2),
tax=self.tax_map.get(material_id, tax), # 从材料表获取税率
type=2,
unit=unit,
display_digit=display_digit,
).upsert()
def get_from_result(self):
# 获取当月趋势表数据
query = PriceResult.get_query(self.year, self.month, material_id_in=self.material_codes)
data = query.all()
self.digit_map = {i.material_id: i.display_digit for i in data}
# 获取上月发布价数据
previous = PricePublish.get_query(*Helper.get_last_month(self.year, self.month), type=1)
previous_prices_map = {i.material_id: i for i in previous}
for item in data:
fluctuating = item.fluctuating_recommend
previous_prices = previous_prices_map.get(item.material_id, PricePublish())
result = {
'year': self.year,
'month': self.month,
'material_id': item.material_id,
'name': item.name,
'spec': item.spec,
'price': item.price_recommend,
'price_fuzhou': item.price_recommend,
'price_xiamen': previous_prices.price_xiamen + fluctuating if previous_prices.price_xiamen else item.price_recommend,
'price_putian': previous_prices.price_putian + fluctuating if previous_prices.price_putian else item.price_recommend,
'price_sanming': previous_prices.price_sanming + fluctuating if previous_prices.price_sanming else item.price_recommend,
'price_quanzhou': previous_prices.price_quanzhou + fluctuating if previous_prices.price_quanzhou else item.price_recommend,
'price_zhangzhou': previous_prices.price_zhangzhou + fluctuating if previous_prices.price_zhangzhou else item.price_recommend,
'price_nanpin': previous_prices.price_nanpin + fluctuating if previous_prices.price_nanpin else item.price_recommend,
'price_longyan': previous_prices.price_longyan + fluctuating if previous_prices.price_longyan else item.price_recommend,
'price_ningde': previous_prices.price_ningde + fluctuating if previous_prices.price_ningde else item.price_recommend,
'price_pintan': previous_prices.price_pintan + fluctuating if previous_prices.price_pintan else item.price_recommend,
'price_zhangzhoukfq': previous_prices.price_zhangzhoukfq + fluctuating if previous_prices.price_zhangzhoukfq else item.price_recommend,
'tax': self.tax_map.get(item.material_id), # 从材料表获取税率
'type': 1,
'unit': item.unit,
'display_digit': item.display_digit,
}
PricePublish(**result).upsert()
def run(self):
# 当月价
# self.get_from_survey()
self.get_from_result()
# 近半年平均价
self.get_avg()
if __name__ == '__main__':
from core.factory import ClientApp
with ClientApp().app_context():
collector = Collector(2023, 11)
collector.run()