forked from VinokurovVE/tests
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
106 lines
3.8 KiB
106 lines
3.8 KiB
from sqlalchemy import select, update, delete
|
|
from fastapi.exceptions import HTTPException
|
|
from backend_fastapi.stored import exec_procedure
|
|
import backend_fastapi.models as models
|
|
from .database import async_session
|
|
import backend_fastapi.schemas as schemas
|
|
|
|
|
|
async def add_role(role: schemas.RoleCreate):
|
|
async with async_session() as session:
|
|
model = models.Role(name = role.name)
|
|
session.add(model)
|
|
await session.flush()
|
|
await session.commit()
|
|
return model
|
|
|
|
async def get_role_all(limit:int=10,page:int=0):
|
|
async with async_session() as session:
|
|
result = await session.scalars(select(models.Role).order_by(models.Role.id).limit(limit).offset(page*limit))
|
|
return result.all()
|
|
|
|
async def delete_role(id: int):
|
|
async with async_session() as session:
|
|
data = await session.scalars(select(models.Role).filter(models.Role.id == id))
|
|
result = data.one_or_none()
|
|
if not result:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
await session.execute(delete(models.Role).filter(models.Role.id == id))
|
|
await session.commit()
|
|
return result
|
|
|
|
|
|
async def update_role(role: schemas.Role, id: int):
|
|
async with async_session() as session:
|
|
data = await session.scalars(select(models.Role).filter(models.Role.id == id))
|
|
result = data.one_or_none()
|
|
if not result:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
|
|
query = update(models.Role).filter(models.Role.id == id).values(name = role.name)
|
|
await session.execute(query)
|
|
await session.commit()
|
|
return {f"{id=} был обновлен"}
|
|
|
|
async def add_user(user: schemas.UserCreate):
|
|
async with async_session() as session:
|
|
model = models.User(
|
|
**user.model_dump()
|
|
)
|
|
session.add(model)
|
|
await session.flush()
|
|
await session.commit()
|
|
return model
|
|
|
|
|
|
async def get_users(limit:int=10,page:int=0):
|
|
async with async_session() as session:
|
|
result = await session.scalars(select(models.User).order_by(models.User.id).limit(limit).offset(page*limit))
|
|
return result.all()
|
|
|
|
async def delete_user(id: int):
|
|
async with async_session() as session:
|
|
data = await session.scalars(select(models.User).filter(models.User.id == id))
|
|
result = data.one_or_none()
|
|
if not result:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
await session.execute(delete(models.User).filter(models.User.id == id))
|
|
await session.commit()
|
|
return result
|
|
|
|
|
|
|
|
async def update_user(user: schemas.User, id: int):
|
|
async with async_session() as session:
|
|
data = await session.scalars(select(models.User).filter(models.User.id == id))
|
|
result = data.one_or_none()
|
|
if not result:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
query = update(models.User).filter(models.User.id == id)\
|
|
.values(
|
|
firstname = user.firstname,
|
|
lastname = user.lastname,
|
|
email = user.email,
|
|
hashed_password= user.hashed_password,
|
|
is_active = user.is_active,
|
|
role_id = user.role_id,
|
|
)
|
|
await session.execute(query)
|
|
await session.commit()
|
|
return {f"{id=} был обновлен"}
|
|
|
|
|
|
async def get_objects():
|
|
async with async_session() as session:
|
|
result = await session.scalars(select(models.Object))
|
|
return result.all()
|
|
|
|
|
|
async def get_values():
|
|
async with async_session() as session:
|
|
result = await session.scalars(select(models.Value))
|
|
return result.all()
|
|
|
|
|
|
async def get_stored_roles():
|
|
return await exec_procedure('get_roles')
|