"""FastAPI application exposing user registration, token login and item endpoints."""

from datetime import timedelta
from typing import List

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

# Order of the package-relative imports is kept as in the original in case
# the sibling modules depend on it.
from . import crud, models, schemas, auth_helper
from .database import SessionLocal, engine

# Lifetime, in minutes, of the access tokens issued by the "/tkn" endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 15

# Runs at import time against the configured engine.
models.Base.metadata.create_all(bind=engine)

# The token URL must stay in sync with the path of login_for_access_token.
oauth = OAuth2PasswordBearer(tokenUrl="tkn")

app = FastAPI()


def get_db():
    """Yield a ``SessionLocal`` session and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


async def get_current_user(
    token: str = Depends(oauth), db: Session = Depends(get_db)
):
    """Return the user named by the ``sub`` claim of the bearer token.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that ``crud`` cannot find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise a PyJWT error, so keep the try minimal.
    try:
        payload = jwt.decode(
            token, auth_helper.JWT_SECRET, algorithms=[auth_helper.JWT_ALGO]
        )
    except jwt.PyJWTError as exc:
        # Chain the cause so the decode failure stays visible in tracebacks.
        raise credentials_exception from exc

    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    token_data = schemas.TokenData(username=username)
    user = crud.get_user_by_username(db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: schemas.User = Depends(get_current_user),
):
    """Return the authenticated user, rejecting inactive accounts.

    Raises:
        HTTPException: 400 when ``current_user.is_active`` is falsy.
    """
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/users/reg", response_model=schemas.User, tags=["users"])
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Register a new user.

    Raises:
        HTTPException: 400 when the e-mail or the username is already taken.
    """
    if crud.get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")
    if crud.get_user_by_username(db, username=user.username):
        # Message text (including its spelling) is kept as-is: clients may
        # match on it.
        raise HTTPException(status_code=400, detail="Username already registerd")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return users from ``crud.get_users``; ``skip``/``limit`` are forwarded."""
    return crud.get_users(db, skip=skip, limit=limit)


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with the given id.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.IotEntity)
def create_item_for_user(
    user_id: int, item: schemas.IotEntityCreate, db: Session = Depends(get_db)
):
    """Create an item attached to the given user.

    Raises:
        HTTPException: 404 when no user with ``user_id`` exists.
    """
    # Mirror read_user: refuse to attach an item to a user that is not there
    # instead of passing an unknown id down to the persistence layer.
    if crud.get_user(db, user_id=user_id) is None:
        raise HTTPException(status_code=404, detail="User not found")
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items", response_model=List[schemas.IotEntity])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return items from ``crud.get_items``; ``skip``/``limit`` are forwarded."""
    return crud.get_items(db, skip=skip, limit=limit)


@app.post("/tkn", response_model=schemas.Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Exchange form credentials for a bearer access token.

    Raises:
        HTTPException: 401 when ``auth_helper.authenticate_user`` returns a
            falsy value for the submitted credentials.
    """
    user = auth_helper.authenticate_user(
        db, form_data.username, form_data.password
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = auth_helper.create_access_token(
        data={"sub": form_data.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}


# NOTE(review): this route is registered after "/users/{user_id}"; it appears
# that only the trailing slash keeps "/users/me/" from being captured by that
# route -- confirm before changing either path.
@app.get("/users/me/", response_model=schemas.User)
async def read_users_me(
    current_user: schemas.User = Depends(get_current_active_user),
):
    """Return the currently authenticated, active user."""
    return current_user