import json
import sqlite3
from db import get_connection
from utility.authentication import authenticate
from utility.utility import set_headers
from utility.session import create_session, delete_session
import bcrypt
[documenti]
def handle_login(handler):
"""
POST /login
JSON: {"username":"...","password":"..."}
Verifica l'utente e crea una sessione
"""
print("Ricevuta richiesta di login")
try:
content_length = int(handler.headers.get("Content-Length", 0))
body = handler.rfile.read(content_length).decode("utf-8")
data = json.loads(body)
username = data["username"]
password = data["password"]
print(f"Tentativo di login per l'utente: {username}")
except (ValueError, KeyError, json.JSONDecodeError):
print("Errore nel parsing della richiesta di login")
error_bytes = json.dumps({"error": "JSON non valido o campi mancanti"}).encode("utf-8")
set_headers(handler, 400, error_bytes)
handler.wfile.write(error_bytes)
return
# ricerca dell'utente nel database
with get_connection() as conn:
c = conn.cursor()
c.execute("""
SELECT id, username, password, email, role
FROM users
WHERE username = ?
""", (username,))
row = c.fetchone()
if row is None:
print(f"Utente non trovato: {username}")
error_response = json.dumps({"error": "Utente inesistente"}).encode("utf-8")
set_headers(handler, 401, error_response)
handler.wfile.write(error_response)
return
user_id, db_username, db_hashed_pw, email, role = row
print(f"Utente trovato: {db_username}, Ruolo: {role}")
# controllo per password, bcrypt.checkpw ritorna True se la password è corretta
if bcrypt.checkpw(password.encode("utf-8"), db_hashed_pw.encode("utf-8")):
print(f"Password corretta per l'utente: {username}")
# password corretta, crea sessione nel database e la gestisce nel be
session_id = create_session(user_id)
print(f"Sessione creata: {session_id}")
user_obj = {
"id": user_id,
"username": db_username,
"email": email,
"role": role,
"message": "Login effettuato con successo"
}
response_data = json.dumps(user_obj).encode("utf-8")
# header extra per Set-Cookie per passare il session_id al fe una volta
# loggato nei cookie l'fe lo salva ed è possibile fare richieste successive
# al server e il resto delle richieste saranno autenticate
extra_headers = {
"Set-Cookie": f"session_id={session_id}; HttpOnly; Path=/"
}
#tramite metodo set headers impostiamo header extra necessari per passare il session_id al fe
set_headers(handler, 200, response_data, extra_headers=extra_headers)
handler.wfile.write(response_data)
else:
print(f"Password errata per l'utente: {username}")
error_response = json.dumps({"error": "Username o password errati"}).encode("utf-8")
set_headers(handler, 401, error_response)
handler.wfile.write(error_response)
[documenti]
def handle_logout(handler):
"""POST /logout - Elimina la sessione dell'utente."""
cookies = handler.headers.get("Cookie")
if not cookies:
error_response = json.dumps({"error": "Nessuna sessione attiva"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
session_id = None
for cookie in cookies.split(";"):
if "session_id=" in cookie:
session_id = cookie.strip().split("session_id=")[1]
break
if not session_id:
error_response = json.dumps({"error": "Nessuna sessione attiva"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
delete_session(session_id)
# imposta il cookie di sessione come scaduto (normalmente si utilizza con la funzione logout)
response_data = json.dumps({"message": "Logout effettuato con successo"}).encode("utf-8")
extra_headers = {
"Set-Cookie": "session_id=; HttpOnly; Path=/; Max-Age=0"
}
set_headers(
handler,
code=200,
response_data=response_data,
extra_headers=extra_headers
)
handler.wfile.write(response_data)
[documenti]
def handle_get_all_users(handler,authenticated_user):
"""GET /users - Ritorna tutti gli utenti senza le password."""
# tramite autorizzazione, permette solo agli utenti con ruolo admin
# di vedere tutti gli utenti
if authenticated_user["role"] != "admin":
error_response = json.dumps({"error": "Autorizzazione richiesta"}).encode("utf-8")
set_headers(handler, 403, error_response)
handler.wfile.write(error_response)
return
with get_connection() as conn:
c = conn.cursor()
c.execute("SELECT id, username, email, role FROM users")
rows = c.fetchall()
results = []
for row in rows:
results.append({
"id": row["id"],
"username": row["username"],
"email": row["email"],
"role": row["role"]
})
response_data = json.dumps(results).encode("utf-8")
set_headers(handler, 200, response_data)
handler.wfile.write(response_data)
[documenti]
def handle_get_user_by_id(handler, user_id):
"""GET /users/<id> - Ritorna il singolo utente se esiste."""
try:
user_id = int(user_id)
except ValueError:
error_response = json.dumps({"error": "ID non valido"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
with get_connection() as conn:
c = conn.cursor()
c.execute("SELECT id, username, password, email FROM users WHERE id = ?", (user_id,))
row = c.fetchone()
if row:
result = {
"id": row[0],
"username": row[1],
"password": row[2],
"email": row[3]
}
response_data = json.dumps(result).encode("utf-8")
set_headers(handler, 200, response_data)
handler.wfile.write(response_data)
else:
error_response = json.dumps({"error": "User non trovato"}).encode("utf-8")
set_headers(handler, 404, error_response)
handler.wfile.write(error_response)
[documenti]
def handle_get_current_user(handler):
"""
GET /current-user
Ritorna i dettagli dell'utente autenticato in base al cookie session_id.
"""
session_id = handler.headers.get("Cookie", "").split("session_id=")[-1].split(";")[0]
if not session_id:
error_response = json.dumps({"error": "Sessione non trovata"}).encode("utf-8")
set_headers(handler, 401, error_response)
print("sessione non valida")
handler.wfile.write(error_response)
return
with get_connection() as conn:
c = conn.cursor()
c.execute("""
SELECT u.id, u.username, u.email, u.role
FROM sessions s
JOIN users u ON s.user_id = u.id
WHERE s.session_id = ?
""", (session_id,))
user = c.fetchone()
if user:
response = {
"id": user["id"],
"username": user["username"],
"email": user["email"],
"role": user["role"]
}
response_data = json.dumps(response).encode("utf-8")
print(response_data)
set_headers(handler, 200, response_data)
handler.wfile.write(response_data)
else:
error_response = json.dumps({"error": "Sessione non valida"}).encode("utf-8")
print("sessione non valida")
set_headers(handler, 401, error_response)
handler.wfile.write(error_response)
[documenti]
def is_valid_password(password: str) -> bool:
"""
Verifica se la password inserita per la registrazione utente e'valida.
I constrolli per la password sono i seguenti:
- Verifica lunghezza 8 caratteri
- Verifica presenza di una lettera maiuscola
- Verifica presenza di una lettera minuscola
- Verifica presenza di un numero
- Verifica presenza di un carattere speciale
"""
if len(password) < 8:
return False
if not any(char.isupper() for char in password):
return False
if not any(char.islower() for char in password):
return False
if not any(char.isdigit() for char in password):
return False
# Caratteri speciali da accettare nella password
special_characters = "!@#$%^&*()_+-=[]{}\\|;:'\",.<>?/`~"
if not any(char in special_characters for char in password):
return False
return True
[documenti]
def handle_create_user(handler):
"""POST /users - Crea un nuovo utente nel DB."""
content_length = int(handler.headers.get("Content-Length", 0))
body = handler.rfile.read(content_length).decode("utf-8")
try:
data = json.loads(body)
except json.JSONDecodeError:
error_response = json.dumps({"error": "JSON non valido"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
# dati utente
username = data.get("username", "")
# password dell'utente prima del hashing
password = data.get("password", "")
email = data.get("email", "")
# validazione campi
if not username or not password or not email:
error_response = json.dumps({"error": "Tutti i campi sono obbligatori"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
# validazione formato della email
if "@" not in email or "." not in email or email.index("@") > email.rindex("."):
error_response = json.dumps({"error": "Email non valida"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
# validazione password
if not is_valid_password(password):
error_response = json.dumps({"error": "Password non valida la password deve avere la seguente froma: almeno 8 caratteri, "
"almeno una lettera minuscola, almeno una lettera maiuscola, un numero e un carattere speciale."}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
# hashing della password tramite l'utililizzo di bcrypt
hashed_pw = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
with get_connection() as conn:
c = conn.cursor()
try:
c.execute("""
INSERT INTO users (username, password, email)
VALUES (?, ?, ?)
""", (username, hashed_pw, email))
conn.commit()
new_id = c.lastrowid
except sqlite3.IntegrityError as e:
# error per gestire violazioni UNIQUE su username o email
error_response = json.dumps({"error": f"Violazione di unicità: {str(e)}"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
new_user = {
"id": new_id,
"username": username,
"email": email
}
response_data = json.dumps(new_user).encode("utf-8")
set_headers(handler, 201, response_data)
handler.wfile.write(response_data)
[documenti]
def handle_update_user(handler, user_id):
"""PUT /users/<id> - Aggiorna i campi di un utente (username, password, email)."""
try:
user_id = int(user_id)
except ValueError:
error_response = json.dumps({"error": "ID non valido"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
content_length = int(handler.headers.get("Content-Length", 0))
body = handler.rfile.read(content_length).decode("utf-8")
try:
data = json.loads(body)
except json.JSONDecodeError:
error_response = json.dumps({"error": "JSON non valido"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
# campi aggiornabili dell'user
username = data.get("username", None)
# nuova password da aggiornare prima che venga hashata
new_password = data.get("password", None)
email = data.get("email", None)
with get_connection() as conn:
c = conn.cursor()
# check per vedere se l'utente esiste nel db
c.execute("SELECT id, username, password, email FROM users WHERE id = ?", (user_id,))
row = c.fetchone()
if not row:
error_response = json.dumps({"error": "Utente non trovato"}).encode("utf-8")
set_headers(handler, 404, error_response)
handler.wfile.write(error_response)
return
existing_id, existing_username, existing_hashed_pw, existing_email = row
# se i campi non sono presenti manteniamo i valori esistendi
updated_username = username if username is not None else existing_username
updated_email = email if email is not None else existing_email
# processo per la password qualora presente. viene hashata con bcrypt
# se non presente si mantiene la password esistente
if new_password is not None and len(new_password) > 0:
hashed_pw = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
else:
hashed_pw = existing_hashed_pw
# andiamo ad aggiornare il DB
try:
c.execute("""
UPDATE users
SET username = ?, password = ?, email = ?
WHERE id = ?
""", (updated_username, hashed_pw, updated_email, user_id))
conn.commit()
except sqlite3.IntegrityError as e:
# gestiamo errori per violazioni di tipo UNIQUE su username o email
error_response = json.dumps({"error": f"Violazione di unicità: {str(e)}"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
updated_user = {
"id": user_id,
"username": updated_username,
"email": updated_email,
# rimosso la password per motivi di sicurezza anche se hashata
# "password": hashed_pw
}
response_data = json.dumps(updated_user).encode("utf-8")
set_headers(handler, 200, response_data)
handler.wfile.write(response_data)
[documenti]
def handle_delete_user(handler, user_id):
"""DELETE /users/<id> - Elimina l'utente dal DB."""
try:
user_id = int(user_id)
except ValueError:
error_response = json.dumps({"error": "Invalid ID"}).encode("utf-8")
set_headers(handler, 400, error_response)
handler.wfile.write(error_response)
return
with get_connection() as conn:
c = conn.cursor()
# check per vedere se l'utente esiste
c.execute("SELECT id FROM users WHERE id = ?", (user_id,))
row = c.fetchone()
if not row:
error_response = json.dumps({"error": "User not found"}).encode("utf-8")
set_headers(handler, 404, error_response)
handler.wfile.write(error_response)
return
c.execute("DELETE FROM users WHERE id = ?", (user_id,))
conn.commit()
response_data = json.dumps({"message": f"User {user_id} deleted"}).encode("utf-8")
set_headers(handler, 200, response_data)
handler.wfile.write(response_data)