endpoints.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. from fastapi import FastAPI, Form, Request, Depends, HTTPException, Body
  2. from pydantic import BaseModel
  3. from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse
  4. from fastapi.templating import Jinja2Templates
  5. from fastapi.staticfiles import StaticFiles
  6. from fastapi.middleware.cors import CORSMiddleware
  7. from starlette.middleware.sessions import SessionMiddleware
  8. import secrets
  9. import uvicorn
  10. import logging
  11. import requests # Import the requests library here
  12. from typing import Annotated, Dict, List, Optional #import typing
  13. # --- Keycloak Configuration ---
  14. KEYCLOAK_SERVER_URL = "http://165.22.75.145:8080" # Double-check this! Changed to http
  15. KEYCLOAK_REALM = "Generation"
  16. KEYCLOAK_CLIENT_ID = "web-app-pw"
  17. KEYCLOAK_CLIENT_SECRET = "fQGWt8HSPn65cCKTOzE5FigqZhf8QTYW"
  18. KEYCLOAK_SCOPE = "codicefiscale email openid ruolo"
  19. # --- App Initialization ---
  20. app = FastAPI()
  21. # --- Logging Configuration ---
  22. logging.basicConfig(level=logging.INFO)
  23. logger = logging.getLogger(__name__)
  24. # --- CORS ---
  25. # origins = [
  26. # "http://localhost:8000/*",
  27. # "http://localhost:3000/*",
  28. # ]
  29. app.add_middleware(
  30. CORSMiddleware,
  31. allow_origins=["*"],
  32. allow_credentials=True,
  33. allow_methods=["*"],
  34. allow_headers=["*"],
  35. )
  36. # --- Session ---
  37. app.add_middleware(SessionMiddleware, secret_key=secrets.token_hex(32))
  38. # --- Helper Functions (modified to be part of the API) ---
  39. # Function to get tokens from keycloak
  40. def get_token_from_keycloak(username, password) -> Dict:
  41. """Retrieves access and refresh tokens from Keycloak."""
  42. url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
  43. payload = f'grant_type=password&client_id={KEYCLOAK_CLIENT_ID}&scope={KEYCLOAK_SCOPE}&username={username}&password={password}&client_secret={KEYCLOAK_CLIENT_SECRET}'
  44. headers = {
  45. 'Content-Type': 'application/x-www-form-urlencoded'
  46. }
  47. try:
  48. response = requests.post(url, headers=headers, data=payload, timeout=10)
  49. response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
  50. return response.json()
  51. except requests.exceptions.RequestException as e:
  52. logger.error(f"Error getting token from Keycloak: {e}")
  53. raise HTTPException(status_code=500, detail=f"Failed to get token from Keycloak: {e}") from e
  54. def refresh_token_from_keycloak(refresh_token: str) -> Dict:
  55. """Refreshes the access token using the refresh token."""
  56. url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
  57. payload = f'grant_type=refresh_token&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}&refresh_token={refresh_token}'
  58. headers = {
  59. 'Content-Type': 'application/x-www-form-urlencoded'
  60. }
  61. try:
  62. response = requests.post(url, headers=headers, data=payload, timeout=10)
  63. response.raise_for_status()
  64. return response.json()
  65. except requests.exceptions.RequestException as e:
  66. logger.error(f"Error refreshing token: {e}")
  67. raise HTTPException(status_code=500, detail=f"Failed to refresh token: {e}") from e
  68. def introspect_keycloak_token_request(access_token: str) -> Dict:
  69. """Introspects a Keycloak token to verify if it's active."""
  70. url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token/introspect"
  71. payload = f'token={access_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}'
  72. headers = {
  73. 'Content-Type': 'application/x-www-form-urlencoded'
  74. }
  75. try:
  76. response = requests.post(url, headers=headers, data=payload, verify=False, timeout=10)
  77. response.raise_for_status()
  78. return response.json()
  79. except requests.exceptions.RequestException as e:
  80. logger.error(f"Error introspecting token: {e}")
  81. raise HTTPException(status_code=500, detail=f"Failed to introspect token: {e}") from e
  82. def get_user_info_from_keycloak(access_token: str) -> Dict:
  83. """Gets user information from Keycloak using the access token."""
  84. url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/userinfo"
  85. headers = {
  86. 'Authorization': f'Bearer {access_token}'
  87. }
  88. try:
  89. response = requests.get(url, headers=headers, timeout=10)
  90. response.raise_for_status()
  91. return response.json()
  92. except requests.exceptions.RequestException as e:
  93. logger.error(f"Error getting user info: {e}")
  94. raise HTTPException(status_code=500, detail=f"Failed to get user info: {e}") from e
  95. def logout_keycloak(refresh_token:str):
  96. url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/logout"
  97. payload = f'refresh_token={refresh_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}'
  98. headers = {
  99. 'Content-Type': 'application/x-www-form-urlencoded'
  100. }
  101. try:
  102. response = requests.request("POST", url, headers=headers, data=payload)
  103. response.raise_for_status()
  104. return response.json()
  105. except requests.exceptions.RequestException as e:
  106. logger.error(f"Error logging out user: {e}")
  107. raise HTTPException(status_code=500, detail=f"Failed to logout user: {e}") from e
  108. class Credenziali(BaseModel):
  109. username: str
  110. password: str
  111. app.mount("/static", StaticFiles(directory="static"), name="static")
  112. templates = Jinja2Templates(directory="templates")
  113. @app.get("/callback")
  114. async def callback(request: Request):
  115. return templates.TemplateResponse("mappa_logout.html", context={"request": request, "title": "Mappa"})
  116. @app.get("/access")
  117. @app.post("/access")
  118. async def login(request: Request, username: Optional[str] = Form(None), password: Optional[str] = Form(None)):
  119. error = None
  120. if request.method == "POST":
  121. try:
  122. request.session["user_info"] = {}
  123. #print(request.session.get("tokens"))
  124. tokens = get_token_from_keycloak(username, password)#request.body.json()["username"], request.json()["password"])
  125. # Save tokens and information to the session
  126. request.session["access_token"] = tokens["access_token"]
  127. request.session["refresh_token"] = tokens["refresh_token"]
  128. #print(tokens["access_token"])
  129. # Get user info and save it to the session too
  130. user_info = get_user_info_from_keycloak(tokens["access_token"])
  131. request.session["user_info"] = user_info
  132. return RedirectResponse(url=f"/callback?access_token={tokens["access_token"]}&refresh_token={tokens["refresh_token"]}", status_code=303)
  133. except HTTPException as e:
  134. return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
  135. except Exception as e:
  136. logger.error(f"An unexpected error occurred during login: {e}")
  137. return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
  138. if request.session.get("access_token") is not None:
  139. #print(request.session.get("access_token"))
  140. if introspect_keycloak_token_request(str(request.session.get("access_token")))["active"] == True:
  141. #print(3)
  142. return RedirectResponse(url="/callback", status_code=303)
  143. try:
  144. tokens = refresh_token_from_keycloak(str(request.session.get("refresh_token")))
  145. #localStorage.setItem('jwtToken', tuoJWT);
  146. request.session["access_token"] = tokens["access_token"]
  147. request.session["refresh_token"] = tokens["refresh_token"]
  148. return RedirectResponse(url=f"/callback?access_token={tokens["access_token"]}&refresh_token={tokens["refresh_token"]}", status_code=303)
  149. except Exception as e:
  150. logger.error(f"An unexpected error occurred during login: {e}")
  151. return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
  152. return templates.TemplateResponse("login.html", {"request": request, "error": error})