import datetime
from time import sleep
from typing import Dict, List, Tuple

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Table, Column, engine
from sqlalchemy.orm.session import Session
from sqlalchemy.orm import query, sessionmaker
from sqlalchemy.types import BigInteger, Integer, String, Numeric, Boolean, DateTime
from sqlalchemy.sql.sqltypes import DateTime
from sqlalchemy import desc, cast, case, func

import schemas
import models

Base = declarative_base()
metadata = Base.metadata


class PaymentDetails(Base):
    """Declarative mapping of the ``payment_details`` table."""

    __tablename__ = "payment_details"

    id = Column(Integer, primary_key=True, autoincrement=True)
    external_id = Column(String(36), nullable=False, index=True)
    is_refund = Column(Boolean, nullable=False, index=True)
    is_taken = Column(Boolean, nullable=False, index=True)
    # Pass the callable, not its result: ``now()`` would be evaluated once at
    # import time and every inserted row would share that single timestamp.
    added_date = Column(DateTime, nullable=True, default=datetime.datetime.now)


class DBEngine:
    """Holds connection settings and a SQLAlchemy engine for one database."""

    def __init__(self, server, dbname, user, password):
        """Store the connection settings and create the engine immediately."""
        self.dbname = dbname
        self.user = user
        self.password = password
        self.server = server
        self.get_mssql_engine()

    def get_mssql_engine(self):
        """Build the ``mssql+pyodbc`` URL and store the engine on ``self.engine``."""
        url = (
            f'mssql+pyodbc://{self.user}:{self.password}'
            f'@{self.server}/{self.dbname}?driver=SQL+Server'
        )
        # NOTE(review): ``check_same_thread`` is an option of the sqlite3
        # driver; confirm that the pyodbc driver accepts or ignores it.
        self.engine = create_engine(
            url, connect_args={'check_same_thread': False})

    def get_table(self, tablename: str) -> Table:
        """Reflect the shared metadata and return ``tablename`` from ``<dbname>.dbo``."""
        self.metadata = Base.metadata
        # Reflection runs on every call, so each lookup hits the database.
        self.metadata.reflect(bind=self.engine)
        return Table(
            tablename,
            self.metadata,
            schema=self.dbname + '.dbo',
            autoload=True,
            autoload_with=self.engine,
        )

    def get_columns(self, tablename: str, columns: List[str] = None,
                    labels: Dict = None) -> List[Column]:
        """Return columns of ``tablename``.

        When ``columns`` is given, the named columns are returned in that
        order, each labelled with ``labels[name]`` if present and truthy,
        otherwise with its own name. When ``columns`` is empty or ``None``,
        the table's ``(name, column)`` items are returned instead.
        """
        table = self.get_table(tablename)
        labels = labels or {}
        if columns:
            return [
                table.c.get(column).label(labels.get(column, column) or column)
                for column in columns
            ]
        return table.c.items()

    def get_db(self):
        """Yield a session bound to this engine and close it afterwards."""
        db = Session(autocommit=False, autoflush=False, bind=self.engine)
        try:
            yield db
        finally:
            db.close()


def get_payment_details(db: Session, engine_class: DBEngine):
    """Build a query for distinct payment keys with ``is_taken == 0``.

    The query selects ``(external_id, is_refund, is_taken)`` ordered by
    ``external_id`` and then by ``is_refund`` descending.
    """
    pd = engine_class.get_table('payment_details')
    external_id, is_refund, is_taken = engine_class.get_columns(
        'payment_details', ['external_id', 'is_refund', 'is_taken'])
    return (
        db.query(pd)
        .filter(is_taken == 0)
        .with_entities(external_id, is_refund, is_taken)
        .distinct()
        .order_by(external_id, desc(is_refund))
    )


def update_payment_details(db: Session, engine_class: DBEngine,
                           payment_details: Dict):
    """Set ``is_taken`` to ``True`` on the rows matching ``payment_details``.

    ``payment_details`` supplies the ``external_id``, ``is_refund`` and
    ``is_taken`` values to match. The change is committed and the result of
    ``Query.update`` is returned.
    """
    external_id_value = payment_details.get("external_id")
    is_refund_value = payment_details.get("is_refund")
    is_taken_value = payment_details.get("is_taken")

    pd = engine_class.get_table('payment_details')
    external_id, is_refund, is_taken = engine_class.get_columns(
        'payment_details', ['external_id', 'is_refund', 'is_taken'])
    matching = db.query(pd).filter(
        external_id == external_id_value,
        is_taken == is_taken_value,
        is_refund == is_refund_value,
    )
    updated = matching.update({'is_taken': True}, synchronize_session='fetch')
    db.commit()
    return updated


def get_payment_query(db: Session, engine_class: DBEngine,
                      payment_details: Tuple):
    """Build a query for the ``payment_details`` rows of one payment.

    ``payment_details`` is unpacked as ``(external_id, is_refund, is_taken)``.

    This builder used to be named ``get_payment`` and was shadowed by the
    zero-argument ``get_payment`` defined later in the module, so every call
    with three arguments raised ``TypeError``.
    """
    external_id_value, is_refund_value, is_taken_value = payment_details
    pd = engine_class.get_table('payment_details')
    external_id, is_refund, is_taken = engine_class.get_columns(
        'payment_details', ['external_id', 'is_refund', 'is_taken'])
    return db.query(pd).filter(
        external_id == external_id_value,
        is_taken == is_taken_value,
        is_refund == is_refund_value,
    )


def get_payment_details_items(db: Session, engine_class: DBEngine,
                              dict_engine: DBEngine, payment_details: Tuple):
    """Assemble the items query, filling values in from the reference tables.

    The rows of one payment (taken from ``engine_class``) are joined with the
    ``services``, ``units``, ``payment_method``, ``payment_object``,
    ``payment_object_vat_type``, ``providers`` and ``agents`` tables of
    ``dict_engine``. All joins are inner joins.
    """
    payment = get_payment_query(
        db, engine_class, payment_details).subquery('payment')
    services = dict_engine.get_table('services')
    units = dict_engine.get_table('units')
    payment_method = dict_engine.get_table('payment_method')
    payment_object = dict_engine.get_table('payment_object')
    vats = dict_engine.get_table('payment_object_vat_type')
    agents = dict_engine.get_table('agents')
    company = dict_engine.get_table('providers')

    # Payment methods 5-7 map to group 3, 1-3 to group 2 and 4 to group 1.
    payment_group = case(
        (payment.c.payment_method.in_([5, 6, 7]), 3),
        (payment.c.payment_method.in_([1, 2, 3]), 2),
        (payment.c.payment_method == 4, 1)
    ).label("payment_group")

    # ``!= None`` is intentional: SQLAlchemy overloads it to emit IS NOT NULL.
    supplier_info = case(
        (payment.c.agent_type != None, payment.c.supplier_info),  # noqa: E711
        else_=None
    ).label('supplier_info')

    items_query = db.query(
        payment.c.external_id.label('external_id'),
        services.c.sname.label('name'),
        units.c["sname"].label('measurement_unit'),
        payment.c.price.label('price'),
        payment.c.quantity.label('quantity'),
        payment.c.date_operation.label('date_operation'),
        payment.c.phone.label('phone'),
        company.c.inn,
        payment.c.summa.label('sum'),
        payment_group,
        payment_method.c["name"].label('payment_method'),
        payment_object.c["name"].label('payment_object'),
        vats.c["name"].label("vat"),
        payment.c.agent_type.label('agent_info'),
        supplier_info,
        agents.c["inn"].label("supplier_inn"),
        agents.c["name"].label("supplier_name")
    ).select_from(payment)\
        .join(services, services.c["id_service"] == payment.c.id_item)\
        .join(units, services.c["id_unit"] == units.c["id_unit"])\
        .join(payment_method, payment.c.payment_method == payment_method.c["id"])\
        .join(payment_object, payment.c.payment_object == payment_object.c["id"])\
        .join(vats, payment.c.vat == vats.c["id"])\
        .join(company, payment.c.id_company == company.c["id_provider"])\
        .join(agents, payment.c.supplier_info == agents.c["id_agent"])
    return items_query


def get_payments(data: List):
    """Fill ``payments``: sum ``"sum"`` per ``"payment_group"``.

    Returns a list of ``{'type': group, 'sum': total}`` dicts in the order
    the groups first appear in ``data``.
    """
    sums = {}
    for row in data:
        type_id = row.get("payment_group")
        cur_sum = row.get("sum")
        sums[type_id] = cur_sum + sums.get(type_id, 0.0)
    return [{'type': type_id, 'sum': value} for type_id, value in sums.items()]


def get_token(db: Session, db_dict: DBEngine):
    """Return the ``token`` column of the first ``vAtolToken`` row, or ``None``.

    NOTE(review): the original called ``.get("token")`` on the reflected
    ``Table`` object, which has no such method, and never used ``db``.
    Reading the column through the session is the presumed intent; confirm
    against the ``vAtolToken`` definition.
    """
    token_table = db_dict.get_table("vAtolToken")
    row = db.query(token_table.c["token"]).first()
    return row[0] if row else None


def get_total(data: List):
    """Fill ``total``: return the sum of ``"sum"`` over ``data``, from 0.0."""
    return sum((row.get("sum") for row in data), 0.0)


def items_convert(data: query.Query):
    """Convert the item rows of ``data`` into the receipt building blocks.

    Returns ``(external_id, items, payments, client, company, total)``.
    ``external_id``, the client phone and the company INN come from the last
    row; with no rows they stay at ``0``, ``""`` and ``0`` respectively.
    """
    items = []
    external_id = 0
    phone = ""
    inn = 0

    for row in data.all():
        item = dict(row)
        external_id = item.get("external_id")
        item["vat"] = get_vat(item.get("vat"))
        agent_info = item.pop("agent_info", None)
        phone = item.get("phone", '+79111111111')
        inn = int(item.get("inn"))

        if agent_info:
            item["agent_info"] = {"type": agent_info}
            item["supplier_info"] = {
                "inn": item.pop("supplier_inn", None),
                "name": item.pop("supplier_name", None)
            }
        else:
            # Without an agent the supplier fields are dropped from the item.
            for key in ["supplier_info", 'supplier_inn', 'supplier_name']:
                del item[key]
        items.append(item)

    # Each item keeps its "payment_group" and "sum" keys, so the shared
    # helpers produce the same aggregates the inline loop used to compute.
    payments = get_payments(items)
    total = get_total(items)
    client = get_client(phone)
    company = get_company(inn)
    return external_id, items, payments, client, company, total


def get_company(inn: int):
    """Return the company section of the receipt for the given INN."""
    return {
        'inn': inn,
        'email': 'ocnkp@jkhsakha.ru',
        'payment_address': 'http://jkhsakha.ru/',
        'sno': 'osn',
    }


def get_receipt(data: query.Query):
    """Return ``(external_id, receipt)`` built from the item rows of ``data``."""
    external_id, items, payments, client, company, total = items_convert(data)
    receipt = {
        'client': client,
        "payments": payments,
        "company": company,
        'items': items,
        'total': total,
    }
    return external_id, receipt


def get_sell(data: query.Query):
    """Return the sell document: receipt, timestamp, id and callback URL."""
    sell = {}
    current_datetime = datetime.datetime.now()
    external_id, sell['receipt'] = get_receipt(data)
    sell["timestamp"] = current_datetime.strftime('%d.%m.%Y %H:%M:%S')
    sell["external_id"] = external_id
    sell["service"] = {
        'callback_url': f"http://api.jkhsakha.ru/kassa/{external_id}"
    }
    return sell


def get_client(phone: str):
    """Return the client section of the receipt."""
    return {'phone': phone}


def get_vat(vat: str):
    """Wrap a VAT value as ``{'type': vat}``."""
    return {'type': vat}


def add_atol(db: Session, engine: DBEngine, atol: Dict):
    """Insert ``atol`` as one row into the ``Atol`` table and commit."""
    atol_table = engine.get_table("Atol")
    db.execute(atol_table.insert().values(atol))
    db.commit()


def add_doc(sell: schemas.Sell, is_refund):
    """Build a ``models.Atol`` record from ``sell`` and persist it."""
    # Imported inside the function, as in the original module.
    from databases import SessionLocal
    from atol import Atol

    session = SessionLocal()
    try:
        # The class itself is passed in the ``self`` position, unchanged
        # from the original call.
        fields = Atol.set_sell(Atol, sell, is_refund)
        session.add(models.Atol(**fields))
        session.commit()
    finally:
        # Release the session even when building or committing fails.
        session.close()


def get_payment():
    """Process the next untaken payment.

    Returns ``(sell, is_refund)`` for the processed payment, or
    ``(False, False)`` when no untaken payment exists. The sell body is
    stored in the ``Atol`` table and the payment rows are marked as taken.
    """
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration.
    server = 'Sanctuary'
    user = 'sa'
    password = '159357'
    dbname = 'fz54_details'
    db_dicts_name = 'fz54'

    db = DBEngine(server, dbname, user, password)
    db_dicts = DBEngine(server, db_dicts_name, user, password)
    session = Session(autocommit=False, autoflush=False, bind=db.engine)
    try:
        payment = get_payment_details(session, db).first()
        if not payment:
            return False, False

        payments = get_payment_details_items(session, db, db_dicts, payment)
        body = get_sell(payments)
        sell = schemas.Sell(**body)
        payment = dict(payment)
        atol = {
            "external_id": payment.get("external_id"),
            "is_refund": payment.get("is_refund"),
            "is_taken": payment.get("is_taken"),
            "body": str(body)
        }
        add_atol(session, db, atol)
        update_payment_details(session, db, payment)
        return sell, payment.get("is_refund")
    finally:
        # The original never closed this session.
        session.close()


if __name__ == "__main__":
    while True:
        payment, is_refund = get_payment()
        if payment is False:
            break
        sleep(1)
        add_doc(sell=payment, is_refund=is_refund)
        # As in the original, the loop stops after one processed payment.
        break