"""Build ATOL sell documents from unprocessed ``payment_details`` rows.

The module reflects the ``payment_details`` table and several reference
tables from MSSQL, assembles a ``sell`` payload from the joined rows, stores
a copy in the ``Atol`` table and passes the payload to ``kassa.atol.Atol``.
"""
import datetime
# NOTE(review): `token` from lib2to3 is not referenced by the code below and
# lib2to3 is absent from Python 3.13+; kept because other parts of the
# project may rely on it being importable from here - confirm and remove.
from lib2to3.pgen2 import token
from time import sleep
from typing import Dict, List, Tuple

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Table, Column, engine
from sqlalchemy.orm.session import Session
from sqlalchemy.orm import query, sessionmaker
from sqlalchemy.types import BigInteger, Integer, String, Numeric, Boolean, DateTime
from sqlalchemy.sql.sqltypes import DateTime
from sqlalchemy import desc, cast, case, func

import kassa.schemas as schemas
import kassa.models as models
from kassa.databases import SessionLocal
from kassa.atol import Atol

Base = declarative_base()
metadata = Base.metadata


class PaymentDetails(Base):
    """Declarative mapping of the ``payment_details`` table."""

    __tablename__ = "payment_details"

    id = Column(Integer, primary_key=True, autoincrement=True)
    external_id = Column(String(36), nullable=False, index=True)
    is_refund = Column(Boolean, nullable=False, index=True)
    is_taken = Column(Boolean, nullable=False, index=True)
    # Pass the callable, not its result: `now()` would be evaluated once at
    # import time and every inserted row would share that single timestamp.
    added_date = Column(DateTime, nullable=True, default=datetime.datetime.now)


class DBEngine:
    """Holds connection settings and a SQLAlchemy engine for one MSSQL database."""

    def __init__(self, server, dbname, user, password):
        """Store the connection settings and create the engine immediately."""
        self.dbname = dbname
        self.user = user
        self.password = password
        self.server = server
        self.get_mssql_engine()

    def get_mssql_engine(self):
        """Create the ``mssql+pyodbc`` engine and store it on ``self.engine``."""
        # NOTE(review): user/password are interpolated without URL-escaping,
        # so credentials containing characters such as '@' or '/' would
        # produce a malformed URL - confirm whether callers pre-escape them.
        url = f'mssql+pyodbc://{self.user}:{self.password}@{self.server}/{self.dbname}?driver=SQL+Server'
        # NOTE(review): `check_same_thread` looks like a SQLite-specific
        # connect argument; left untouched - confirm it is wanted for pyodbc.
        self.engine = create_engine(
            url, connect_args={'check_same_thread': False})

    def get_table(self, tablename: str) -> Table:
        """Reflect and return ``<dbname>.dbo.<tablename>``.

        Every call also reflects into the shared ``Base.metadata`` using this
        engine before the requested table is looked up.
        """
        self.metadata = Base.metadata
        self.metadata.reflect(bind=self.engine)
        return Table(
            tablename,
            self.metadata,
            schema=self.dbname + '.dbo',
            autoload=True,
            autoload_with=self.engine,
        )

    def get_columns(self, tablename: str, columns: List[str] = None, labels: Dict = None) -> List[Column]:
        """Return labelled columns of ``tablename``.

        Args:
            tablename: Table to reflect.
            columns: Column names to return. When empty or ``None`` the
                result is ``table.c.items()`` (name/column pairs) instead of
                a list of labelled columns.
            labels: Optional mapping of column name to label; a missing or
                falsy label falls back to the column name.
        """
        table = self.get_table(tablename)
        labels = labels or {}
        if columns:
            return [
                table.c.get(column).label(labels.get(column, column) or column)
                for column in columns
            ]
        return table.c.items()

    def get_db(self):
        """Yield a session bound to this engine and close it afterwards."""
        db = Session(autocommit=False, autoflush=False, bind=self.engine)
        try:
            yield db
        finally:
            db.close()


def _row_to_dict(row):
    """Return a query result row as a plain ``dict``.

    Result rows expose ``_asdict()``; objects without it are passed to
    ``dict()`` exactly as the previous implementation did.
    """
    as_dict = getattr(row, "_asdict", None)
    if callable(as_dict):
        return as_dict()
    return dict(row)


def get_payment_details(db: Session, engine_class: DBEngine):
    """Build the query for payments that have not been taken yet.

    Returns a query over distinct ``(external_id, is_refund, is_taken)``
    with ``is_taken == 0``, ordered by ``external_id`` and then by
    ``is_refund`` descending. The query is not executed here.
    """
    pd = engine_class.get_table('payment_details')
    external_id, is_refund, is_taken = engine_class.get_columns(
        'payment_details', ['external_id', 'is_refund', 'is_taken'])
    return (
        db.query(pd)
        .filter(is_taken == 0)
        .with_entities(external_id, is_refund, is_taken)
        .distinct()
        .order_by(external_id, desc(is_refund))
    )


def update_payment_details(db: Session, engine_class: DBEngine, payment_details: Dict):
    """Set ``is_taken`` to ``True`` on the rows matching ``payment_details``.

    Args:
        db: Session used for the UPDATE; it is committed before returning.
        engine_class: Engine wrapper used to reflect ``payment_details``.
        payment_details: Mapping with ``external_id``, ``is_refund`` and
            ``is_taken`` keys identifying the rows.

    Returns:
        The value returned by ``Query.update``.
    """
    external_id_value = payment_details.get("external_id")
    is_refund_value = payment_details.get("is_refund")
    is_taken_value = payment_details.get("is_taken")
    pd = engine_class.get_table('payment_details')
    external_id, is_refund, is_taken = engine_class.get_columns(
        'payment_details', ['external_id', 'is_refund', 'is_taken'])
    matching = db.query(pd).filter(
        external_id == external_id_value,
        is_taken == is_taken_value,
        is_refund == is_refund_value,
    )
    updated = matching.update({'is_taken': True}, synchronize_session='fetch')
    db.commit()
    return updated


def get_payment(db: Session, engine_class: DBEngine, payment_details: Tuple):
    """Build the query selecting the ``payment_details`` rows of one payment.

    ``payment_details`` is unpacked as
    ``(external_id, is_refund, is_taken)``. The query is not executed here.
    """
    external_id_value, is_refund_value, is_taken_value = payment_details
    pd = engine_class.get_table('payment_details')
    external_id, is_refund, is_taken = engine_class.get_columns(
        'payment_details', ['external_id', 'is_refund', 'is_taken'])
    return db.query(pd).filter(
        external_id == external_id_value,
        is_taken == is_taken_value,
        is_refund == is_refund_value,
    )


def get_payment_details_items(db: Session, engine_class: DBEngine, dict_engine: DBEngine, payment_details: Tuple):
    """Assemble the items query, filled in from the reference tables.

    The payment rows (see ``get_payment``) are used as a subquery and joined
    with the reference tables reflected through ``dict_engine``. The column
    ``payment_group`` is derived from ``payment_method``: 3 for methods
    5/6/7, 2 for methods 1/2/3 and 1 for method 4. The query is returned
    unexecuted.
    """
    payment = get_payment(
        db, engine_class, payment_details).subquery('payment')
    services = dict_engine.get_table('services')
    units = dict_engine.get_table('units')
    payment_method = dict_engine.get_table('payment_method')
    payment_object = dict_engine.get_table('payment_object')
    vats = dict_engine.get_table('payment_object_vat_type')
    agents = dict_engine.get_table('agents')
    company = dict_engine.get_table('providers')

    # NOTE(review): every join below is an inner join, so a payment row
    # without a matching row in `agents` (or any other reference table) is
    # excluded from the result - confirm that this is intended.
    query = (
        db.query(
            payment.c.external_id.label('external_id'),
            services.c.sname.label('name'),
            units.c["sname"].label('measurement_unit'),
            payment.c.price.label('price'),
            payment.c.quantity.label('quantity'),
            payment.c.date_operation.label('date_operation'),
            payment.c.phone.label('phone'),
            company.c.inn,
            payment.c.summa.label('sum'),
            case(
                (payment.c.payment_method.in_([5, 6, 7]), 3),
                (payment.c.payment_method.in_([1, 2, 3]), 2),
                (payment.c.payment_method == 4, 1)
            ).label("payment_group"),
            payment_method.c["name"].label('payment_method'),
            payment_object.c["name"].label('payment_object'),
            vats.c["name"].label("vat"),
            payment.c.agent_type.label('agent_info'),
            # `!= None` is deliberate: SQLAlchemy renders it as IS NOT NULL.
            case((payment.c.agent_type != None, payment.c.supplier_info),  # noqa: E711
                 else_=None).label('supplier_info'),
            agents.c["inn"].label("supplier_inn"),
            agents.c["name"].label("supplier_name")
        )
        .select_from(payment)
        .join(services, services.c["id_service"] == payment.c.id_item)
        .join(units, services.c["id_unit"] == units.c["id_unit"])
        .join(payment_method, payment.c.payment_method == payment_method.c["id"])
        .join(payment_object, payment.c.payment_object == payment_object.c["id"])
        .join(vats, payment.c.vat == vats.c["id"])
        .join(company, payment.c.id_company == company.c["id_provider"])
        .join(agents, payment.c.supplier_info == agents.c["id_agent"])
    )
    return query


def get_payments(data: List):
    """Fill in ``payments``: sum the ``sum`` values per ``payment_group``.

    Returns a list of ``{'type': group, 'sum': total}`` dicts in order of
    first appearance of each group.
    """
    totals = {}
    for row in data:
        type_id = row.get("payment_group")
        cur_sum = row.get("sum")
        totals[type_id] = cur_sum + totals.get(type_id, 0.0)
    return [{'type': type_id, 'sum': amount} for type_id, amount in totals.items()]


def get_token(db: Session, db_dict: DBEngine):
    """Return the ``token`` value of the first ``vAtolToken`` row.

    Returns ``None`` when there is no row (previously this raised
    ``TypeError``) or when the row has no ``token`` key.
    """
    token_dict = db_dict.get_table("vAtolToken")
    row = db.query(token_dict).first()
    if row is None:
        return None
    return _row_to_dict(row).get("token")


def get_total(data: List):
    """Fill in ``total``: the sum of the ``sum`` values of all rows."""
    total = 0.0
    for row in data:
        total += row.get("sum")
    return total


def items_convert(data: query.Query):
    """Convert the items query into the pieces of a receipt.

    Executes ``data`` and turns every row into an item dict: ``vat`` is
    wrapped by ``get_vat``; when ``agent_info`` is truthy it is wrapped as
    ``{"type": ...}`` and ``supplier_info`` is rebuilt from ``supplier_inn``
    / ``supplier_name``, otherwise the three supplier keys are dropped.

    Returns:
        ``(external_id, items, payments, client, company, total)``, where
        ``external_id``, the client phone and the company INN come from the
        last row processed.
    """
    items = []
    external_id = 0
    phone = ""
    inn = 0
    for row in data.all():
        item = _row_to_dict(row)
        external_id = item.get("external_id")
        item["vat"] = get_vat(item.get("vat"))
        agent_info = item.pop("agent_info", None)
        # The "phone" key is always present in a row, so a `.get` default
        # never applied; fall back explicitly when the value is NULL/empty.
        phone = item.get("phone") or '+79111111111'
        inn = int(item.get("inn"))
        if agent_info:
            item["agent_info"] = {"type": agent_info}
            item["supplier_info"] = {
                "inn": item.pop("supplier_inn", None),
                "name": item.pop("supplier_name", None)
            }
        else:
            for key in ("supplier_info", 'supplier_inn', 'supplier_name'):
                item.pop(key, None)
        items.append(item)
    # "sum" and "payment_group" are left untouched on every item, so the
    # shared helpers produce the same aggregates as accumulating in the loop.
    payments = get_payments(items)
    total = get_total(items)
    client = get_client(phone)
    company = get_company(inn)
    return external_id, items, payments, client, company, total


def get_company(inn: int):
    """Return the ``company`` section of a receipt for the given INN."""
    return {
        'inn': inn,
        'email': 'ocnkp@jkhsakha.ru',
        'payment_address': 'http://jkhsakha.ru/',
        'sno': 'osn',
    }


def get_receipt(data: query.Query):
    """Return ``(external_id, receipt)`` built from the items query."""
    external_id, items, payments, client, company, total = items_convert(data)
    receipt = {
        'client': client,
        'payments': payments,
        'company': company,
        'items': items,
        'total': total,
    }
    return external_id, receipt


def get_sell(data: query.Query):
    """Return the ``sell`` payload for the items query.

    The payload carries the receipt, a ``timestamp`` formatted as
    ``%d.%m.%Y %H:%M:%S`` from the current local time, the ``external_id``
    and a ``service.callback_url`` derived from it.
    """
    current_datetime = datetime.datetime.now()
    external_id, receipt = get_receipt(data)
    return {
        'receipt': receipt,
        'timestamp': current_datetime.strftime('%d.%m.%Y %H:%M:%S'),
        'external_id': external_id,
        'service': {
            'callback_url': f"http://api.jkhsakha.ru/kassa/{external_id}"
        },
    }


def get_client(phone: str):
    """Return the ``client`` section of a receipt."""
    return {'phone': phone}


def get_vat(vat: str):
    """Return the ``vat`` section of a receipt item."""
    return {'type': vat}


def add_atol(db: Session, engine: DBEngine, atol: Dict):
    """Insert ``atol`` (without its ``is_taken`` key) into ``Atol`` and commit.

    The caller's dict is no longer modified; a filtered copy is inserted.
    """
    table = engine.get_table("Atol")
    values = {key: value for key, value in atol.items() if key != "is_taken"}
    db.execute(table.insert().values(values))
    db.commit()


def clear_dict(d):
    """Recursively drop ``None`` values from nested dicts and lists.

    ``None`` stays ``None``; lists keep their non-``None`` cleaned elements;
    a dict that ends up empty becomes ``None``; any other value is returned
    unchanged.
    """
    if d is None:
        return None
    if isinstance(d, list):
        return [value for value in map(clear_dict, d) if value is not None]
    if not isinstance(d, dict):
        return d
    cleaned = {}
    for key, value in d.items():
        value = clear_dict(value)
        if value is not None:
            cleaned[key] = value
    return cleaned or None


def add_doc(sell: schemas.Sell, is_refund, token):
    """Pass ``sell`` to ``Atol.set_sell`` and store the result via ``models.Atol``.

    The session obtained from ``SessionLocal`` is now always closed, also
    when one of the steps raises.
    """
    atol_model = Atol(token)
    session = SessionLocal()
    try:
        a = atol_model.set_sell(clear_dict(sell.dict()), is_refund)
        check = models.Atol(schemas.AtolSell(**a))
        check.external_id = sell.external_id
        session.add(check)
        session.commit()
    finally:
        session.close()


def get_main_payment(server='Sanctuary', user='sa', password='159357',
                     dbname='fz54_details', db_dicts_name='fz54'):
    """Fetch the next untaken payment, record it and mark it as taken.

    The connection settings were hard-coded; they are now keyword arguments
    whose defaults are the former constants, so existing calls are
    unaffected.

    SECURITY: the default password is still stored in source code - move it
    to configuration/secret storage and pass it in explicitly.

    Returns:
        ``(sell, is_refund, token)`` for the processed payment, or
        ``(False, False, False)`` when no untaken payment exists.
    """
    db = DBEngine(server, dbname, user, password)
    db_dicts = DBEngine(server, db_dicts_name, user, password)
    session = Session(autocommit=False, autoflush=False, bind=db.engine)
    try:
        payment = get_payment_details(session, db).first()
        if not payment:
            return False, False, False
        payments = get_payment_details_items(session, db, db_dicts, payment)
        body = get_sell(payments)
        sell = schemas.Sell(**body)
        payment = _row_to_dict(payment)
        atol = {
            "external_id": payment.get("external_id"),
            "is_refund": payment.get("is_refund"),
            "is_taken": payment.get("is_taken"),
            "body": str(body)
        }
        add_atol(session, db, atol)
        update_payment_details(session, db, payment)
        return sell, payment.get("is_refund"), get_token(session, db_dicts)
    finally:
        # The session was previously never closed.
        session.close()


def main():
    """Process at most one pending payment.

    The former ``while True`` loop always left after its first pass, so the
    same steps are written as straight-line code: fetch a payment, stop if
    there is none, wait one second, then submit the document.
    """
    payment, is_refund, token = get_main_payment()
    if payment is False:
        return
    sleep(1)
    add_doc(sell=payment, is_refund=is_refund, token=token)