from sqlalchemy import create_engine, Column, MetaData, ForeignKey, insert, func import datetime from contextlib import asynccontextmanager from pydantic import BaseModel, Field from typing import Sequence from dotenv import load_dotenv import asyncio import os from clickhouse_sqlalchemy import ( Table, make_session, get_declarative_base, types, engines, select ) from fastapi import FastAPI load_dotenv() uri = os.getenv("DATABASE_URL") engine = create_engine(uri) session = make_session(engine) metadata = MetaData() Base = get_declarative_base(metadata=metadata) class Hosts(Base): __tablename__ = "hosts" url = Column(types.String, unique=False,primary_key=True) delay = Column(types.Int16, default=60) description = Column(types.String, nullable=True) is_alive = Column(types.Boolean, default=False) last_seen = Column(types.DateTime, server_default=func.now()) last_alive = Column(types.DateTime, server_default=func.now()) __table_args__ = ( engines.Memory(), ) class HostBase(BaseModel): url: str class HostDescription(BaseModel): description: str | None delay: int | None class HostLive(BaseModel): is_alive: bool | None last_seen: datetime.datetime | None last_alive: datetime.datetime | None class HostCreate(HostBase, HostDescription): pass class HostSchema(HostBase,HostDescription, HostLive): pass async def get_hosts(): return session.query(Hosts).all() async def get_host(host_url:str): if host:=session.get(Hosts, host_url): return host return None async def insert_host(host: HostCreate): data= Hosts( **host.dict() ) session.add(data) session.commit() return data async def remove_host(host_url: int): data = session.get(Hosts, host_url) if data: session.delete(data) session.commit() return data return None async def update_host_description(host_url: str, update_date: HostDescription): data = session.get(Hosts, host_url) if data: if update_date.description: data.description = update_date.description if update_date.delay: data.delay = update_date.delay session.add(data) session.commit() return data return None async def update_host_live(host_url: str, is_alive: bool): data = session.get(Hosts, host_url) if data: data.is_alive = is_alive data.last_alive =datetime.datetime.now() if is_alive: data.last_seen =datetime.datetime.now() session.add(data) session.commit() return data return None async def check_alive(url: str): from ping_checker import sock_ch is_alive = sock_ch(url) await update_host_live(url, is_alive) async def delayed_alive(url:str, timeout:int): await asyncio.sleep(timeout) host =await get_host(url) if host: await check_alive(url) return asyncio.create_task(delayed_alive(url, host.delay)) class BackgroundTask: def __init__(self): pass async def my_task(self): hosts = await get_hosts() for host in hosts: asyncio.create_task(delayed_alive(host.url, 0)) bgtask = BackgroundTask() @asynccontextmanager async def lifespan(app: FastAPI): # Load the ML model asyncio.create_task(bgtask.my_task()) yield # Clean up the ML models and release the resources tasks = asyncio.all_tasks() # cancel all tasks for task in tasks: # request the task cancel task.cancel() app = FastAPI(lifespan=lifespan) @app.get("/") async def show_hosts() -> Sequence[HostSchema]: data = await get_hosts() return data @app.post("/") async def add_host(host: HostCreate) ->HostSchema: asyncio.create_task(delayed_alive(host.url, host.delay)) return await insert_host(host) @app.delete("/") async def delete_host(host_url: str) ->HostSchema | None: return await remove_host(host_url) @app.put("/") async def update_description(host_url: str, update_date: HostDescription)->HostSchema: return await update_host_description(host_url, update_date) if __name__ == "__main__": import uvicorn Base.metadata.create_all(engine, checkfirst=True) port = os.getenv("PORT") or 4004 uvicorn.run(app=app, host="0.0.0.0", port=int(port))