sql_app: Reorder

Signed-off-by: HeshamTB <hishaminv@gmail.com>
This commit is contained in:
HeshamTB 2022-06-07 12:58:56 +03:00
parent bf14c97cb6
commit 01e6c990f7
Signed by: Hesham
GPG Key ID: 74876157D199B09E

View File

@ -86,6 +86,66 @@ def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
def get_user_details(current_user: schemas.User = Depends(get_current_active_user)):
return current_user
@app.post("/users/open", tags=['Users'])
def issue_open_door_command(command: schemas.OpenDoorRequestTime,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_current_active_user)):
err = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthrized to open")
device = crud.get_iot_entity_by_bluetooth_mac(db, command.bluetooth_mac)
if not device: raise err
# TODO: Use database search rather then this linear search
user = crud.get_user(db, current_user.id)
for dev in user.authorized_devices:
if dev.bluetooth_mac == device.bluetooth_mac:
crud.set_open_door_request(db, device.id, command.time_seconds)
log_entry = schemas.DoorAccessLog(user_id=current_user.id,
door_bluetooth_mac=command.bluetooth_mac,
time=datetime.now())
crud.record_door_access_log(db, log_entry)
return device
raise err
@app.post("/users/tkn", response_model=schemas.Token, tags=['Users'])
@app.post("/tkn", response_model=schemas.Token, tags=['Users'])
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = auth_helper.authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
#access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth_helper.create_access_token(
data={"sub": form_data.username}, expires_delta=timedelta(minutes=15)
)
crud.set_user_last_token(db, form_data.username, access_token)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/acesslist/", response_model=List[schemas.RoomOverview], tags=['Users'])
def get_iot_access_list_for_user(db: Session = Depends(get_db), current_user: schemas.User = Depends(get_current_active_user)):
user = crud.get_user_by_username(db, current_user.username)
access_list = list()
for device in user.authorized_devices:
dev_db : models.IotEntity = device
sensors = crud.get_room_data_now(db)
if not sensors: raise HTTPException(status_code=500, detail="No Room link")
entry : schemas.RoomOverview = schemas.RoomOverview(
id=dev_db.id,
description=dev_db.description,
bluetooth_mac=dev_db.bluetooth_mac,
open_request=dev_db.open_request,
time_seconds=dev_db.time_seconds,
acces_list_counter=dev_db.acces_list_counter,
humidity=sensors.humidity,
people=sensors.people,
temperature=sensors.temperature,
smoke_sensor_reading=sensors.smoke_sensor_reading
)
access_list.append(entry)
return access_list
@app.get("/admin/users/", response_model=List[schemas.User], tags=['Admin'])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
@ -196,71 +256,10 @@ def get_access_log_history_for_user(request : schemas.UserAccessLogRequestUserna
if not user: raise HTTPException(status_code=404, detail="User not found")
return crud.get_access_log_for_user_by_id(db, user.id)
@app.get("/users/acesslist/", response_model=List[schemas.RoomOverview], tags=['Users'])
def get_iot_access_list_for_user(db: Session = Depends(get_db), current_user: schemas.User = Depends(get_current_active_user)):
user = crud.get_user_by_username(db, current_user.username)
access_list = list()
for device in user.authorized_devices:
dev_db : models.IotEntity = device
sensors = crud.get_room_data_now(db)
if not sensors: raise HTTPException(status_code=500, detail="No Room link")
entry : schemas.RoomOverview = schemas.RoomOverview(
id=dev_db.id,
description=dev_db.description,
bluetooth_mac=dev_db.bluetooth_mac,
open_request=dev_db.open_request,
time_seconds=dev_db.time_seconds,
acces_list_counter=dev_db.acces_list_counter,
humidity=sensors.humidity,
people=sensors.people,
temperature=sensors.temperature,
smoke_sensor_reading=sensors.smoke_sensor_reading
)
access_list.append(entry)
return access_list
@app.get("/admin/roominfo/now/", tags=['Admin'])
def get_room_data(db: Session = Depends(get_db)):
return crud.get_room_data_now(db)
@app.post("/users/open", tags=['Users'])
def issue_open_door_command(command: schemas.OpenDoorRequestTime,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_current_active_user)):
err = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthrized to open")
device = crud.get_iot_entity_by_bluetooth_mac(db, command.bluetooth_mac)
if not device: raise err
# TODO: Use database search rather then this linear search
user = crud.get_user(db, current_user.id)
for dev in user.authorized_devices:
if dev.bluetooth_mac == device.bluetooth_mac:
crud.set_open_door_request(db, device.id, command.time_seconds)
log_entry = schemas.DoorAccessLog(user_id=current_user.id,
door_bluetooth_mac=command.bluetooth_mac,
time=datetime.now())
crud.record_door_access_log(db, log_entry)
return device
raise err
@app.post("/users/tkn", response_model=schemas.Token, tags=['Users'])
@app.post("/tkn", response_model=schemas.Token, tags=['Users'])
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = auth_helper.authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
#access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth_helper.create_access_token(
data={"sub": form_data.username}, expires_delta=timedelta(minutes=15)
)
crud.set_user_last_token(db, form_data.username, access_token)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/iotdevice/door/status", response_model=schemas.IotDoorPollingResponse, tags=['Iot'])
def polling_method_for_iot_entity(request: schemas.IotDoorPollingRequest,
db: Session = Depends(get_db)):