55 lines
1.5 KiB
Python
55 lines
1.5 KiB
Python
from ast import Dict
|
|
from sqlalchemy.orm import Session
|
|
import kassa.schemas as schemas
|
|
import kassa.models as models
|
|
|
|
|
|
def create_doc(db: Session, doc: schemas.Doc, external_id: str = None):
    """Insert or update the ``models.Doc`` row identified by *external_id*.

    The document is flattened with ``doc.dict()``. Its ``error`` and
    ``payload`` entries, when truthy, are passed to ``create_error`` and
    ``create_payload`` (each of which commits on its own). The ``warnings``
    entry is removed and not stored. The remaining fields are written to the
    ``Doc`` row, which is updated if a row with the key exists and inserted
    otherwise.

    Args:
        db: SQLAlchemy session; committed before returning.
        doc: Schema object exposing ``dict()``.
        external_id: Key used to look the row up. It is lower-cased before
            use and stored on the row in that form. ``None`` is tolerated:
            no normalisation happens and the key is not written into the
            row values.

    Returns:
        None.
    """
    values = doc.dict()

    # The signature allows None, so guard the normalisation instead of
    # failing with AttributeError on ``None.lower()``.
    if external_id is not None:
        external_id = external_id.lower()
        # Store the same normalised key that the lookup below filters on;
        # otherwise a freshly inserted row might never be matched again.
        values['external_id'] = external_id

    doc_query = db.query(models.Doc).filter(
        models.Doc.external_id == external_id)

    error = values.pop('error', None)
    payload = values.pop('payload', None)
    # 'warnings' is stripped so it is not forwarded to models.Doc.
    values.pop('warnings', None)

    if error:
        create_error(db, error, external_id)
    if payload:
        create_payload(db, payload, external_id)

    if doc_query.first():
        doc_query.update(values=values)
    else:
        db.add(models.Doc(**values))
    db.commit()
|
|
|
|
|
|
def create_error(db: Session, error: schemas.Error, external_id: str = None):
    """Insert or update the ``models.Error`` row identified by *external_id*.

    Args:
        db: SQLAlchemy session; committed before returning.
        error: Field values for the row. Annotated as ``schemas.Error`` but
            used as a mapping here (``create_doc`` passes an entry taken from
            ``doc.dict()``) — NOTE(review): confirm the intended type.
        external_id: Key the row is looked up by and stored under.

    Returns:
        None.
    """
    # Work on a shallow copy so the caller's mapping is left untouched.
    values = dict(error)
    values['external_id'] = external_id

    err_query = db.query(models.Error).filter(
        models.Error.external_id == external_id)

    if err_query.first():
        err_query.update(values=values)
    else:
        db.add(models.Error(**values))
    db.commit()
|
|
|
|
|
|
def create_payload(db: Session, payload: schemas.Payload, external_id: str = None):
    """Insert or update the ``models.Payload`` row identified by *external_id*.

    Args:
        db: SQLAlchemy session; committed before returning.
        payload: Field values for the row. Annotated as ``schemas.Payload``
            but used as a mapping here (``create_doc`` passes an entry taken
            from ``doc.dict()``) — NOTE(review): confirm the intended type.
        external_id: Key the row is looked up by and stored under.

    Returns:
        None.
    """
    # Work on a shallow copy so the caller's mapping is left untouched, and
    # pin the key in both branches so an 'external_id' entry inside the
    # payload cannot re-key an existing row during the update.
    values = dict(payload)
    values['external_id'] = external_id

    payload_query = db.query(models.Payload).filter(
        models.Payload.external_id == external_id)

    if payload_query.first():
        payload_query.update(values)
    else:
        db.add(models.Payload(**values))
    db.commit()
|