123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- from fastapi import FastAPI, Form, Request, Depends, HTTPException, Body
- from pydantic import BaseModel
- from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse
- from fastapi.templating import Jinja2Templates
- from fastapi.staticfiles import StaticFiles
- from fastapi.middleware.cors import CORSMiddleware
- from starlette.middleware.sessions import SessionMiddleware
- import secrets
- import uvicorn
- import logging
- import requests # Import the requests library here
- from typing import Annotated, Dict, List #import typing
- # --- Keycloak Configuration ---
- KEYCLOAK_SERVER_URL = "http://165.22.75.145:8080" # Double-check this! Changed to http
- KEYCLOAK_REALM = "Generation"
- KEYCLOAK_CLIENT_ID = "web-app-pw"
- KEYCLOAK_CLIENT_SECRET = "fQGWt8HSPn65cCKTOzE5FigqZhf8QTYW"
- KEYCLOAK_SCOPE = "codicefiscale email openid ruolo"
- # --- App Initialization ---
- app = FastAPI()
- # --- Logging Configuration ---
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- # --- CORS ---
- origins = [
- "http://localhost:8000",
- "http://localhost:3000/*",
- ]
- app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # --- Session ---
- app.add_middleware(SessionMiddleware, secret_key=secrets.token_hex(32))
- # --- Helper Functions (modified to be part of the API) ---
- # Function to get tokens from keycloak
- def get_token_from_keycloak(username, password) -> Dict:
- """Retrieves access and refresh tokens from Keycloak."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
- payload = f'grant_type=password&client_id={KEYCLOAK_CLIENT_ID}&scope={KEYCLOAK_SCOPE}&username={username}&password={password}&client_secret={KEYCLOAK_CLIENT_SECRET}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.post(url, headers=headers, data=payload, timeout=10)
- response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error getting token from Keycloak: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to get token from Keycloak: {e}") from e
- def refresh_token_from_keycloak(refresh_token: str) -> Dict:
- """Refreshes the access token using the refresh token."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
- payload = f'grant_type=refresh_token&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}&refresh_token={refresh_token}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.post(url, headers=headers, data=payload, timeout=10)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error refreshing token: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to refresh token: {e}") from e
- def introspect_keycloak_token_request(access_token: str) -> Dict:
- """Introspects a Keycloak token to verify if it's active."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token/introspect"
- payload = f'token={access_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.post(url, headers=headers, data=payload, verify=False, timeout=10)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error introspecting token: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to introspect token: {e}") from e
- def get_user_info_from_keycloak(access_token: str) -> Dict:
- """Gets user information from Keycloak using the access token."""
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/userinfo"
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- try:
- response = requests.get(url, headers=headers, timeout=10)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error getting user info: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to get user info: {e}") from e
- def logout_keycloak(refresh_token:str):
- url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/logout"
- payload = f'refresh_token={refresh_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}'
- headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
- }
- try:
- response = requests.request("POST", url, headers=headers, data=payload)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- logger.error(f"Error logging out user: {e}")
- raise HTTPException(status_code=500, detail=f"Failed to logout user: {e}") from e
- #print(get_token_from_keycloak("testuserperpaginaprivata", "user"))
- # --- Routes ---
- #@app.get("/login")
- #async def login(request: Request):
- # """
- # Logs in a user by retrieving tokens from Keycloak.
- # Stores the tokens and user info in the session.
- # """
- # try:
- # #tokens = get_token_from_keycloak()
- # ## Save tokens and information to the session
- # #request.session["access_token"] = tokens["access_token"]
- # #request.session["refresh_token"] = tokens["refresh_token"]
- ##
- # ## Get user info and save it to the session too
- # #user_info = get_user_info_from_keycloak(tokens["access_token"])
- # #request.session["user_info"] = user_info
- ##
- # #return {"message": "Login successful", "access_token": tokens["access_token"], "user_info": user_info}
- #
- # if request.session["access_token"]:
- # return RedirectResponse(url="localhost:3000/index")
- # else:
- # return RedirectResponse(url="localhost:3000/login")
- # except HTTPException as e:
- # return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
- # except Exception as e:
- # logger.error(f"An unexpected error occurred during login: {e}")
- # return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
- class Credenziali(BaseModel):
- username: str
- password: str
- #@app.get("/login")
- app.mount("/static", StaticFiles(directory="static"), name="static")
- templates = Jinja2Templates(directory="templates")
- @app.get("/access")
- async def access(request: Request):
- return templates.TemplateResponse("login.html", context={"request": request, "title": "Accesso"})
- @app.post("/login")
- async def login(request: Request, credenziali: Credenziali = Body(...)):
- """
- Logs in a user by retrieving tokens from Keycloak.
- Stores the tokens and user info in the session.
- """
- try:
- tokens = get_token_from_keycloak(credenziali.username, credenziali.password)#request.body.json()["username"], request.json()["password"])
- RedirectResponse(url="localhost:8000/callback")
- # Save tokens and information to the session
- request.session["access_token"] = tokens["access_token"]
- request.session["refresh_token"] = tokens["refresh_token"]
- #print(tokens["access_token"])
-
- # Get user info and save it to the session too
- user_info = get_user_info_from_keycloak(tokens["access_token"])
- request.session["user_info"] = user_info
- return RedirectResponse(url="localhost:8000/callback")
- except HTTPException as e:
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
- except Exception as e:
- logger.error(f"An unexpected error occurred during login: {e}")
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
- @app.get("/refresh")
- async def refresh(request: Request):
- """Refreshes the access token using the refresh token in the session."""
- refresh_token = request.session.get("refresh_token")
- if not refresh_token:
- raise HTTPException(status_code=401, detail="Refresh token not found in session")
- try:
- new_tokens = refresh_token_from_keycloak(refresh_token)
- request.session["access_token"] = new_tokens["access_token"]
- request.session["refresh_token"] = new_tokens["refresh_token"]
- new_user_info = get_user_info_from_keycloak(new_tokens["access_token"])
- request.session["user_info"] = new_user_info
- return {"message": "Token refreshed successfully", "access_token": new_tokens["access_token"], "user_info": new_user_info}
- except HTTPException as e:
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
- except Exception as e:
- logger.error(f"An unexpected error occurred during token refresh: {e}")
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
- @app.get("/introspect")
- async def introspect(request: Request):
- """Introspects the access token in the session."""
- access_token = request.session.get("access_token")
- if not access_token:
- raise HTTPException(status_code=401, detail="Access token not found in session")
- try:
- introspect_data = introspect_keycloak_token_request(access_token)
- return {"message": "Token introspection successful", "introspect_data": introspect_data}
- except HTTPException as e:
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
- except Exception as e:
- logger.error(f"An unexpected error occurred during token introspection: {e}")
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
- @app.get("/userinfo")
- async def user_info(request: Request):
- """Retrieves and returns user information stored in the session."""
- user_info = request.session.get("user_info")
- if not user_info:
- raise HTTPException(status_code=401, detail="User info not found in session")
- return {"message": "User information retrieved", "user_info": user_info}
- @app.get("/logout_keycloak")
- async def logout_user(request: Request):
- """Logs out a user by revoking the refresh token."""
- refresh_token = request.session.get("refresh_token")
- if not refresh_token:
- raise HTTPException(status_code=401, detail="Refresh token not found in session")
- try:
- logout_keycloak(refresh_token)
- request.session.clear()
- return {"message": "Logout successful"}
- except HTTPException as e:
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
- except Exception as e:
- logger.error(f"An unexpected error occurred during logout: {e}")
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
- @app.get("/protected")
- async def protected_endpoint(request: Request):
- """A protected endpoint that requires a valid access token."""
- access_token = request.session.get("access_token")
- if not access_token:
- raise HTTPException(status_code=401, detail="Access token not found in session")
- try:
- introspect_data = introspect_keycloak_token_request(access_token)
- if not introspect_data.get("active"):
- raise HTTPException(status_code=401, detail="Access token is not active")
- return JSONResponse({"message": f"Hello, world! (Protected)", "introspect": introspect_data})
- except HTTPException as e:
- return JSONResponse(content={"detail": e.detail}, status_code=e.status_code)
- except Exception as e:
- logger.error(f"An unexpected error occurred during token introspection: {e}")
- return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500)
-
- # --- Run the App ---
- if __name__ == "__main__":
- uvicorn.run(app, host="localhost", port=8000)
|