68 lines
2.2 KiB
Python
68 lines
2.2 KiB
Python
|
|
from typing import Optional
|
|
from decouple import config
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import Depends, Security, HTTPException
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from fastapi.security.api_key import APIKey, APIKeyHeader
|
|
from . import crud, crypto, schemas
|
|
import jwt
|
|
|
|
import time
|
|
|
|
|
|
JWT_SECRET = config('jwt_secret')
|
|
JWT_ALGO = config('jwt_algorithm')
|
|
|
|
__API_KEY = config('API_KEY')
|
|
__API_KEY_NAME = config('API_KEY_NAME')
|
|
|
|
api_key_header = APIKeyHeader(name=__API_KEY_NAME)
|
|
|
|
def create_access_token(data : dict, expires_delta : Optional[timedelta] = None):
|
|
# TODO: Consider making non-expiring token
|
|
to_encode = data.copy() # Since we may change the dict
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(minutes=15)
|
|
to_encode.update({"exp": expire})
|
|
encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGO)
|
|
return encoded_jwt
|
|
|
|
def authenticate_user(db: Session, username : str, password : str):
|
|
user = crud.get_user_by_username(db, username)
|
|
if not user:
|
|
return False
|
|
return crypto.verify_key(password, user.passwd_salt, user.hashed_password)
|
|
|
|
def valid_api_key(key = Security(api_key_header)):
|
|
if not __API_KEY == key:
|
|
raise HTTPException(401, detail="invalid key")
|
|
return
|
|
|
|
def create_iot_dev_token(data: dict):
|
|
to_encode = data.copy()
|
|
encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGO)
|
|
return encoded_jwt
|
|
|
|
def valid_iot_door_token(token : str, db: Session):
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
|
|
except jwt.DecodeError:
|
|
return None
|
|
|
|
mac_signed = payload.get("bluetooth_mac")
|
|
device = crud.get_door_by_bluetooth_mac(db, mac_signed)
|
|
return device
|
|
|
|
def valid_iot_monitor_token(token : str, db: Session):
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
|
|
except jwt.DecodeError:
|
|
return None
|
|
|
|
mac_signed = payload.get("bluetooth_mac")
|
|
device = crud.get_monitor_by_bluetooth_mac(db, mac_signed)
|
|
return device |