Files
service-app/kassa/new.py
2022-07-14 16:21:44 +09:00

389 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from calendar import month
from time import sleep
from typing import Dict, List, Tuple
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Table, Column, engine
from sqlalchemy.orm.session import Session
from sqlalchemy.orm import query, sessionmaker
from sqlalchemy.types import BigInteger, Integer, String, Numeric, Boolean, DateTime
from sqlalchemy.sql.sqltypes import DateTime
from sqlalchemy import desc, cast, case, func
import kassa.schemas as schemas
import kassa.models as models
from kassa.cruds import doc
from kassa.database import SessionLocal
from kassa.atol import Atol
import datetime
Base = declarative_base()
metadata = Base.metadata
ignore_list = ["5A1A84DD-34AC-4D16-A91C-3DB9782F4828"]
class PaymentDetails(Base):
__tablename__ = "payment_details"
id = Column(Integer, primary_key=True, autoincrement=True)
external_id = Column(String(36), nullable=False, index=True)
is_refund = Column(Boolean, nullable=False, index=True)
is_taken = Column(Boolean, nullable=False, index=True)
added_date = Column(DateTime, nullable=True,
default=datetime.datetime.now())
# class PaymentDetailsBaseModel(BaseModel):
class DBEngine:
def __init__(self, server, dbname, user, password):
self.dbname = dbname
self.user = user
self.password = password
self.server = server
self.get_mssql_engine()
def get_mssql_engine(self):
query = f'mssql+pyodbc://{self.user}:{self.password}@{self.server}/{self.dbname}?driver=SQL+Server'
self.engine = create_engine(
query, connect_args={'check_same_thread': False})
def get_table(self, tablename: str) -> Table:
self.metadata = Base.metadata
self.metadata.reflect(bind=self.engine)
return Table(tablename, self.metadata, schema=self.dbname+'.dbo', autoload=True, autoload_with=self.engine)
def get_columns(self, tablename: str, columns: List[str] = None, labels: Dict = None) -> List[Column]:
table = self.get_table(tablename)
if not labels:
labels = {}
if columns:
lst = [(table.c.get(column)).label(labels.get(column, column) or column)
for column in columns]
return lst
return table.c.items()
def get_db(self):
db = Session(autocommit=False, autoflush=False, bind=self.engine)
try:
yield db
finally:
db.close()
def get_payment_details(db: Session, engine_class: DBEngine):
pd = engine_class.get_table('payment_details')
year, month, external_id, is_refund, is_taken = engine_class.get_columns(
'payment_details', ['year','month','external_id', 'is_refund', 'is_taken'])
query = db.query(pd).filter(is_taken == False).filter(external_id.notin_(ignore_list)).with_entities(
external_id, is_refund, is_taken).distinct().order_by(year, month,external_id, desc(is_refund))
return query
def update_payment_details(db: Session, engine_class: DBEngine, payment_details: Dict):
external_id_value = payment_details.get("external_id")
is_refund_value = payment_details.get("is_refund")
is_taken_value = payment_details.get("is_taken")
pd = engine_class.get_table('payment_details')
external_id, is_refund, is_taken = engine_class.get_columns(
'payment_details', ['external_id', 'is_refund', 'is_taken'])
query = db.query(pd).filter(external_id == external_id_value,
is_taken == is_taken_value, is_refund == is_refund_value)
query = query.update({'is_taken': True}, synchronize_session='fetch')
db.commit()
return query
def get_payment(db: Session, engine_class: DBEngine, payment_details: Tuple):
external_id_value, is_refund_value, is_taken_value = payment_details
pd = engine_class.get_table('payment_details')
external_id, is_refund, is_taken = engine_class.get_columns(
'payment_details', ['external_id', 'is_refund', 'is_taken'])
query = db.query(pd).filter(external_id == external_id_value,
is_taken == is_taken_value, is_refund == is_refund_value)
return query
"""
#Место сбора запроса на items с заполнением по справочникам
"""
def get_payment_details_items(db: Session, engine_class: DBEngine, dict_engine: DBEngine, payment_details: Tuple):
payment = get_payment(
db, engine_class, payment_details).subquery('payment')
services = dict_engine.get_table('vServices')
units = dict_engine.get_table('units')
payment_method = dict_engine.get_table('payment_method')
payment_object = dict_engine.get_table('payment_object')
vats = dict_engine.get_table('payment_object_vat_type')
agents = dict_engine.get_table('agents')
company = dict_engine.get_table('providers')
payment_type = dict_engine.get_table('payment_type')
query = db.query(
payment.c.external_id.label('external_id'),
services.c.sname.label('name'),
units.c["sname"].label('measurement_unit'),
payment.c.price.label('price'),
payment.c.quantity.label('quantity'),
payment.c.date_operation.label('date_operation'),
payment.c.phone.label('phone'),
company.c.inn,
payment.c.summa.label('sum'),
#case(
# (payment.c.payment_method.in_([5, 6, 7]), 3),
# (payment.c.payment_method.in_([1, 2, 3]), 2),
# (payment.c.payment_method == 4, 1)
#).label("payment_group"),
payment_type.c["name"].label("payment_group"),
payment_method.c["name"].label('payment_method'),
payment_object.c["name"].label('payment_object'),
vats.c["name"].label("vat"),
payment.c.agent_type.label('agent_info'),
case((payment.c.agent_type != None, payment.c.supplier_info),
else_=None).label('supplier_info'),
agents.c["inn"].label("supplier_inn"),
agents.c["name"].label("supplier_name")
).select_from(payment)\
.join(services, services.c["id_service"] == payment.c.id_item)\
.join(units, services.c["id_unit"] == units.c["id_unit"])\
.join(payment_method, payment.c.payment_method == payment_method.c["id"])\
.join(payment_object, payment.c.payment_object == payment_object.c["id"])\
.join(vats, payment.c.vat == vats.c["id"])\
.join(company, payment.c.id_company == company.c["id_provider"])\
.join(agents, payment.c.supplier_info == agents.c["id_agent"], isouter=True)\
.join(payment_type, payment.c.payments == payment_type.c["id"])
return query
"""
#Функция для заполнения payments
"""
def get_payments(data: List):
d = {}
for row in data:
type_id = row.get("payment_group")
cur_sum = row.get("sum")
d[type_id] = cur_sum + d.get(type_id, 0.0)
res = [{'type': i[0], 'sum': round(i[1],2)} for i in d.items()]
return res
def get_token(db: Session, db_dict: DBEngine):
token_dict = db_dict.get_table("vAtolToken")
rows = db.query(token_dict.c["token"].label("token")).select_from(token_dict)
return dict(rows.first()).get("token")
"""
#Функция для заполнения total
"""
def get_total(data: List):
total = 0.0
for i in data:
total += i.get("sum")
return total
"""
#Функция для заполнения items в dicts
"""
def items_convert(data: query):
items = []
external_id = 0
total = 0.0
d = {}
payments = []
phone = ""
inn = 0
for row in data.all():
item = dict(row)
external_id = item.get("external_id")
item["vat"] = get_vat(item.get("vat"))
agent_info = item.pop("agent_info", None)
total += item.get("sum")
type_id = item.get("payment_group")
cur_sum = item.get("sum")
phone = item.get("phone", '+79111111111')
inn = int(item.get("inn"))
d[type_id] = cur_sum + d.get(type_id, 0.0)
if agent_info:
item["agent_info"] = {"type": agent_info}
item["supplier_info"] = {
"inn": item.pop("supplier_inn", None),
"name": item.pop("supplier_name", None)
}
else:
for i in ["supplier_info", 'supplier_inn', 'supplier_name']:
del item[i]
items.append(item)
payments = [{'type': i[0], 'sum': round(i[1],2)} for i in d.items()]
client = get_client(phone)
company = get_company(inn)
return external_id, items, payments, client, company, round(total,2)
def get_company(inn: int):
company = {}
company['inn'] = inn
company['email'] = 'ocnkp@jkhsakha.ru'
company['payment_address'] = 'http://jkhsakha.ru/'
company['sno'] = 'osn'
return company
def get_receipt(data: query):
receipt = {}
external_id = 0
external_id, items, payments, client, company, total = items_convert(
data)
receipt['client'] = client
receipt["payments"] = payments
receipt["company"] = company
receipt['items'] = items
receipt['total'] = round(total,2)
return external_id, receipt
def get_sell(data: query):
sell = {}
current_datetime = datetime.datetime.now()
external_id, sell['receipt'] = get_receipt(data)
sell["timestamp"] = current_datetime.strftime(
'%d.%m.%Y %H:%M:%S')
sell["external_id"] = external_id
sell["service"] = {
'callback_url': f"http://api.jkhsakha.ru/kassa/{external_id}"
}
return sell
def get_client(phone: str):
if phone:
return {
'phone': phone
}
return { 'email': 'test@test.ru'}
def get_vat(vat: str):
return {'type': vat}
def add_atol(db: Session, engine: DBEngine, atol: Dict):
aa = engine.get_table("Atol")
atol.pop("is_taken", None)
a = aa.insert().values(atol)
db.execute(a)
db.commit()
def clear_dict(d):
if d is None:
return None
elif isinstance(d, list):
return list(filter(lambda x: x is not None, map(clear_dict, d)))
elif not isinstance(d, dict):
return d
else:
r = dict(
filter(lambda x: x[1] is not None,
map(lambda x: (x[0], clear_dict(x[1])),
d.items())))
if not bool(r):
return None
return r
def add_doc(sell: schemas.Sell, is_refund, token):
atol_model = Atol(token)
session = SessionLocal()
a = atol_model.set_sell(clear_dict(sell.dict()), is_refund)
ext_id = sell.external_id
err = a.pop('error', None)
if err:
print(err)
return doc.create_error(session, error = schemas.Error(**err), external_id = ext_id )
a["external_id"]= sell.external_id
check = models.Atol(**a)
session.add(check)
session.commit()
return a
def get_check(db: Session, uuid: str, external_id: str, token: str):
atol_model = Atol(token)
a = atol_model.get_reciepts(uuid)
doc.create_doc(db, schemas.Doc(**a), external_id)
return a
def get_atol_wo_doc(db: Session):
doc_sq = db.query(func.upper(models.Doc.external_id)).subquery()
return db.query(models.Atol.uuid, models.Atol.external_id).\
filter(func.upper(models.Atol.external_id).notin_(doc_sq))
def get_main_payment():
server = 'Sanctuary'
user = 'sa'
password = '159357'
dbname = 'fz54_details'
db_dicts_name = 'fz54'
db = DBEngine(server, dbname, user, password)
db_dicts = DBEngine(server, db_dicts_name, user, password)
session = Session(autocommit=False, autoflush=False, bind=db.engine)
payment_query = get_payment_details(session, db)
payment = payment_query.first()
if payment:
payments = get_payment_details_items(session, db, db_dicts, payment)
body = get_sell(payments)
payment = dict(payment)
print(payment.get("external_id"))
sell = schemas.Sell(**body)
atol = {
"external_id": payment.get("external_id"),
"is_refund": payment.get("is_refund"),
"is_taken": payment.get("is_taken"),
"body": str(body)
}
add_atol(session, db, atol)
update_payment_details(session, db, payment)
return sell, payment.get("is_refund"), get_token(session, db_dicts)
return False, False, False
def main():
while True:
payment, is_refund, token = get_main_payment()
if payment == False:
break
sleep(8)
doc = add_doc(sell=payment, is_refund=is_refund, token = token)
print(doc)
def get_actual_token():
server = 'Sanctuary'
user = 'sa'
password = '159357'
dbname = 'fz54_details'
db_dicts_name = 'fz54'
db = DBEngine(server, dbname, user, password)
db_dicts = DBEngine(server, db_dicts_name, user, password)
session = Session(autocommit=False, autoflush=False, bind=db.engine)
token = get_token(session, db_dicts)
return token
def run_get_check():
session = SessionLocal()
print(get_atol_wo_doc(session))
atols = get_atol_wo_doc(session).all()
while len(atols)>0:
sleep(0.2)
uuid, ext_id = atols.pop()
token = get_actual_token()
check = get_check(session, uuid, ext_id, token)
print(check)