Files
service-app/kv/crud.py
2022-06-16 09:38:24 +09:00

422 lines
15 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy import text, sql
from kv.database import get_table
from kv import schemas
def exec_procedure(session, proc_name, params, database: str = None):
sql_params = ",".join([f"@{key} = :{key}"
for key, value in params.items()])
dbstr = ""
if database:
dbstr = f"[{database}]."
sql_string = sql.text(f'''
DECLARE @return_value int;
EXEC @return_value = {dbstr}[dbo].[{proc_name}] {sql_params};
SELECT 'Return Value' = @return_value;
''')
data = session.execute(
sql_string, params).fetchall()
session.commit()
session.expire_all()
return data
def exec_procedure_wo_result(session, proc_name, params, database: str = None):
sql_params = ",".join([f"@{key} = :{key}"
for key, value in params.items()])
dbstr = ""
if database:
dbstr = f"[{database}]."
sql_string = sql.text(f'''
EXEC @return_value = {dbstr}[dbo].[{proc_name}] {sql_params};
''')
session.execute(
sql_string, params).fetchall()
session.commit()
"""PersonalAccountViewSet"""
class PersonalAccountViewSet:
def get_turn_over(db: Session, IDPersonalAccount: int = None):
return exec_procedure(db, 'uspGetPersonalAccountTurnOver', {'IDPersonalAccount': IDPersonalAccount})
def get_details(db: Session, id: int = None):
table = get_table("PersonalAccount")
return db.query(table).filter(table.c["ID"] == id).first()
def get_services(db: Session, IDPersonalAccount: int = None):
return exec_procedure(db, 'uspGetPersonalAccountServices', {'IDPersonalAccount': IDPersonalAccount})
def get_debt(db: Session, data: schemas.PersonalAccountInit = None):
return exec_procedure(db, 'uspGetPersonalAccountDebt', data.dict())
def get_debts(db: Session, data: schemas.PersonalAccountType = None):
return exec_procedure(db, 'uspGetPersonalAccountDebts', data.dict())
def get_financial_account(db: Session, data: schemas.PersonalAccountReportTOFinancialInit = None):
return exec_procedure(db, 'uspGetPersonalAccountReportTOFinancialAccount', data.dict())
"""AddressInfoViewSet"""
class AddressInfoViewSet:
"""get"""
def get_details(db: Session, id: int = None):
table = get_table("RefAddresses", "General")
return db.query(table).filter(table.c["ID"] == id).first()
def get_personal_accounts(db: Session, IDCity: int = None):
return exec_procedure(db, 'uspGetPersonalAccountsList', {'IDCity': IDCity})
def get_addresses(db: Session, IDCity: int = None):
return exec_procedure(db, 'uspGetAddressesByCity', {'IDCity': IDCity})
def get_address_personal_accounts(db: Session, IDAddress: int = None):
return exec_procedure(db, 'uspGetAddressPersonalAccounts', {'IDAddress': IDAddress})
def get_address_object_details(db: Session, id: int = None):
return exec_procedure(db, 'uspGetAddressAndObjectDetails', {'ID': id})
def get_metering_devices(db: Session, IDAddress: int = None):
return exec_procedure(db, 'uspGetAddressMeteringDevices', {'IDAddress': IDAddress})
"""post"""
def edit(db: Session, data: schemas.EditAdressInit = None):
try:
exec_procedure_wo_result(db, 'uspEditAddress', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
def edit_personal_account(db: Session, data: schemas.PersonalAccountDetailsSerializer = None):
try:
exec_procedure_wo_result(db, 'uspEditPersonalAccount', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
def edit_personal_account_address(db: Session, data: schemas.EditPersonalAccountAddressInit = None):
try:
exec_procedure_wo_result(
db, 'uspEditPersonalAccountAddress', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
"""ObjectViewSet"""
class ObjectViewSet:
"""get"""
def get_details(db: Session, IDObject: str = None):
print(IDObject)
return exec_procedure(db, 'uspGetObjectDetails', {'IDObject': IDObject})
def get_municipals(db: Session, IDCity: int = None):
return exec_procedure(db, 'uspGetObjectMunicipals', {'IDCity': IDCity})
def get_services(db: Session, IDObject: str = None):
return exec_procedure(db, 'uspGetObjectServices', {'IDObject': IDObject})
def get_addresses(db: Session, IDObject: str = None):
return exec_procedure(db, 'uspGetObjectAddresses', {'IDObject': IDObject})
def get_metering_devices(db: Session, IDObject: str = None):
return exec_procedure(db, 'uspGetObjectMeteringDevices', {'IDObject': IDObject})
"""post"""
def edit(db: Session, data: schemas.ObjectDetailsSerializer = None):
try:
exec_procedure_wo_result(db, 'uspEditObject', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
def edit_service(db: Session, data: schemas.EditObjectServiceInit = None):
try:
exec_procedure_wo_result(db, 'uspEditObject', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
def edit_temp_metering_device_address(db: Session, data: schemas.EditTempMeteringDeviceAddressInit = None):
try:
exec_procedure_wo_result(
db, 'uspEditTempMeteringDeviceAddress', **data, database='General')
return {'msg': 'success'}
except:
return {'msg': 'error'}
def add_object(db: Session, data: schemas.AddObjectInit = None):
try:
exec_procedure_wo_result(db, 'uspAddObject', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
"""TurnOverViewSet"""
class TurnOverViewSet:
"""get"""
"""post"""
def get_to_archive(db: Session, data: schemas.ArchiveInit = None):
return exec_procedure(db, 'uspGetTOArchive', **data)
def make_storno(db: Session, data: schemas.StronoInit = None):
try:
exec_procedure_wo_result(db, 'uspStorno', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
"""PaymentViewSet"""
class PaymentViewSet:
"""get"""
def get_money_types(db: Session):
return db.query(get_table('RefMoneyTypes', 'General')).all()
def get_worker_types(db: Session):
return db.query(get_table('RefWorkerTypes', 'General')).all()
def get_payer_category_types(db: Session):
return db.query(get_table('RefPayerCategoryTypes', 'General')).all()
def get_services_additional(db: Session):
uslugi_tb = get_table('uslugi', 'spr')
data_query = db.query(uslugi_tb).filter(uslugi_tb.c["vid_usl"] == 2).order_by(
uslugi_tb.c["name"]).with_entities(uslugi_tb.c["kod"].label("ID"), uslugi_tb.c["name"].label("Name"))
return data_query.all()
"""post"""
def get_receipt_calculate(db: Session, data: schemas.RecieptCalculateInit):
return exec_procedure(db, 'uspGetReceiptCalculate', **data)
def save_receipt(db: Session, data: schemas.ReceiptSaveInit):
return exec_procedure(db, 'uspSaveReceipt', data.dict())
def repayment_info(db: Session, data: schemas.RepaymentInfoInit):
return exec_procedure(db, 'uspGetRepaymentInfo', **data)
def repayment(db: Session, data: schemas.RepaymentInit):
return exec_procedure(db, 'uspRepayment', **data)
"""ReportViewSet"""
class ReportViewSet:
"""get"""
"""post"""
def repayment(db: Session, data: schemas.RentRegisterNoticeInit):
return exec_procedure(db, 'uspGetRentRegisterNotices', **data)
"""AccrualViewSet"""
class AccrualViewSet:
"""get"""
def get_municipals(db: Session, data: schemas.AccountInit):
return exec_procedure(db, 'uspCalculateAccrued', data.dict())
"""post"""
"""WithdrawingViewSet"""
class RecalculationViewSet:
"""get"""
def get_withdrawing_types(db: Session):
return db.query(get_table('RefWithdrawingTypes', 'General')).all()
"""post"""
def set_report_filter_list(db: Session, data: schemas.WithdrawingCheckInit):
return exec_procedure(db, 'uspGetWithdrawingCheck', **data)
def get_withdrawing_temp(db: Session, data: schemas.WithdrawingTempInit):
return exec_procedure(db, 'uspGetWithdrawingTemp', **data)
def get_accruals_can_be_recalculated(db: Session, data: schemas.AccrualsCanBeRecalculatedInit):
return exec_procedure(db, 'uspGetAccrualsCanBeRecalculated', data.dict())
def save_withdrawing(db: Session, data: schemas.WithdrawingSave = None):
try:
exec_procedure_wo_result(db, 'uspSaveWithdrawing', **data)
return {'msg': 'success'}
except:
return {'msg': 'error'}
def save_recalculation(db: Session, data: schemas.RecalculationSave = None):
try:
exec_procedure_wo_result(db, 'uspSaveRecalculation', data.dict())
return {'msg': 'success'}
except:
return {'msg': 'error'}
"""ReferenceViewSet"""
class ReferenceViewSet:
"""get"""
def get_personal_accounts(db: Session):
tb = get_table('PersonalAccount')
return db.query(tb).with_entities(tb.c["ID"]).all()
def get_objects(db: Session, data: schemas.CityInit):
return exec_procedure(db, 'uspGetObjects', data.dict())
def get_services(db: Session):
tb = get_table('uslugi', 'spr')
query = db.query(tb).with_entities(
tb.c["kod"].label("ID"), tb.c["name"].label("Name"))
return query.all()
def get_provider_types(db: Session):
tb = get_table('RefProviderTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_providers(db: Session):
tb = get_table('RefProviders', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_systems_water(db: Session):
tb = get_table('RefSystemsWater')
return db.query(tb).all()
def get_systems_water_period_types(db: Session):
tb = get_table('RefSystemsWaterPeriodTypes')
return db.query(tb).all()
def get_provider_contracts(db: Session, data: schemas.PAInit):
return exec_procedure(db, 'uspGetProviderContracts', data.dict())
def get_providers_utility_services(db: Session, data: schemas.CityInit):
tb = get_table('RefProvidersUtilityServices', 'General')
data_query = db.query(tb).filter(tb.c["IDCity"] == data.IDCity).with_entities(
tb.c["ID"], tb.c["ShortName"].label("Name"))
return data_query.all()
def get_management_companies(db: Session):
tb = get_table('RefManagementCompany', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_boilers_by_city(db: Session, data: schemas.CityInit):
tb = get_table('vBoilers', 'General')
data_query = db.query(tb).filter(tb.c["id_city"] == data.IDCity).with_entities(
tb.c["ID"], tb.c["Name"])
return data_query.all()
def get_streets_by_city(db: Session, data: schemas.CityInit):
tb = get_table('vStreets', 'General')
data_query = db.query(tb).filter(tb.c["id_city"] == data.IDCity).with_entities(
tb.c["ID"], tb.c["Name"])
return data_query.all()
def get_metering_devices_services(db: Session):
qu = text(
"select kod ID, case when kod=3 then 'Горячая вода' else sname end Name from spr..uslugi where uchet=1 order by kod")
return db.execute(qu).fetchall()
def get_owhership_types(db: Session):
tb = get_table('RefObjectOwnershipTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_object_improvement_degree_types(db: Session):
tb = get_table('RefObjectImprovementDegreeTypes', 'General')
return db.query(tb).all()
def get_object_gas_types(db: Session):
tb = get_table('RefObjectGasTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_object_heating_types(db: Session):
tb = get_table('RefObjectHeatingTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_object_wiring_types(db: Session):
tb = get_table('RefObjectWiringTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_object_overlapping_types(db: Session):
tb = get_table('RefObjectOverlappingTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_object_wall_material_types(db: Session):
tb = get_table('RefObjectWallMaterialTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_object_roof_types(db: Session):
tb = get_table('RefObjectRoofTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_object_foundation_types(db: Session):
tb = get_table('RefObjectFoundationTypes', 'General')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_tariffs_population(db: Session):
tb = get_table('TariffPopulation')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_courts_types(db: Session):
tb = get_table('RefCourtsTypes')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
def get_address_types(db: Session):
tb = get_table('RefAddressType')
return db.query(tb).with_entities(tb.c["ID"], tb.c["Name"]).all()
"""post"""
def get_advanced_search(db: Session, data: schemas.AdvancedSearchInit):
return exec_procedure(db, 'uspGetAdvancedSearch', **data)
def get_metering_devices_types(db: Session, data: schemas.ServiceIDInit):
tb = get_table('RefMeteringDevicesTypes', 'General')
data_query = db.query(tb).filter(tb.c["IDService"] == data.IDService).order_by(
tb.c["Name"]).with_entities(tb.c["ID"], tb.c["Name"])
return data_query.all()
def get_object_metering_devices_types(db: Session, data: schemas.ObjectMeteringDevicesTypesInit):
return exec_procedure(db, 'uspGetMeteringDeviceTypes', params=data, database='General')
def get_object_metering_device_addresses(db: Session, data: schemas.ObjectMeteringDeviceAddressInit):
return exec_procedure(db, 'uspGetMeteringDeviceTypes', params=data, database='General')
def get_object_water_system_volumes(db: Session, data: schemas.ObjectWaterSystemVolumesInit):
return exec_procedure(db, 'uspGetObjectWaterSystemVolumes', **data)
def get_tariff_population(db: Session, data: schemas.TariffsPopulationInit):
return exec_procedure(db, 'uspGetTariffsPopulation', **data)
def get_object_tariff_provider(db: Session, data: schemas.TariffProviderInit):
return exec_procedure(db, 'uspGetObjectTariffProvider', **data)
def edit_metering_device(db: Session, data: schemas.MeteringDeviceInit):
return exec_procedure_wo_result(db, 'uspEditMeteringDevice', **data)