|
@@ -109,7 +109,6 @@ def logout_keycloak(refresh_token:str):
|
|
try:
|
|
try:
|
|
response = requests.request("POST", url, headers=headers, data=payload)
|
|
response = requests.request("POST", url, headers=headers, data=payload)
|
|
response.raise_for_status()
|
|
response.raise_for_status()
|
|
- return response.json()
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Error logging out user: {e}")
|
|
logger.error(f"Error logging out user: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to logout user: {e}") from e
|
|
raise HTTPException(status_code=500, detail=f"Failed to logout user: {e}") from e
|
|
@@ -120,6 +119,8 @@ templates = Jinja2Templates(directory="templates")
|
|
|
|
|
|
@app.get("/mappa_login")
|
|
@app.get("/mappa_login")
|
|
async def mappa_login(request: Request):
|
|
async def mappa_login(request: Request):
|
|
|
|
+ if "access_token" in request.session.keys() and request.session.get("access_token") is not None and request.session.get("refresh_token") != "":
|
|
|
|
+ logout_keycloak(str(request.session.get("refresh_token")))
|
|
request.session.clear()
|
|
request.session.clear()
|
|
return templates.TemplateResponse("mappa_login.html", {"request": request})
|
|
return templates.TemplateResponse("mappa_login.html", {"request": request})
|
|
|
|
|
|
@@ -327,90 +328,12 @@ async def login(request: Request, username: Optional[str] = Form(None), password
|
|
return RedirectResponse(url=f"/callback?ruolo={user_info["ruolo"]}&codice_fiscale={user_info["CF"]}", status_code=303)
|
|
return RedirectResponse(url=f"/callback?ruolo={user_info["ruolo"]}&codice_fiscale={user_info["CF"]}", status_code=303)
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
|
+ if "access_token" in request.session.keys() and request.session.get("access_token") is not None and request.session.get("refresh_token") != "":
|
|
|
|
+ logout_keycloak(str(request.session.get("refresh_token")))
|
|
request.session.clear()
|
|
request.session.clear()
|
|
- return templates.TemplateResponse("login.html", {"request": request, "error": error})
|
|
|
|
-
|
|
|
|
|
|
+ return RedirectResponse(url="/access", status_code=303)
|
|
request.session.clear()
|
|
request.session.clear()
|
|
- return templates.TemplateResponse("login.html", {"request": request, "error": error})
|
|
|
|
-
|
|
|
|
-@app.get("/refresh")
|
|
|
|
-async def refresh(request: Request):
|
|
|
|
- """Refreshes the access token using the refresh token in the session."""
|
|
|
|
- refresh_token = request.session.get("refresh_token")
|
|
|
|
- if not refresh_token:
|
|
|
|
- raise HTTPException(status_code=401, detail="Refresh token not found in session")
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- new_tokens = refresh_token_from_keycloak(refresh_token)
|
|
|
|
- request.session["access_token"] = new_tokens["access_token"]
|
|
|
|
- request.session["refresh_token"] = new_tokens["refresh_token"]
|
|
|
|
- new_user_info = get_user_info_from_keycloak(new_tokens["access_token"])
|
|
|
|
- request.session["user_info"] = new_user_info
|
|
|
|
- return {"message": "Token refreshed successfully", "access_token": new_tokens["access_token"], "user_info": new_user_info}
|
|
|
|
- except HTTPException as e:
|
|
|
|
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"An unexpected error occurred during token refresh: {e}")
|
|
|
|
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-@app.get("/introspect")
|
|
|
|
-async def introspect(request: Request):
|
|
|
|
- """Introspects the access token in the session."""
|
|
|
|
- access_token = request.session.get("access_token")
|
|
|
|
- if not access_token:
|
|
|
|
- raise HTTPException(status_code=401, detail="Access token not found in session")
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- introspect_data = introspect_keycloak_token_request(access_token)
|
|
|
|
- return {"message": "Token introspection successful", "introspect_data": introspect_data}
|
|
|
|
- except HTTPException as e:
|
|
|
|
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"An unexpected error occurred during token introspection: {e}")
|
|
|
|
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
|
|
|
|
-
|
|
|
|
-@app.get("/userinfo")
|
|
|
|
-async def user_info(request: Request):
|
|
|
|
- """Retrieves and returns user information stored in the session."""
|
|
|
|
- user_info = request.session.get("user_info")
|
|
|
|
- if not user_info:
|
|
|
|
- raise HTTPException(status_code=401, detail="User info not found in session")
|
|
|
|
- return {"message": "User information retrieved", "user_info": user_info}
|
|
|
|
-
|
|
|
|
-@app.get("/logout_keycloak")
|
|
|
|
-async def logout_user(request: Request):
|
|
|
|
- """Logs out a user by revoking the refresh token."""
|
|
|
|
- refresh_token = request.session.get("refresh_token")
|
|
|
|
- if not refresh_token:
|
|
|
|
- raise HTTPException(status_code=401, detail="Refresh token not found in session")
|
|
|
|
- try:
|
|
|
|
- logout_keycloak(refresh_token)
|
|
|
|
- request.session.clear()
|
|
|
|
- return {"message": "Logout successful"}
|
|
|
|
- except HTTPException as e:
|
|
|
|
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"An unexpected error occurred during logout: {e}")
|
|
|
|
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
|
|
|
|
-
|
|
|
|
-@app.get("/protected")
|
|
|
|
-async def protected_endpoint(request: Request):
|
|
|
|
- """A protected endpoint that requires a valid access token."""
|
|
|
|
- access_token = request.session.get("access_token")
|
|
|
|
- if not access_token:
|
|
|
|
- raise HTTPException(status_code=401, detail="Access token not found in session")
|
|
|
|
- try:
|
|
|
|
- introspect_data = introspect_keycloak_token_request(access_token)
|
|
|
|
- if not introspect_data.get("active"):
|
|
|
|
- raise HTTPException(status_code=401, detail="Access token is not active")
|
|
|
|
- return JSONResponse({"message": f"Hello, world! (Protected)", "introspect": introspect_data})
|
|
|
|
- except HTTPException as e:
|
|
|
|
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.error(f"An unexpected error occurred during token introspection: {e}")
|
|
|
|
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
|
|
|
|
-
|
|
|
|
|
|
+ return templates.TemplateResponse("login.html", {"request": request, "error": error})
|
|
|
|
|
|
# --- Run the App ---
|
|
# --- Run the App ---
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|