import geojson from fastapi import FastAPI, Depends, HTTPException, Request, Form, status, Body from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from fastapi.templating import Jinja2Templates from sqlalchemy import create_engine, Column, Integer, String, Date, Identity, Boolean, Text , TIMESTAMP from sqlalchemy.orm import sessionmaker, declarative_base, Session from geoalchemy2 import Geometry, Geography from geoalchemy2.types import WKBElement from geoalchemy2.shape import to_shape from starlette.middleware.sessions import SessionMiddleware import secrets import uvicorn import logging import requests # Import the requests library here from typing import Annotated, Dict, List #import typing from pydantic import BaseModel from datetime import date, datetime # --- Keycloak Configuration --- KEYCLOAK_SERVER_URL = "http://165.22.75.145:8080" # Double-check this! Changed to http KEYCLOAK_REALM = "Generation" KEYCLOAK_CLIENT_ID = "web-app-pw" KEYCLOAK_CLIENT_SECRET = "fQGWt8HSPn65cCKTOzE5FigqZhf8QTYW" KEYCLOAK_SCOPE = "codicefiscale email openid ruolo" # --- App Initialization --- app = FastAPI() # --- Logging Configuration --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --- CORS --- origins = [ "http://localhost:3000", ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # --- Session --- app.add_middleware(SessionMiddleware, secret_key=secrets.token_hex(32)) # app.add_middleware(SessionMiddleware, secret_key="your-secret-key") # Template Jinja2 app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") # Configurazione del database PostgreSQL #SQLALCHEMY_DATABASE_URL = "postgresql://postgres:postgres@165.22.75.145:15432/GenerationDAITA25" SQLALCHEMY_DATABASE_URL = "postgresql://postgres:postgres@165.22.75.145:15432/backend" engine = create_engine(SQLALCHEMY_DATABASE_URL) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # Funzione per ottenere la sessione def get_db(): db = SessionLocal() try: yield db finally: db.close() Base = declarative_base() # Definizione delle tabelle del database class TabellaEdifici(Base): __tablename__ = 'edifici' id_ = Column(Integer, primary_key=True, name='id_') # Chiave primaria, autoincrement id_codice_fiscale = Column(String(16), name='id_codice_fiscale') id_edificio_osm = Column(String(30), nullable=False, name='id_edificio', unique=True) indirizzo = Column(String(100), nullable=False) codice_catastale = Column(String(50), nullable=True) coordinate = Column(Geometry(geometry_type='POINT', srid=4326)) # SRID 4326 รจ lo standard per latitudine/longitudine poligono = Column(Geometry(geometry_type='POLYGON', srid=4326)) tipo_edificio = Column(String(100)) stato = Column(Boolean, default=True) data_installazione = Column(Date, nullable=False) data_cancellazione = Column(Date, nullable=True) def to_dict(self): result ={ 'id_': self.id_, 'id_codice_fiscale': self.id_codice_fiscale, 'id_edificio': self.id_edificio_osm, 'indirizzo': self.indirizzo, 'codice_catastale': self.codice_catastale, # 'coordinate': coordinate_json, # 'poligono': poligono_json, 'tipo_edificio': self.tipo_edificio, 'stato': self.stato, 'data_installazione': self.data_installazione, 'data_cancellazione': self.data_cancellazione } if self.coordinate: shape = to_shape(self.coordinate) geojson_str = geojson.dumps(shape) result['coordinate'] = geojson.loads(geojson_str) if self.poligono: shape = to_shape(self.poligono) geojson_str = geojson.dumps(shape) result['poligono'] = geojson.loads(geojson_str) return result class TabellaTFO(Base): __tablename__ = 'tfo' id_ = Column(Integer, primary_key=True, name='id_') # Chiave primaria, autoincrement id_codice_fiscale = Column(String(16), name='id_codice_fiscale') id_tfo = Column(String(30), nullable=False, name='id_tfo') codice_catastale = Column(String(50), nullable=True, name='codice_catastale') operatore = Column(String(255)) piano = Column(String(50)) scala = Column(String(50)) interno = Column(String(50)) id_edificio = Column(String(30), nullable=False, name='id_edificio') data_installazione = Column(Date, nullable=False, name='data_installazione') data_cancellazione = Column(Date, nullable=True, name='data_cancellazione') class TabellaUtenti(Base): __tablename__ = 'utenti' id_ = Column(Integer, primary_key=True, name='id_') # Chiave primaria, autoincrement codice_fiscale = Column(String(16), name='codice_fiscale') nome = Column(String(255), nullable=False) cognome = Column(String(255), nullable=False) email = Column(String(100), nullable=False) ruolo = Column(String(255)) stato_utente = Column(Boolean) class TabellaLogEventi(Base): __tablename__ = 'log_' id_ = Column(Integer, primary_key=True, name='id_') # Chiave primaria, autoincrement id_edificio = Column(Integer, name='id_edificio') id_tfo = Column(String(30), name='id_tfo') id_utente = Column(String(16), name='id_utente') data = Column(TIMESTAMP, default = datetime.now(), name='data') categoria_modifica = Column(String(255), name='categoria_modifica') descrizione_evento = Column(String(4000), name='descrizione_evento') class Assistenza(Base): __tablename__ = 'assistenza' id_ticket = Column(Integer, primary_key=True, autoincrement=True) nome = Column(String(50)) cognome = Column(String(50)) email = Column(String(100), unique=True, nullable=False) data_richiesta = Column(TIMESTAMP, default=datetime.now()) descrizione_problema = Column(String(4000)) # Base.metadata.drop_all(engine) # Base.metadata.create_all(bind=engine) class Credenziali(BaseModel): username: str password: str #### accesso_modifica = "RSSMRA80A01H501Z" #### # funzioni per ottenere i dati dal database def get_all_buildings(db): buildings = db.query(TabellaEdifici).all() return [building.to_dict() for building in buildings] def get_buildings_by_user(db): return db.query(TabellaEdifici).filter(TabellaEdifici.id_codice_fiscale == accesso_modifica).all() def get_specific_building(db, building_id): return db.query(TabellaEdifici).filter(TabellaEdifici.id_edificio_osm == building_id).first() ############Funzione per filtro ricerca def filtro_ricerca_indirizzo(db, filtro): return db.query(TabellaEdifici).filter(TabellaEdifici.indirizzo.ilike(f"%{filtro}%")).all() def filtro_ricerca_id_edificio_osm(db, filtro): return db.query(TabellaEdifici).filter(TabellaEdifici.id_edificio_osm.ilike(f"%{filtro}%")).all() def filtro_ricerca_data(db, filtro): return db.query(TabellaEdifici).filter(TabellaEdifici.data_installazione == filtro).all() def filtro_ricerca_tipo_edificio(db, filtro): return db.query(TabellaEdifici).filter(TabellaEdifici.tipo_edificio.ilike(f"%{filtro}%")).all() def filtro_ricerca_id_codice_fiscale(db, filtro): return db.query(TabellaEdifici).filter(TabellaEdifici.id_codice_fiscale.ilike(f"%{filtro}%")).all() def filtro_ricerca_totale(db, filtro_indirizzo, filtro_data, filtro_id_edificio_osm, filtro_tipo_edificio, filtro_id_codice_fiscale): query1 = db.query(TabellaEdifici) if filtro_indirizzo is not None: query1 = query1.filter(TabellaEdifici.indirizzo.ilike(f"%{filtro_indirizzo}%")) if filtro_data is not None: query1 = query1.filter(TabellaEdifici.data_installazione.ilike(f"%{filtro_data}%")) if filtro_id_edificio_osm is not None: query1 = query1.filter(TabellaEdifici.id_edificio_osm.ilike(f"%{filtro_id_edificio_osm}%")) if filtro_tipo_edificio is not None: query1 = query1.filter(TabellaEdifici.tipo_edificio.ilike(f"%{filtro_tipo_edificio}%")) if filtro_id_codice_fiscale is not None: query1 = query1.filter(TabellaEdifici.id_codice_fiscale.ilike(f"%{filtro_id_codice_fiscale}%")) return query1.all() def get_token_from_keycloak(username, password) -> Dict: """Retrieves access and refresh tokens from Keycloak.""" url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token" payload = f'grant_type=password&client_id={KEYCLOAK_CLIENT_ID}&scope={KEYCLOAK_SCOPE}&username={username}&password={password}&client_secret={KEYCLOAK_CLIENT_SECRET}' headers = { 'Content-Type': 'application/x-www-form-urlencoded' } try: response = requests.post(url, headers=headers, data=payload, timeout=10) response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) return response.json() except requests.exceptions.RequestException as e: logger.error(f"Error getting token from Keycloak: {e}") raise HTTPException(status_code=500, detail=f"Failed to get token from Keycloak: {e}") from e def refresh_token_from_keycloak(refresh_token: str) -> Dict: """Refreshes the access token using the refresh token.""" url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token" payload = f'grant_type=refresh_token&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}&refresh_token={refresh_token}' headers = { 'Content-Type': 'application/x-www-form-urlencoded' } try: response = requests.post(url, headers=headers, data=payload, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"Error refreshing token: {e}") raise HTTPException(status_code=500, detail=f"Failed to refresh token: {e}") from e def introspect_keycloak_token_request(access_token: str) -> Dict: """Introspects a Keycloak token to verify if it's active.""" url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token/introspect" payload = f'token={access_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}' headers = { 'Content-Type': 'application/x-www-form-urlencoded' } try: response = requests.post(url, headers=headers, data=payload, verify=False, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"Error introspecting token: {e}") raise HTTPException(status_code=500, detail=f"Failed to introspect token: {e}") from e def get_user_info_from_keycloak(access_token: str) -> Dict: """Gets user information from Keycloak using the access token.""" url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/userinfo" headers = { 'Authorization': f'Bearer {access_token}' } try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"Error getting user info: {e}") raise HTTPException(status_code=500, detail=f"Failed to get user info: {e}") from e def logout_keycloak(refresh_token:str): url = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/logout" payload = f'refresh_token={refresh_token}&client_id={KEYCLOAK_CLIENT_ID}&client_secret={KEYCLOAK_CLIENT_SECRET}' headers = { 'Content-Type': 'application/x-www-form-urlencoded' } try: response = requests.request("POST", url, headers=headers, data=payload) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"Error logging out user: {e}") raise HTTPException(status_code=500, detail=f"Failed to logout user: {e}") from e @app.get("/access") async def access(request: Request): return templates.TemplateResponse("index.html", context={"request": request, "title": "Accesso"}) @app.get("/") async def home(request: Request): return print("Ciao") #return templates.TemplateResponse("index.html", {"request": request}) @app.post("/login") async def login(credenziali: Credenziali = Body(...)): """ Logs in a user by retrieving tokens from Keycloak. Stores the tokens and user info in the session. """ try: tokens = get_token_from_keycloak(credenziali.username, credenziali.password)#request.body.json()["username"], request.json()["password"]) #RedirectResponse(url="localhost:8000/callback") # Save tokens and information to the session #request.session["access_token"] = tokens["access_token"] #request.session["refresh_token"] = tokens["refresh_token"] #print(tokens["access_token"]) # Get user info and save it to the session too user_info = get_user_info_from_keycloak(tokens["access_token"]) #request.session["user_info"] = user_info return tokens except HTTPException as e: return JSONResponse(content={"detail": e.detail}, status_code=e.status_code) except Exception as e: logger.error(f"An unexpected error occurred during login: {e}") return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500) # @app.post("/login") # async def login(request: Request, credenziali: Credenziali = Body(...)): # try: # tokens = get_token_from_keycloak(credenziali.username, credenziali.password)#request.body.json()["username"], request.json()["password"]) # RedirectResponse(url="localhost:8000/callback") # # Save tokens and information to the session # request.session["access_token"] = tokens["access_token"] # request.session["refresh_token"] = tokens["refresh_token"] # # Get user info and save it to the session too # user_info = get_user_info_from_keycloak(tokens["access_token"]) # request.session["user_info"] = user_info # return RedirectResponse(url="localhost:8000/callback") # except HTTPException as e: # return JSONResponse(content={"detail": e.detail}, status_code=e.status_code) # except Exception as e: # logger.error(f"An unexpected error occurred during login: {e}") # return JSONResponse(content={"detail": "An unexpected error occurred"}, status_code=500) # @app.post("/login") # async def login(request: Request, code: str = Form(...)): # token = keycloak_openid.token(grant_type='authorization_code', code=code, redirect_uri=os.getenv('REDIRECT_URI')) # if not token: # raise HTTPException(status_code=401, detail="Login fallito") # return RedirectResponse(url="/inserisci_dati") # @app.get("/inserisci_dati") # async def inserisci_dati(request: Request): # return templates.TemplateResponse("form.html", {"request": request}) # @app.post("/salva_dati") # async def salva_dati(request: Request, # id_codice_fiscale: str = Form(...), # id_edificio_osm: str = Form(...), # indirizzo: str = Form(...), # codice_catastale: str = Form(...), # coordinate: Geometry = Form(...), # poligono: Geometry = Form(...), # tipo_edificio: str = Form(...), # data_installazione: Geometry = Form(...), # stato: int = Form(...), # db: Session = Depends(get_db)): # edificio = TabellaEdifici(id_codice_fiscale=id_codice_fiscale, # id_edificio_osm=id_edificio_osm, # indirizzo=indirizzo, # codice_catastale=codice_catastale, # coordinate=coordinate, # poligono=poligono, # tipo_edificio=tipo_edificio, # data_installazione=data_installazione, # stato=stato) # db.add(edificio) # db.commit() # db.refresh(edificio) # return RedirectResponse(url="/salva_dati", status_code=status.HTTP_303_SEE_OTHER) @app.get("/visualizza_pratiche") async def visualizza_pratiche(request: Request, db: Session = Depends(get_db)): return get_all_buildings(db) @app.post("/visualizza_pratiche/modifica") async def modifica(request: Request, building_id : str = Form(...), indirizzo: str = Form(...), codice_catastale: str = Form(...), tipo_edificio: str = Form(...), db: Session = Depends(get_db)): pratica_da_modificare = get_specific_building(db, building_id) if pratica_da_modificare.indirizzo != indirizzo: pratica_da_modificare.indirizzo = indirizzo if pratica_da_modificare.codice_catastale != codice_catastale: pratica_da_modificare.codice_catastale = codice_catastale if pratica_da_modificare.tipo_edificio != tipo_edificio: pratica_da_modificare.tipo_edificio = tipo_edificio db.commit() return RedirectResponse(url="/visualizza_pratiche", status_code=status.HTTP_303_SEE_OTHER) @app.post("/visualizza_pratiche/elimina") async def elimina(request: Request, building_id : str = Form(...), db: Session = Depends(get_db)): pratica_da_eliminare = get_specific_building(db, building_id) pratica_da_eliminare.data_cancellazione = date.today() db.commit() return RedirectResponse(url="/visualizza_pratiche", status_code=status.HTTP_303_SEE_OTHER) @app.post("/visualizza_pratiche/ripristina") async def ripristina(request: Request, building_id : str = Form(...), db: Session = Depends(get_db)): pratica_da_ripristinare = get_specific_building(db, building_id) pratica_da_ripristinare.data_cancellazione = None db.commit() return RedirectResponse(url="/visualizza_pratiche", status_code=status.HTTP_303_SEE_OTHER) def aggiungi_prova(): with SessionLocal() as db: edificio = TabellaEdifici(id_codice_fiscale="RSSMRA80A01H501Z", id_edificio_osm="ED006", indirizzo="corso italia 1", codice_catastale="1244", coordinate="POINT(12.496365 41.902782)", poligono="POLYGON((12.496365 41.902782, 12.496365 41.902782, 12.496365 41.902782, 12.496365 41.902782, 12.496365 41.902782))", tipo_edificio="commerciale", data_installazione=datetime.strptime("12/11/2021", "%d/%m/%Y").date() ) db.add(edificio) db.commit() # aggiungi_prova() def test_json(): with SessionLocal() as db: return get_all_buildings(db) print(test_json()) uvicorn.run(app, host="localhost", port=8000) # uvicorn main:app --reload