from sqlalchemy import func

from calculators import Calculator
from commons.models.asphalt_imported import AsphaltImported
from commons.models.fujian_survey import FujianSurvey
from commons.models.price_result import PriceResult


def _int_field_or_default(record, field, default):
    """Return ``int(record.<field>)``, using *default* when it is unavailable.

    The fallback applies both when *record* lacks the attribute (this includes
    ``record`` being ``None``) and when the attribute is present but ``None``.
    The second case previously reached ``int(None)`` and raised ``TypeError``.
    """
    value = getattr(record, field, default)
    if value is None:
        value = default
    return int(value)


class AsphaltImportedCalculator(Calculator):
    """Price calculator for the imported-asphalt material."""

    # Display name of the material (runtime string, kept verbatim).
    name = "进口沥青"
    # Material code for this calculator.
    material_id = "06.60.60.00"
    # Unit string for the price.
    unit = "t"
    # No specification string for this material.
    spec = ""

    def __init__(self, year, month):
        """Store the period that the price queries are run for.

        NOTE(review): the base ``Calculator`` initializer is not invoked
        here; confirm that this is intentional.
        """
        self.year = year
        self.month = month

    def _get_network_price(self):
        """Return ``(price, fluctuating)`` from the ``AsphaltImported`` data.

        ``price`` is the '新加坡—华东' quote for the configured year/month
        converted to ``float``, or ``0`` when no such row is returned.
        ``fluctuating`` is ``price`` minus the previous ``price_network``
        value read from ``self.previous_prices``.
        """
        items = AsphaltImported.get_items(
            year=self.year,
            month=self.month,
            name_in=('新加坡—华东',),
        )
        # A falsy price (e.g. None) is recorded as 0.0.
        prices = {name: float(price or 0) for name, price in items}
        price = prices.get('新加坡—华东', 0)

        # NOTE(review): the fallback here is 0, whereas the survey path falls
        # back to the current price; kept as-is, confirm it is intended.
        previous_price = _int_field_or_default(
            self.previous_prices, 'price_network', 0
        )
        fluctuating = price - previous_price
        return price, fluctuating

    def _get_survey_price(self):
        """Return ``(price, fluctuating)`` from the ``FujianSurvey`` data.

        ``price`` is the average survey price for '石油沥青' with spec
        '进口 (路面用)', truncated to ``int``. When the average is missing or
        zero, ``(0, 0)`` is returned. ``fluctuating`` is ``price`` minus the
        previous ``price_survey`` value, which falls back to ``price`` itself
        (i.e. a fluctuation of 0) when no previous value is available.
        """
        query = FujianSurvey.get_query(
            self.year, self.month, name='石油沥青', spec='进口 (路面用)'
        )
        query = query.with_entities(func.avg(FujianSurvey.price))
        result = query.all()
        average = result[0][0]
        if not average:
            return 0, 0

        price = int(average)
        previous_price = _int_field_or_default(
            self.previous_prices, 'price_survey', price
        )
        fluctuating = price - previous_price
        return price, fluctuating

    def _get_recommend_price(self):
        """Set ``self._fluctuatings`` and delegate to the base implementation."""
        self._fluctuatings = [self.fluctuating_network, self.fluctuating_survey]
        return super()._get_recommend_price()

    def save(self):
        """Upsert the value of ``self.result()`` as a ``PriceResult`` row."""
        result = self.result()
        PriceResult(**result).upsert()


if __name__ == '__main__':
    from core.factory import ClientApp

    with ClientApp().app_context():
        calculator = AsphaltImportedCalculator(year=2023, month=10)
        result = calculator.run()
        calculator.save()
        print(result)