"""FastAPI front-end that authenticates users against a Keycloak realm.

The module exposes HTML pages that are only rendered while the session holds
an active Keycloak access token, plus a few JSON endpoints for refreshing,
introspecting and revoking the tokens stored in the session.
"""
import logging
import os
import secrets
from typing import Dict, Optional
from urllib.parse import urlencode

import requests
import uvicorn
from fastapi import FastAPI, Form, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from starlette.middleware.sessions import SessionMiddleware

# --- Keycloak Configuration ---
KEYCLOAK_SERVER_URL = "http://165.22.75.145:8080"
KEYCLOAK_REALM = "Generation"
KEYCLOAK_CLIENT_ID = "web-app-pw"
# The secret can be supplied through the environment; the literal default is
# kept only so existing deployments keep working.
# NOTE(review): a secret committed to source should be rotated and removed.
KEYCLOAK_CLIENT_SECRET = os.environ.get(
    "KEYCLOAK_CLIENT_SECRET", "fQGWt8HSPn65cCKTOzE5FigqZhf8QTYW"
)
KEYCLOAK_SCOPE = "codicefiscale email openid ruolo"

# Base URL shared by every OpenID Connect endpoint used below.
_OIDC_BASE_URL = (
    f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect"
)
# Seconds to wait for Keycloak before giving up on a request.
_KEYCLOAK_TIMEOUT = 10

# --- App Initialization ---
app = FastAPI()

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site send credentialed requests; restrict allow_origins once the real
# front-end origins are known.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Session ---
# A random key invalidates every session on restart and differs between
# worker processes, so a stable key may be provided via SESSION_SECRET_KEY.
app.add_middleware(
    SessionMiddleware,
    secret_key=os.environ.get("SESSION_SECRET_KEY") or secrets.token_hex(32),
)


# --- Helper Functions (modified to be part of the API) ---
# NOTE(review): these helpers use the blocking `requests` library and are
# called from `async def` handlers, so each Keycloak round-trip blocks the
# event loop; consider run_in_threadpool or an async HTTP client.
def _call_keycloak(
    method: str,
    endpoint: str,
    *,
    log_prefix: str,
    detail_prefix: str,
    allow_empty: bool = False,
    **request_kwargs,
) -> Dict:
    """Send one request to a Keycloak OpenID Connect endpoint.

    Args:
        method: HTTP method name.
        endpoint: Path below ``.../protocol/openid-connect/``.
        log_prefix: Text logged in front of the exception on failure.
        detail_prefix: Text placed in front of the exception in the
            HTTPException detail on failure.
        allow_empty: When true, a response without a body yields ``{}``
            instead of a JSON decoding failure.
        **request_kwargs: Forwarded to ``requests.request`` (``data``,
            ``headers``...).

    Returns:
        The decoded JSON body of the response.

    Raises:
        HTTPException: status 500 on any transport error, non-2xx status or
            undecodable body.
    """
    url = f"{_OIDC_BASE_URL}/{endpoint}"
    try:
        response = requests.request(
            method, url, timeout=_KEYCLOAK_TIMEOUT, **request_kwargs
        )
        response.raise_for_status()
        if allow_empty and not response.content:
            return {}
        return response.json()
    # ValueError covers JSON decoding failures on requests versions where the
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as exc:
        logger.error("%s: %s", log_prefix, exc)
        raise HTTPException(
            status_code=500, detail=f"{detail_prefix}: {exc}"
        ) from exc


# Function to get tokens from keycloak
def get_token_from_keycloak(username, password) -> Dict:
    """Retrieves access and refresh tokens from Keycloak.

    The credentials are sent as a form dict so that `requests` URL-encodes
    them; characters such as ``&`` or ``=`` in a password can no longer alter
    the request parameters.

    Raises:
        HTTPException: status 500 if the token request fails.
    """
    form = {
        "grant_type": "password",
        "client_id": KEYCLOAK_CLIENT_ID,
        "scope": KEYCLOAK_SCOPE,
        "username": username,
        "password": password,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
    }
    return _call_keycloak(
        "POST",
        "token",
        data=form,
        log_prefix="Error getting token from Keycloak",
        detail_prefix="Failed to get token from Keycloak",
    )


def refresh_token_from_keycloak(refresh_token: str) -> Dict:
    """Refreshes the access token using the refresh token.

    Raises:
        HTTPException: status 500 if the refresh request fails.
    """
    form = {
        "grant_type": "refresh_token",
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "refresh_token": refresh_token,
    }
    return _call_keycloak(
        "POST",
        "token",
        data=form,
        log_prefix="Error refreshing token",
        detail_prefix="Failed to refresh token",
    )


def introspect_keycloak_token_request(access_token: str) -> Dict:
    """Introspects a Keycloak token to verify if it's active.

    Raises:
        HTTPException: status 500 if the introspection request fails.
    """
    form = {
        "token": access_token,
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
    }
    # TLS verification is left at the library default, like every other call
    # to the same server.
    return _call_keycloak(
        "POST",
        "token/introspect",
        data=form,
        log_prefix="Error introspecting token",
        detail_prefix="Failed to introspect token",
    )


def get_user_info_from_keycloak(access_token: str) -> Dict:
    """Gets user information from Keycloak using the access token.

    Raises:
        HTTPException: status 500 if the userinfo request fails.
    """
    return _call_keycloak(
        "GET",
        "userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
        log_prefix="Error getting user info",
        detail_prefix="Failed to get user info",
    )


def logout_keycloak(refresh_token: str):
    """Ends the Keycloak session that the refresh token belongs to.

    Returns:
        The decoded JSON body, or ``{}`` when Keycloak answers without a body
        (the usual case for a successful logout), so that a successful logout
        is not reported as a failure.

    Raises:
        HTTPException: status 500 if the logout request fails.
    """
    form = {
        "refresh_token": refresh_token,
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
    }
    return _call_keycloak(
        "POST",
        "logout",
        data=form,
        allow_empty=True,
        log_prefix="Error logging out user",
        detail_prefix="Failed to logout user",
    )


app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


# --- Session helpers shared by the page handlers ---
def _redirect_to_login() -> RedirectResponse:
    """Return the redirect that sends the browser to the login page."""
    return RedirectResponse(url="/access", status_code=303)


def _redirect_with_identity(path: str, user_info: Dict) -> RedirectResponse:
    """Redirect to *path* with the user's role and fiscal code in the query.

    The values are URL-encoded so unusual characters cannot break the URL.
    """
    query = urlencode(
        {"ruolo": user_info["ruolo"], "codice_fiscale": user_info["CF"]}
    )
    return RedirectResponse(url=f"{path}?{query}", status_code=303)


def _establish_session(request: Request, tokens: Dict) -> Dict:
    """Store *tokens* in the session, then fetch and store the user identity.

    Returns:
        The userinfo document fetched with the new access token.
    """
    request.session.update(
        {
            "access_token": tokens["access_token"],
            "refresh_token": tokens["refresh_token"],
        }
    )
    user_info = get_user_info_from_keycloak(tokens["access_token"])
    request.session.update(
        {"ruolo": user_info["ruolo"], "codice_fiscale": user_info["CF"]}
    )
    return user_info


def _session_token_is_active(request: Request) -> bool:
    """Ask Keycloak whether the session's access token is still active."""
    token = str(request.session.get("access_token"))
    return introspect_keycloak_token_request(token).get("active") is True


def _try_refresh_session(request: Request) -> Optional[Dict]:
    """Try to renew the session with its refresh token.

    Returns:
        The fresh userinfo document, or ``None`` when there is no refresh
        token or any step of the refresh fails.
    """
    refresh_token = request.session.get("refresh_token")
    if not refresh_token:
        return None
    try:
        tokens = refresh_token_from_keycloak(str(refresh_token))
        return _establish_session(request, tokens)
    except Exception as exc:
        # Deliberately best-effort: any failure means "log in again".
        logger.warning("Session refresh failed: %s", exc)
        return None


def _serve_protected_page(request: Request, template_name: str, path: str):
    """Render *template_name* for an authenticated session.

    An active token renders the page; an expired token that can be refreshed
    redirects back to *path*; anything else clears the session and redirects
    to the login page.
    """
    if "access_token" in request.session:
        if _session_token_is_active(request):
            return templates.TemplateResponse(
                template_name, {"request": request}
            )
        user_info = _try_refresh_session(request)
        if user_info is not None:
            return _redirect_with_identity(path, user_info)
    request.session.clear()
    return _redirect_to_login()


def _error_response(exc: HTTPException) -> JSONResponse:
    """Convert an HTTPException into the JSON error body used by the API."""
    return JSONResponse(
        content={"detail": exc.detail}, status_code=exc.status_code
    )


def _unexpected_error_response() -> JSONResponse:
    """Return the generic JSON body for unexpected failures."""
    return JSONResponse(
        content={"detail": "An unexpected error occurred"}, status_code=500
    )


# --- Pages ---
@app.get("/mappa_login")
async def mappa_login(request: Request):
    """Clear the session and render the public map page."""
    request.session.clear()
    return templates.TemplateResponse("mappa_login.html", {"request": request})


@app.get("/tfo")
async def tfo(request: Request):
    """Render the TFO page for an authenticated session."""
    return _serve_protected_page(request, "TFO.html", "/tfo")


@app.get("/instructions")
async def instructions(request: Request):
    """Render the instructions page for an authenticated session."""
    return _serve_protected_page(request, "instructions.html", "/instructions")


@app.get("/buildings")
async def buildings(request: Request):
    """Render the buildings page for an authenticated session."""
    return _serve_protected_page(request, "buildings.html", "/buildings")


@app.get("/faq")
async def faq(request: Request):
    """Render the FAQ page for an authenticated session."""
    return _serve_protected_page(request, "faq.html", "/faq")


@app.get("/techsup")
async def techsup(request: Request):
    """Render the technical-support page for an authenticated session."""
    return _serve_protected_page(request, "techsup.html", "/techsup")


@app.get("/callback")
async def callback(request: Request):
    """Render the post-login page for an authenticated session."""
    return _serve_protected_page(request, "mappa_logout.html", "/callback")


@app.get("/admin")
async def admin(request: Request):
    """Render the admin page for users whose role is "amministratore".

    Authenticated users with any other role are redirected to /callback.
    """
    if "access_token" in request.session:
        if _session_token_is_active(request):
            token = str(request.session.get("access_token"))
            role = get_user_info_from_keycloak(token).get("ruolo")
            if role == "amministratore":
                return templates.TemplateResponse(
                    "ADMIN.html", {"request": request}
                )
            return RedirectResponse(url="/callback", status_code=303)
        user_info = _try_refresh_session(request)
        if user_info is not None:
            return _redirect_with_identity("/admin", user_info)
    request.session.clear()
    return _redirect_to_login()


@app.get("/access")
@app.post("/access")
async def login(
    request: Request,
    username: Optional[str] = Form(None),
    password: Optional[str] = Form(None),
):
    """Show the login form (GET) or log the user in (POST).

    POST exchanges the credentials for tokens and redirects to /callback;
    failures are reported as JSON. GET redirects an already authenticated
    session to /callback, otherwise clears the session and renders the form.
    """
    error = None
    if request.method == "POST":
        try:
            request.session.clear()
            if not username or not password:
                # Nothing to authenticate with; do not bother Keycloak.
                return JSONResponse(
                    content={"detail": "Username and password are required"},
                    status_code=400,
                )
            tokens = get_token_from_keycloak(username, password)
            user_info = _establish_session(request, tokens)
            return _redirect_with_identity("/callback", user_info)
        except HTTPException as exc:
            return _error_response(exc)
        except Exception as exc:
            logger.error(
                "An unexpected error occurred during login: %s", exc
            )
            return _unexpected_error_response()

    if "access_token" in request.session:
        if _session_token_is_active(request):
            return RedirectResponse(url="/callback", status_code=303)
        user_info = _try_refresh_session(request)
        if user_info is not None:
            return _redirect_with_identity("/callback", user_info)
    request.session.clear()
    return templates.TemplateResponse(
        "login.html", {"request": request, "error": error}
    )


# --- JSON endpoints ---
@app.get("/refresh")
async def refresh(request: Request):
    """Refreshes the access token using the refresh token in the session."""
    refresh_token = request.session.get("refresh_token")
    if not refresh_token:
        raise HTTPException(
            status_code=401, detail="Refresh token not found in session"
        )
    try:
        new_tokens = refresh_token_from_keycloak(refresh_token)
        request.session["access_token"] = new_tokens["access_token"]
        request.session["refresh_token"] = new_tokens["refresh_token"]
        new_user_info = get_user_info_from_keycloak(new_tokens["access_token"])
        request.session["user_info"] = new_user_info
        return {
            "message": "Token refreshed successfully",
            "access_token": new_tokens["access_token"],
            "user_info": new_user_info,
        }
    except HTTPException as exc:
        return _error_response(exc)
    except Exception as exc:
        logger.error(
            "An unexpected error occurred during token refresh: %s", exc
        )
        return _unexpected_error_response()


@app.get("/introspect")
async def introspect(request: Request):
    """Introspects the access token in the session."""
    access_token = request.session.get("access_token")
    if not access_token:
        raise HTTPException(
            status_code=401, detail="Access token not found in session"
        )
    try:
        introspect_data = introspect_keycloak_token_request(access_token)
        return {
            "message": "Token introspection successful",
            "introspect_data": introspect_data,
        }
    except HTTPException as exc:
        return _error_response(exc)
    except Exception as exc:
        logger.error(
            "An unexpected error occurred during token introspection: %s", exc
        )
        return _unexpected_error_response()


@app.get("/userinfo")
async def user_info(request: Request):
    """Retrieves and returns user information stored in the session.

    NOTE(review): the "user_info" session key is only written by /refresh, so
    this answers 401 after a plain login until /refresh has been called.
    """
    stored_user_info = request.session.get("user_info")
    if not stored_user_info:
        raise HTTPException(
            status_code=401, detail="User info not found in session"
        )
    return {
        "message": "User information retrieved",
        "user_info": stored_user_info,
    }


@app.get("/logout_keycloak")
async def logout_user(request: Request):
    """Logs out a user by revoking the refresh token."""
    refresh_token = request.session.get("refresh_token")
    if not refresh_token:
        raise HTTPException(
            status_code=401, detail="Refresh token not found in session"
        )
    try:
        logout_keycloak(refresh_token)
        request.session.clear()
        return {"message": "Logout successful"}
    except HTTPException as exc:
        return _error_response(exc)
    except Exception as exc:
        logger.error("An unexpected error occurred during logout: %s", exc)
        return _unexpected_error_response()


@app.get("/protected")
async def protected_endpoint(request: Request):
    """A protected endpoint that requires a valid access token."""
    access_token = request.session.get("access_token")
    if not access_token:
        raise HTTPException(
            status_code=401, detail="Access token not found in session"
        )
    try:
        introspect_data = introspect_keycloak_token_request(access_token)
        if not introspect_data.get("active"):
            raise HTTPException(
                status_code=401, detail="Access token is not active"
            )
        return JSONResponse(
            {
                "message": "Hello, world! (Protected)",
                "introspect": introspect_data,
            }
        )
    except HTTPException as exc:
        return _error_response(exc)
    except Exception as exc:
        logger.error(
            "An unexpected error occurred during token introspection: %s", exc
        )
        return _unexpected_error_response()


# --- Run the App ---
if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8000)