diff --git a/sql_app/main.py b/sql_app/main.py index 75a50cc..6db3665 100644 --- a/sql_app/main.py +++ b/sql_app/main.py @@ -86,6 +86,66 @@ def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): def get_user_details(current_user: schemas.User = Depends(get_current_active_user)): return current_user +@app.post("/users/open", tags=['Users']) +def issue_open_door_command(command: schemas.OpenDoorRequestTime, + db: Session = Depends(get_db), + current_user: schemas.User = Depends(get_current_active_user)): + err = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, + detail="Unauthrized to open") + device = crud.get_iot_entity_by_bluetooth_mac(db, command.bluetooth_mac) + if not device: raise err + # TODO: Use database search rather then this linear search + user = crud.get_user(db, current_user.id) + for dev in user.authorized_devices: + if dev.bluetooth_mac == device.bluetooth_mac: + crud.set_open_door_request(db, device.id, command.time_seconds) + log_entry = schemas.DoorAccessLog(user_id=current_user.id, + door_bluetooth_mac=command.bluetooth_mac, + time=datetime.now()) + crud.record_door_access_log(db, log_entry) + return device + raise err + +@app.post("/users/tkn", response_model=schemas.Token, tags=['Users']) +@app.post("/tkn", response_model=schemas.Token, tags=['Users']) +def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): + user = auth_helper.authenticate_user(db, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + #access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = auth_helper.create_access_token( + data={"sub": form_data.username}, expires_delta=timedelta(minutes=15) + ) + crud.set_user_last_token(db, form_data.username, access_token) + return {"access_token": access_token, "token_type": "bearer"} + +@app.get("/users/acesslist/", response_model=List[schemas.RoomOverview], tags=['Users']) +def get_iot_access_list_for_user(db: Session = Depends(get_db), current_user: schemas.User = Depends(get_current_active_user)): + user = crud.get_user_by_username(db, current_user.username) + access_list = list() + for device in user.authorized_devices: + dev_db : models.IotEntity = device + sensors = crud.get_room_data_now(db) + if not sensors: raise HTTPException(status_code=500, detail="No Room link") + entry : schemas.RoomOverview = schemas.RoomOverview( + id=dev_db.id, + description=dev_db.description, + bluetooth_mac=dev_db.bluetooth_mac, + open_request=dev_db.open_request, + time_seconds=dev_db.time_seconds, + acces_list_counter=dev_db.acces_list_counter, + humidity=sensors.humidity, + people=sensors.people, + temperature=sensors.temperature, + smoke_sensor_reading=sensors.smoke_sensor_reading + ) + access_list.append(entry) + return access_list + @app.get("/admin/users/", response_model=List[schemas.User], tags=['Admin']) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): users = crud.get_users(db, skip=skip, limit=limit) @@ -196,71 +256,10 @@ def get_access_log_history_for_user(request : schemas.UserAccessLogRequestUserna if not user: raise HTTPException(status_code=404, detail="User not found") return crud.get_access_log_for_user_by_id(db, user.id) -@app.get("/users/acesslist/", response_model=List[schemas.RoomOverview], tags=['Users']) -def get_iot_access_list_for_user(db: Session = Depends(get_db), current_user: schemas.User = Depends(get_current_active_user)): - user = crud.get_user_by_username(db, current_user.username) - access_list = list() - for device in user.authorized_devices: - dev_db : models.IotEntity = device - sensors = crud.get_room_data_now(db) - if not sensors: raise HTTPException(status_code=500, detail="No Room link") - entry : schemas.RoomOverview = schemas.RoomOverview( - id=dev_db.id, - description=dev_db.description, - bluetooth_mac=dev_db.bluetooth_mac, - open_request=dev_db.open_request, - time_seconds=dev_db.time_seconds, - acces_list_counter=dev_db.acces_list_counter, - humidity=sensors.humidity, - people=sensors.people, - temperature=sensors.temperature, - smoke_sensor_reading=sensors.smoke_sensor_reading - ) - access_list.append(entry) - return access_list - @app.get("/admin/roominfo/now/", tags=['Admin']) def get_room_data(db: Session = Depends(get_db)): return crud.get_room_data_now(db) -@app.post("/users/open", tags=['Users']) -def issue_open_door_command(command: schemas.OpenDoorRequestTime, - db: Session = Depends(get_db), - current_user: schemas.User = Depends(get_current_active_user)): - err = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, - detail="Unauthrized to open") - device = crud.get_iot_entity_by_bluetooth_mac(db, command.bluetooth_mac) - if not device: raise err - # TODO: Use database search rather then this linear search - user = crud.get_user(db, current_user.id) - for dev in user.authorized_devices: - if dev.bluetooth_mac == device.bluetooth_mac: - crud.set_open_door_request(db, device.id, command.time_seconds) - log_entry = schemas.DoorAccessLog(user_id=current_user.id, - door_bluetooth_mac=command.bluetooth_mac, - time=datetime.now()) - crud.record_door_access_log(db, log_entry) - return device - raise err - -@app.post("/users/tkn", response_model=schemas.Token, tags=['Users']) -@app.post("/tkn", response_model=schemas.Token, tags=['Users']) -def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): - user = auth_helper.authenticate_user(db, form_data.username, form_data.password) - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - #access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) - access_token = auth_helper.create_access_token( - data={"sub": form_data.username}, expires_delta=timedelta(minutes=15) - ) - crud.set_user_last_token(db, form_data.username, access_token) - return {"access_token": access_token, "token_type": "bearer"} - - @app.post("/iotdevice/door/status", response_model=schemas.IotDoorPollingResponse, tags=['Iot']) def polling_method_for_iot_entity(request: schemas.IotDoorPollingRequest, db: Session = Depends(get_db)):