123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
import logging
import os
import secrets
from typing import Dict, Optional
from urllib.parse import urlencode

import requests
import uvicorn
from fastapi import FastAPI, Form, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from starlette.middleware.sessions import SessionMiddleware
# --- Keycloak Configuration ---
# Each setting can be overridden with an environment variable of the same
# name. The literals are kept only as fallbacks so that an existing deployment
# without those variables behaves exactly as before.
KEYCLOAK_SERVER_URL = os.environ.get("KEYCLOAK_SERVER_URL", "http://165.22.75.145:8080")
KEYCLOAK_REALM = os.environ.get("KEYCLOAK_REALM", "Generation")
KEYCLOAK_CLIENT_ID = os.environ.get("KEYCLOAK_CLIENT_ID", "web-app-pw")
# NOTE(review): the fallback secret below is committed to source control.
# Rotate it in Keycloak, supply the new value via KEYCLOAK_CLIENT_SECRET and
# then delete this literal.
KEYCLOAK_CLIENT_SECRET = os.environ.get(
    "KEYCLOAK_CLIENT_SECRET", "fQGWt8HSPn65cCKTOzE5FigqZhf8QTYW"
)
KEYCLOAK_SCOPE = os.environ.get("KEYCLOAK_SCOPE", "codicefiscale email openid ruolo")

# --- App Initialization ---
app = FastAPI()

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# NOTE(review): every origin, method and header is allowed together with
# credentials. Confirm this is intended; a fixed origin list is usually what a
# cookie-authenticated application wants.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Session ---
# A key taken from SESSION_SECRET_KEY stays the same across restarts and across
# worker processes. Without it a random key is generated at import time, as
# before, so sessions signed by a previous process are no longer accepted.
_session_secret_key = os.environ.get("SESSION_SECRET_KEY") or secrets.token_hex(32)
app.add_middleware(SessionMiddleware, secret_key=_session_secret_key)
- # --- Helper Functions (modified to be part of the API) ---
- # Function to get tokens from keycloak
def get_token_from_keycloak(username, password) -> Dict:
    """Request access and refresh tokens from Keycloak with the password grant.

    Args:
        username: Login name submitted by the user.
        password: Password submitted by the user.

    Returns:
        The decoded JSON body returned by Keycloak's token endpoint.

    Raises:
        HTTPException: Status 500 when the request fails or Keycloak answers
            with an error status.
    """
    url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
    # Passing a dict lets requests form-encode every value, so a username or
    # password containing "&", "=", "+", "%" or spaces can neither corrupt the
    # body nor smuggle extra parameters into the token request.
    payload = {
        "grant_type": "password",
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "scope": KEYCLOAK_SCOPE,
        "username": username,
        "password": password,
    }
    try:
        response = requests.post(url, data=payload, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error("Error getting token from Keycloak: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to get token from Keycloak: {e}"
        ) from e
def refresh_token_from_keycloak(refresh_token: str) -> Dict:
    """Exchange a refresh token for a new token set at Keycloak.

    Args:
        refresh_token: The refresh token previously issued by Keycloak.

    Returns:
        The decoded JSON body returned by Keycloak's token endpoint.

    Raises:
        HTTPException: Status 500 when the request fails or Keycloak answers
            with an error status.
    """
    url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
    # A dict is form-encoded by requests, so reserved characters in the client
    # secret or the token cannot break the request body.
    payload = {
        "grant_type": "refresh_token",
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "refresh_token": refresh_token,
    }
    try:
        response = requests.post(url, data=payload, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error("Error refreshing token: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to refresh token: {e}") from e
def introspect_keycloak_token_request(access_token: str) -> Dict:
    """Ask Keycloak's introspection endpoint about an access token.

    Args:
        access_token: The token to introspect.

    Returns:
        The decoded JSON body of the introspection response; callers in this
        file read its "active" entry.

    Raises:
        HTTPException: Status 500 when the request fails or Keycloak answers
            with an error status.
    """
    url = (
        f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}"
        "/protocol/openid-connect/token/introspect"
    )
    # Form-encoded by requests; see the other token helpers.
    payload = {
        "token": access_token,
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
    }
    try:
        # Certificate verification is left at the requests default (enabled),
        # like every other Keycloak call in this file; the previous
        # verify=False would have accepted any certificate over HTTPS.
        response = requests.post(url, data=payload, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error("Error introspecting token: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to introspect token: {e}") from e
def get_user_info_from_keycloak(access_token: str) -> Dict:
    """Fetch the userinfo document for the bearer of ``access_token``.

    Returns the decoded JSON body of Keycloak's userinfo endpoint and raises
    ``HTTPException`` (status 500) when the request fails.
    """
    userinfo_endpoint = (
        f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}"
        "/protocol/openid-connect/userinfo"
    )
    bearer_header = {"Authorization": f"Bearer {access_token}"}
    try:
        reply = requests.get(userinfo_endpoint, headers=bearer_header, timeout=10)
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.RequestException as exc:
        logger.error("Error getting user info: %s", exc)
        raise HTTPException(
            status_code=500, detail=f"Failed to get user info: {exc}"
        ) from exc
def logout_keycloak(refresh_token: str):
    """End the Keycloak session that ``refresh_token`` belongs to.

    Args:
        refresh_token: The refresh token of the session to terminate.

    Returns:
        The decoded JSON body when Keycloak sends one, otherwise an empty
        dict.

    Raises:
        HTTPException: Status 500 when the request fails or Keycloak answers
            with an error status.
    """
    url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/logout"
    payload = {
        "refresh_token": refresh_token,
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
    }
    try:
        # Same 10 s timeout as the other Keycloak calls, so an unresponsive
        # server cannot hang the request indefinitely.
        response = requests.post(url, data=payload, timeout=10)
        response.raise_for_status()
        # A successful logout normally comes back with an empty body
        # (204 No Content). Decoding that as JSON raises, which made a
        # successful logout look like a failure.
        return response.json() if response.content else {}
    except requests.exceptions.RequestException as e:
        logger.error("Error logging out user: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to logout user: {e}") from e
# Files in ./static are served under the /static URL prefix.
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)

# HTML pages are rendered from the ./templates directory.
templates = Jinja2Templates(directory="templates")
@app.get("/mappa_login")
async def mappa_login(request: Request):
    """Discard whatever is in the session and render ``mappa_login.html``."""
    request.session.clear()
    page_context = {"request": request}
    return templates.TemplateResponse("mappa_login.html", page_context)
def _refresh_session_and_redirect(request: Request, path: str) -> RedirectResponse:
    """Renew the session's tokens and send the browser back to ``path``.

    The refresh token stored in the session is exchanged for a new token set,
    the user's role and fiscal code are re-read from Keycloak and stored in
    the session, and the browser is redirected to ``path`` with both values in
    the query string. On any failure the session is cleared and the browser is
    redirected to ``/access``.
    """
    try:
        tokens = refresh_token_from_keycloak(str(request.session.get("refresh_token")))
        request.session.update(
            {"access_token": tokens["access_token"], "refresh_token": tokens["refresh_token"]}
        )
        user_info = get_user_info_from_keycloak(tokens["access_token"])
        request.session.update(
            {"ruolo": user_info["ruolo"], "codice_fiscale": user_info["CF"]}
        )
        # urlencode escapes the values; the former f-string also nested the
        # same quote type, which only parses on Python 3.12 and later.
        query = urlencode(
            {"ruolo": user_info["ruolo"], "codice_fiscale": user_info["CF"]}
        )
        return RedirectResponse(url=f"{path}?{query}", status_code=303)
    except Exception:
        # Any failure here means the user has to log in again; record why
        # instead of discarding the error.
        logger.exception("Session refresh failed for %s", path)
        request.session.clear()
        return RedirectResponse(url="/access", status_code=303)


def _serve_protected_page(
    request: Request,
    template_name: str,
    path: str,
    required_role: Optional[str] = None,
):
    """Render ``template_name`` if the session holds an active access token.

    Args:
        request: The incoming request; its session carries the tokens.
        template_name: Template rendered when access is granted.
        path: URL path of the calling route, used as the redirect target after
            a token refresh.
        required_role: When given, the "ruolo" value reported by Keycloak's
            userinfo endpoint must equal it; otherwise the browser is
            redirected to ``/callback``.

    Returns:
        The rendered template, or a 303 redirect to ``path`` (after a
        refresh), ``/callback`` (wrong role) or ``/access`` (no usable
        session).

    NOTE(review): the Keycloak helpers use blocking ``requests`` calls while
    the routes are ``async def``; consider running them in a thread pool.
    """
    if "access_token" not in request.session:
        request.session.clear()
        return RedirectResponse(url="/access", status_code=303)

    access_token = str(request.session.get("access_token"))
    if not introspect_keycloak_token_request(access_token).get("active"):
        return _refresh_session_and_redirect(request, path)

    if required_role is not None:
        # The role is read from Keycloak on every request rather than from the
        # session copy.
        role = get_user_info_from_keycloak(access_token).get("ruolo")
        if role != required_role:
            return RedirectResponse(url="/callback", status_code=303)

    return templates.TemplateResponse(template_name, {"request": request})


@app.get("/tfo")
async def tfo(request: Request):
    """Serve ``TFO.html`` to a session with an active token."""
    return _serve_protected_page(request, "TFO.html", "/tfo")


@app.get("/instructions")
async def instructions(request: Request):
    """Serve ``instructions.html`` to a session with an active token."""
    return _serve_protected_page(request, "instructions.html", "/instructions")


@app.get("/buildings")
async def buildings(request: Request):
    """Serve ``buildings.html`` to a session with an active token."""
    return _serve_protected_page(request, "buildings.html", "/buildings")


@app.get("/faq")
async def faq(request: Request):
    """Serve ``faq.html`` to a session with an active token."""
    return _serve_protected_page(request, "faq.html", "/faq")


@app.get("/techsup")
async def techsup(request: Request):
    """Serve ``techsup.html`` to a session with an active token."""
    return _serve_protected_page(request, "techsup.html", "/techsup")


@app.get("/callback")
async def callback(request: Request):
    """Serve ``mappa_logout.html`` to a session with an active token."""
    return _serve_protected_page(request, "mappa_logout.html", "/callback")


@app.get("/admin")
async def admin(request: Request):
    """Serve ``ADMIN.html`` to users whose Keycloak role is "amministratore".

    Authenticated users with any other role are redirected to ``/callback``.
    """
    return _serve_protected_page(
        request, "ADMIN.html", "/admin", required_role="amministratore"
    )
@app.get("/access")
@app.post("/access")
async def login(request: Request, username: Optional[str] = Form(None), password: Optional[str] = Form(None)):
    """Show the login page (GET) or authenticate against Keycloak (POST).

    POST: the session is reset, the credentials are exchanged for tokens, the
    tokens plus the user's role and fiscal code are stored in the session, and
    the browser is redirected to ``/callback``. Failures are reported as JSON
    with a ``detail`` entry.

    GET: a session with an active token is redirected to ``/callback``; an
    expired one is refreshed first; anything else gets ``login.html``.

    Args:
        request: The incoming request; its session carries the tokens.
        username: Form field with the login name (POST only).
        password: Form field with the password (POST only).
    """
    error = None  # no error is ever set here; kept because the template context carries it

    def _store_session_and_redirect(tokens: Dict) -> RedirectResponse:
        """Persist tokens and user attributes, then redirect to /callback."""
        request.session.update(
            {"access_token": tokens["access_token"], "refresh_token": tokens["refresh_token"]}
        )
        user_info = get_user_info_from_keycloak(tokens["access_token"])
        request.session.update(
            {"ruolo": user_info["ruolo"], "codice_fiscale": user_info["CF"]}
        )
        # urlencode escapes the values; the former f-string also nested the
        # same quote type, which only parses on Python 3.12 and later.
        query = urlencode(
            {"ruolo": user_info["ruolo"], "codice_fiscale": user_info["CF"]}
        )
        return RedirectResponse(url=f"/callback?{query}", status_code=303)

    if request.method == "POST":
        request.session.clear()
        if not username or not password:
            # Previously the literal text "None" was sent to Keycloak as the
            # credentials and the rejection surfaced as a 500.
            return JSONResponse(
                content={"detail": "Username and password are required"},
                status_code=400,
            )
        try:
            return _store_session_and_redirect(get_token_from_keycloak(username, password))
        except HTTPException as e:
            # Do not leave tokens behind when the userinfo lookup failed.
            request.session.clear()
            return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
        except Exception as e:
            request.session.clear()
            logger.error("An unexpected error occurred during login: %s", e)
            return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)

    if "access_token" in request.session:
        access_token = str(request.session.get("access_token"))
        if introspect_keycloak_token_request(access_token).get("active"):
            return RedirectResponse(url="/callback", status_code=303)
        try:
            tokens = refresh_token_from_keycloak(str(request.session.get("refresh_token")))
            return _store_session_and_redirect(tokens)
        except Exception:
            # Fall through to the login page, but record why the refresh failed.
            logger.exception("Session refresh failed on /access")

    request.session.clear()
    return templates.TemplateResponse("login.html", {"request": request, "error": error})
@app.get("/refresh")
async def refresh(request: Request):
    """Renew the session's tokens and return the new access token and user info.

    Responds with 401 when the session holds no refresh token. Errors raised
    while talking to Keycloak are returned as JSON with a ``detail`` entry.
    """
    session = request.session
    stored_refresh_token = session.get("refresh_token")
    if not stored_refresh_token:
        raise HTTPException(status_code=401, detail="Refresh token not found in session")

    try:
        renewed = refresh_token_from_keycloak(stored_refresh_token)
        session["access_token"] = renewed["access_token"]
        session["refresh_token"] = renewed["refresh_token"]

        profile = get_user_info_from_keycloak(renewed["access_token"])
        session["user_info"] = profile

        return {
            "message": "Token refreshed successfully",
            "access_token": renewed["access_token"],
            "user_info": profile,
        }
    except HTTPException as http_err:
        return JSONResponse(
            content={"detail": http_err.detail}, status_code=http_err.status_code
        )
    except Exception as err:
        logger.error("An unexpected error occurred during token refresh: %s", err)
        return JSONResponse(
            content={"detail": "An unexpected error occurred"}, status_code=500
        )
@app.get("/introspect")
async def introspect(request: Request):
    """Return Keycloak's introspection result for the session's access token.

    Responds with 401 when the session holds no access token. Errors raised
    while talking to Keycloak are returned as JSON with a ``detail`` entry.
    """
    session_token = request.session.get("access_token")
    if not session_token:
        raise HTTPException(status_code=401, detail="Access token not found in session")

    try:
        result = introspect_keycloak_token_request(session_token)
        return {
            "message": "Token introspection successful",
            "introspect_data": result,
        }
    except HTTPException as http_err:
        return JSONResponse(
            content={"detail": http_err.detail}, status_code=http_err.status_code
        )
    except Exception as err:
        logger.error("An unexpected error occurred during token introspection: %s", err)
        return JSONResponse(
            content={"detail": "An unexpected error occurred"}, status_code=500
        )
@app.get("/userinfo")
async def user_info(request: Request):
    """Return the ``user_info`` entry of the session, or 401 if it is absent."""
    stored_profile = request.session.get("user_info")
    if stored_profile:
        return {"message": "User information retrieved", "user_info": stored_profile}
    raise HTTPException(status_code=401, detail="User info not found in session")
@app.get("/logout_keycloak")
async def logout_user(request: Request):
    """Log the user out at Keycloak and clear the local session.

    Responds with 401 when the session holds no refresh token. Errors raised
    while talking to Keycloak are returned as JSON with a ``detail`` entry,
    and the session is left untouched in that case.
    """
    session_refresh_token = request.session.get("refresh_token")
    if not session_refresh_token:
        raise HTTPException(status_code=401, detail="Refresh token not found in session")

    try:
        logout_keycloak(session_refresh_token)
        request.session.clear()
        return {"message": "Logout successful"}
    except HTTPException as http_err:
        return JSONResponse(
            content={"detail": http_err.detail}, status_code=http_err.status_code
        )
    except Exception as err:
        logger.error("An unexpected error occurred during logout: %s", err)
        return JSONResponse(
            content={"detail": "An unexpected error occurred"}, status_code=500
        )
@app.get("/protected")
async def protected_endpoint(request: Request):
    """Sample endpoint that answers only when the session token is active.

    Responds with 401 when the session has no access token or Keycloak reports
    it as inactive; otherwise returns a greeting with the introspection data.
    """
    session_token = request.session.get("access_token")
    if not session_token:
        raise HTTPException(status_code=401, detail="Access token not found in session")

    try:
        token_state = introspect_keycloak_token_request(session_token)
        if token_state.get("active"):
            return JSONResponse(
                {"message": "Hello, world! (Protected)", "introspect": token_state}
            )
        # Inactive token: same JSON body and status the raised-and-caught
        # HTTPException used to produce.
        return JSONResponse(
            content={"detail": "Access token is not active"}, status_code=401
        )
    except HTTPException as http_err:
        return JSONResponse(
            content={"detail": http_err.detail}, status_code=http_err.status_code
        )
    except Exception as err:
        logger.error("An unexpected error occurred during token introspection: %s", err)
        return JSONResponse(
            content={"detail": "An unexpected error occurred"}, status_code=500
        )
-
# --- Run the App ---
if __name__ == "__main__":
    # Direct execution starts uvicorn on localhost:8000.
    uvicorn.run(
        app,
        host="localhost",
        port=8000,
    )
|