admin: All admin path functions require an APIKey

Signed-off-by: HeshamTB <hishaminv@gmail.com>
This commit is contained in:
HeshamTB 2022-04-14 07:16:28 +03:00
parent bacf7bd1f0
commit b08a24bedf
Signed by: Hesham
GPG Key ID: 74876157D199B09E
2 changed files with 19 additions and 10 deletions

View File

@ -3,8 +3,9 @@ from typing import Optional
from decouple import config from decouple import config
from datetime import datetime, timedelta from datetime import datetime, timedelta
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from fastapi import Depends from fastapi import Depends, Security, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.security.api_key import APIKey, APIKeyHeader
from . import crud, crypto, schemas from . import crud, crypto, schemas
import jwt import jwt
@ -14,7 +15,10 @@ import time
JWT_SECRET = config('jwt_secret') JWT_SECRET = config('jwt_secret')
JWT_ALGO = config('jwt_algorithm') JWT_ALGO = config('jwt_algorithm')
__API_KEY = config('API_KEY')
__API_KEY_NAME = config('API_KEY_NAME')
api_key_header = APIKeyHeader(name=__API_KEY_NAME)
def create_access_token(data : dict, expires_delta : Optional[timedelta] = None): def create_access_token(data : dict, expires_delta : Optional[timedelta] = None):
# TODO: Consider making non-expiring token # TODO: Consider making non-expiring token
@ -33,3 +37,7 @@ def authenticate_user(db: Session, username : str, password : str):
return False return False
return crypto.verify_key(password, user.passwd_salt, user.hashed_password) return crypto.verify_key(password, user.passwd_salt, user.hashed_password)
def valid_api_key(key = Security(api_key_header)):
if not __API_KEY == key:
raise HTTPException(401, detail="invalid key")
return

View File

@ -1,5 +1,6 @@
from fastapi import Depends, FastAPI, HTTPException, status from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.security.api_key import APIKey
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from . import crud, models, schemas, auth_helper from . import crud, models, schemas, auth_helper
@ -65,23 +66,23 @@ def get_user_details(current_user: schemas.User = Depends(get_current_active_use
return current_user return current_user
@app.get("/admin/users/", response_model=List[schemas.User], tags=['Admin']) @app.get("/admin/users/", response_model=List[schemas.User], tags=['Admin'])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
users = crud.get_users(db, skip=skip, limit=limit) users = crud.get_users(db, skip=skip, limit=limit)
return users return users
@app.get("/admin/iotentities/", response_model=List[schemas.IotEntity], tags=['Admin']) @app.get("/admin/iotentities/", response_model=List[schemas.IotEntity], tags=['Admin'])
def read_iot_entities(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): def read_iot_entities(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
iot_entities = crud.get_iot_entities(db, skip=skip, limit=limit) iot_entities = crud.get_iot_entities(db, skip=skip, limit=limit)
return iot_entities return iot_entities
# TODO: Can duplicate # TODO: Can duplicate
@app.post("/admin/iotentities/create", response_model=schemas.IotEntity, tags=['Admin']) @app.post("/admin/iotentities/create", response_model=schemas.IotEntity, tags=['Admin'])
def create_iot_entities(iot_entity: schemas.IotEntityCreate, db: Session = Depends(get_db)): def create_iot_entities(iot_entity: schemas.IotEntityCreate, db: Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
iot_entities = crud.create_iot_entity(db, iot_entity) iot_entities = crud.create_iot_entity(db, iot_entity)
return iot_entities return iot_entities
@app.get("/admin/users/{user_id}", response_model=schemas.User, tags=['Admin']) @app.get("/admin/users/{user_id}", response_model=schemas.User, tags=['Admin'])
def read_user(user_id: int, db: Session = Depends(get_db)): def read_user(user_id: int, db: Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
db_user = crud.get_user(db, user_id=user_id) db_user = crud.get_user(db, user_id=user_id)
if db_user is None: if db_user is None:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
@ -89,7 +90,7 @@ def read_user(user_id: int, db: Session = Depends(get_db)):
# TODO: Can duplicate # TODO: Can duplicate
@app.post("/admin/users/allowdevice/id", tags=['Admin']) @app.post("/admin/users/allowdevice/id", tags=['Admin'])
def allow_user_for_iot_entity_by_id(request: schemas.UserAllowForIotEntityRequestByID, db: Session = Depends(get_db)): def allow_user_for_iot_entity_by_id(request: schemas.UserAllowForIotEntityRequestByID, db: Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
user = crud.get_user(db, request.user_id) user = crud.get_user(db, request.user_id)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
@ -105,7 +106,7 @@ def allow_user_for_iot_entity_by_id(request: schemas.UserAllowForIotEntityReques
return user return user
@app.post("/admin/users/disallowdevice/id", tags=['Admin']) @app.post("/admin/users/disallowdevice/id", tags=['Admin'])
def disallow_user_for_iot_entity_by_id(request: schemas.UserAllowForIotEntityRequestByID, db: Session = Depends(get_db)): def disallow_user_for_iot_entity_by_id(request: schemas.UserAllowForIotEntityRequestByID, db: Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
user = crud.get_user(db, request.user_id) user = crud.get_user(db, request.user_id)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
@ -122,7 +123,7 @@ def disallow_user_for_iot_entity_by_id(request: schemas.UserAllowForIotEntityReq
return return
@app.post("/admin/users/allowdevice/name", tags=['Admin']) @app.post("/admin/users/allowdevice/name", tags=['Admin'])
def allow_user_for_iot_entity_by_name(request: schemas.UserAllowForIotEntityRequestByUsername, db: Session = Depends(get_db)): def allow_user_for_iot_entity_by_name(request: schemas.UserAllowForIotEntityRequestByUsername, db: Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
user = crud.get_user_by_username(db, request.username) user = crud.get_user_by_username(db, request.username)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
@ -138,11 +139,11 @@ def allow_user_for_iot_entity_by_name(request: schemas.UserAllowForIotEntityRequ
return return
@app.post("/admin/users/{user_id}/deactiveate", tags=['Admin']) @app.post("/admin/users/{user_id}/deactiveate", tags=['Admin'])
def deactiveate_user(user_id: int, db:Session = Depends(get_db)): def deactiveate_user(user_id: int, db:Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
return return
@app.post("/admin/users/{user_id}/activeate", tags=['Admin']) @app.post("/admin/users/{user_id}/activeate", tags=['Admin'])
def deactiveate_user(user_id: int, db:Session = Depends(get_db)): def deactiveate_user(user_id: int, db:Session = Depends(get_db), api_key: APIKey = Depends(auth_helper.valid_api_key)):
return return
@app.get("/users/acesslist/", response_model=List[schemas.IotEntity], tags=['Users']) @app.get("/users/acesslist/", response_model=List[schemas.IotEntity], tags=['Users'])