123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- from fastapi import FastAPI, Form, Request, Depends, HTTPException, Body
- from pydantic import BaseModel
- from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse
- from fastapi.templating import Jinja2Templates
- from fastapi.staticfiles import StaticFiles
- from fastapi.middleware.cors import CORSMiddleware
- from starlette.middleware.sessions import SessionMiddleware
- import secrets
- import uvicorn
- import logging
- import requests # Import the requests library here
- from typing import Annotated, Dict, List, Optional #import typing
- # --- Keycloak Configuration ---
- KEYCLOAK_SERVER_URL = "http://165.22.75.145:8080" # Double-check this! Changed to http
- KEYCLOAK_REALM = "Generation"
- KEYCLOAK_CLIENT_ID = "web-app-pw"
- KEYCLOAK_CLIENT_SECRET = "fQGWt8HSPn65cCKTOzE5FigqZhf8QTYW"
- KEYCLOAK_SCOPE = "codicefiscale email openid ruolo"
- # --- App Initialization ---
- app = FastAPI()
- # --- Logging Configuration ---
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- # --- CORS ---
- # origins = [
- # "http://localhost:8000/*",
- # "http://localhost:3000/*",
- # ]
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # --- Session ---
- app.add_middleware(SessionMiddleware, secret_key=secrets.token_hex(32))
- # --- Helper Functions (modified to be part of the API) ---
- # Function to get tokens from keycloak
- def get_token_from_keycloak(username, password) -> Dict:
- """Retrieves access and refresh tokens from Keycloak."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
- payload = f'grant_type=password&client_id={KEYCLOAK_CLIENT_ID}&scope={KEYCLOAK_SCOPE}&username={username}&password={password}&client_secret={KEYCLOAK_CLIENT_SECRET}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.post(url, headers=headers, data=payload, timeout=10)
- response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error getting token from Keycloak: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to get token from Keycloak: {e}") from e
- def refresh_token_from_keycloak(refresh_token: str) -> Dict:
- """Refreshes the access token using the refresh token."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
- payload = f'grant_type=refresh_token&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}&refresh_token={refresh_token}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.post(url, headers=headers, data=payload, timeout=10)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error refreshing token: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to refresh token: {e}") from e
- def introspect_keycloak_token_request(access_token: str) -> Dict:
- """Introspects a Keycloak token to verify if it's active."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token/introspect"
- payload = f'token={access_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.post(url, headers=headers, data=payload, verify=False, timeout=10)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error introspecting token: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to introspect token: {e}") from e
- def get_user_info_from_keycloak(access_token: str) -> Dict:
- """Gets user information from Keycloak using the access token."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/userinfo"
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- try:
- response = requests.get(url, headers=headers, timeout=10)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error getting user info: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to get user info: {e}") from e
- def logout_keycloak(refresh_token:str):
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/logout"
- payload = f'refresh_token={refresh_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.request("POST", url, headers=headers, data=payload)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error logging out user: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to logout user: {e}") from e
- class Credenziali(BaseModel):
- username: str
- password: str
- app.mount("/static", StaticFiles(directory="static"), name="static")
- templates = Jinja2Templates(directory="templates")
- @app.get("/callback")
- async def callback(request: Request):
- return templates.TemplateResponse("mappa_logout.html", context={"request": request, "title": "Mappa"})
- @app.get("/access")
- @app.post("/access")
- async def login(request: Request, username: Optional[str] = Form(None), password: Optional[str] = Form(None)):
- error = None
- if request.method == "POST":
- try:
- request.session["user_info"] = {}
- #print(request.session.get("tokens"))
-
- tokens = get_token_from_keycloak(username, password)#request.body.json()["username"], request.json()["password"])
-
- # Save tokens and information to the session
- request.session["access_token"] = tokens["access_token"]
- request.session["refresh_token"] = tokens["refresh_token"]
- #print(tokens["access_token"])
- # Get user info and save it to the session too
- user_info = get_user_info_from_keycloak(tokens["access_token"])
- request.session["user_info"] = user_info
- return RedirectResponse(url=f"/callback?access_token={tokens["access_token"]}&refresh_token={tokens["refresh_token"]}", status_code=303)
- except HTTPException as e:
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
- except Exception as e:
- logger.error(f"An unexpected error occurred during login: {e}")
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
-
- if request.session.get("access_token") is not None:
- #print(request.session.get("access_token"))
- if introspect_keycloak_token_request(str(request.session.get("access_token")))["active"] == True:
- #print(3)
- return RedirectResponse(url="/callback", status_code=303)
- try:
- tokens = refresh_token_from_keycloak(str(request.session.get("refresh_token")))
- #localStorage.setItem('jwtToken', tuoJWT);
- request.session["access_token"] = tokens["access_token"]
- request.session["refresh_token"] = tokens["refresh_token"]
- return RedirectResponse(url=f"/callback?access_token={tokens["access_token"]}&refresh_token={tokens["refresh_token"]}", status_code=303)
- except Exception as e:
- logger.error(f"An unexpected error occurred during login: {e}")
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
-
- return templates.TemplateResponse("login.html", {"request": request, "error": error})
|