diff --git a/old.py b/old.py deleted file mode 100644 index a4693a8..0000000 --- a/old.py +++ /dev/null @@ -1,724 +0,0 @@ -import os -import uuid -import datetime -import hashlib -import hmac -from pathlib import Path -from functools import wraps - -import mysql.connector -from flask import Flask, request, render_template_string, redirect, url_for, session, send_from_directory, abort, g -from flask_limiter import Limiter -from flask_limiter.util import get_remote_address -from flask_wtf.csrf import CSRFProtect, CSRFError -from werkzeug.security import check_password_hash, generate_password_hash -from werkzeug.middleware.proxy_fix import ProxyFix -import magic - -app = Flask(__name__) - -# --- CONFIGURATION SÉCURISÉE --- - -def load_secret(env_var_name, file_suffix='', default=None): - """Charge un secret depuis un fichier ou une variable d'environnement""" - file_var = f"{env_var_name}_FILE" - if file_var in os.environ: - secret_file = os.environ[file_var] - if os.path.exists(secret_file): - with open(secret_file, 'r') as f: - return f.read().strip() - if env_var_name in os.environ: - return os.environ[env_var_name] - if default: - return default - raise ValueError(f"Secret {env_var_name} non trouvé") - -# Secrets -app.secret_key = load_secret('SECRET_KEY') -ADMIN_USER = os.environ.get('ADMIN_DW_USER') -ADMIN_PASS = load_secret('ADMIN_DW_PASS') -FLAG_DEVWEB = load_secret('FLAG_DEVWEB') -DB_PASSWORD = load_secret('DB_DW_PASS') - -# Configuration sécurisée -app.config.update( - SESSION_COOKIE_HTTPONLY=True, - SESSION_COOKIE_SECURE=True, - SESSION_COOKIE_SAMESITE='Lax', - MAX_CONTENT_LENGTH=16 * 1024 * 1024, # 16MB max - PERMANENT_SESSION_LIFETIME=datetime.timedelta(hours=1), - # Protection contre les attaques de session - SESSION_REFRESH_EACH_REQUEST=True -) - -# Configuration upload -UPLOAD_FOLDER = Path('/app/uploads') -UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True) -ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} -ALLOWED_MIMETYPES = { - 'image/png', - 'image/jpeg', - 'image/gif', - 'image/webp' -} - -# --- SÉCURITÉ : MIDDLEWARE TRAEFIK --- -# Configure Flask pour faire confiance à Traefik comme proxy -app.wsgi_app = ProxyFix( - app.wsgi_app, - x_for=1, # Nombre de proxies pour X-Forwarded-For - x_proto=1, # Pour X-Forwarded-Proto - x_host=1, # Pour X-Forwarded-Host - x_prefix=1 # Pour X-Forwarded-Prefix -) - -# --- SÉCURITÉ : CSRF PROTECTION --- -csrf = CSRFProtect(app) - -# --- SÉCURITÉ : RATE LIMITING --- -limiter = Limiter( - app=app, - key_func=get_remote_address, - default_limits=["200 per day", "50 per hour"], - storage_uri="memory://" -) - -# --- FONCTIONS UTILITAIRES SÉCURISÉES --- - -def get_real_ip(): - """Récupère l'IP réelle derrière le proxy Traefik""" - return request.remote_addr or '0.0.0.0' - -def validate_image_file(file_stream): - """ - Validation stricte du fichier image avec python-magic - Vérifie le MIME type réel du fichier, pas juste l'extension - """ - try: - # Lit les premiers 2048 bytes pour la détection - header = file_stream.read(2048) - file_stream.seek(0) # Rewind - - # Utilise python-magic pour détecter le vrai type MIME - mime = magic.from_buffer(header, mime=True) - - # Vérifie que c'est bien une image autorisée - if mime not in ALLOWED_MIMETYPES: - return False, f"Type MIME non autorisé: {mime}" - - # Vérification supplémentaire des magic bytes - if mime == 'image/jpeg' and not header.startswith(b'\xFF\xD8\xFF'): - return False, "Magic bytes JPEG invalides" - elif mime == 'image/png' and not header.startswith(b'\x89PNG\r\n\x1a\n'): - return False, "Magic bytes PNG invalides" - elif mime == 'image/gif' and not (header.startswith(b'GIF87a') or header.startswith(b'GIF89a')): - return False, "Magic bytes GIF invalides" - elif mime == 'image/webp' and not header.startswith(b'RIFF') and b'WEBP' not in header[:20]: - return False, "Magic bytes WEBP invalides" - - return True, mime - - except Exception as e: - return False, f"Erreur validation: {str(e)}" - -def allowed_file(filename): - """Vérifie que l'extension est autorisée""" - return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS - -def sanitize_filename(filename): - """Nettoie le nom de fichier pour éviter les attaques path traversal""" - # Garde seulement les caractères alphanumériques et le point - return "".join(c for c in filename if c.isalnum() or c in "._-").rstrip() - -def get_db_connection(): - """Crée une connexion à la base de données""" - return mysql.connector.connect( - host=os.environ.get('DB_HOST'), - user=os.environ.get('DB_DW_USER'), - password=DB_PASSWORD, - database='ForumDB', - charset='utf8mb4', - collation='utf8mb4_unicode_ci', - # Sécurité : timeout pour éviter les connexions qui traînent - connection_timeout=10 - ) - -def init_db(): - """Initialise la base de données""" - try: - conn = get_db_connection() - cursor = conn.cursor() - cursor.execute(''' - CREATE TABLE IF NOT EXISTS posts ( - id INT AUTO_INCREMENT PRIMARY KEY, - content TEXT, - image_filename VARCHAR(255), - ip_address VARCHAR(50), - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - INDEX idx_ip_created (ip_address, created_at) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci - ''') - conn.commit() - cursor.close() - conn.close() - except Exception as e: - app.logger.error(f"Erreur init DB: {e}") - -# --- DÉCORATEURS DE SÉCURITÉ --- - -def login_required(f): - """Décorateur pour protéger les routes admin""" - @wraps(f) - def decorated_function(*args, **kwargs): - if not session.get('logged_in'): - return redirect(url_for('login')) - return f(*args, **kwargs) - return decorated_function - -# --- GESTION DES ERREURS --- - -@app.errorhandler(CSRFError) -def handle_csrf_error(e): - """Gestion des erreurs CSRF""" - return render_template_string(''' - -
-Votre session a expiré. Veuillez rafraîchir la page.
- - '''), 400 - -@app.errorhandler(429) -def ratelimit_handler(e): - """Gestion du rate limiting""" - return render_template_string(''' - - -Vous avez dépassé la limite. Réessayez dans quelques minutes.
- - '''), 429 - -# --- ROUTES --- - -@app.route('/', methods=['GET', 'POST']) -@limiter.limit("10 per minute", methods=["POST"]) -def index(): - """Page principale avec formulaire de post""" - conn = get_db_connection() - cursor = conn.cursor(dictionary=True) - error = None - success = None - - if request.method == 'POST': - client_ip = get_real_ip() - - # Vérification Anti-Spam - cursor.execute( - "SELECT created_at FROM posts WHERE ip_address = %s ORDER BY created_at DESC LIMIT 1", - (client_ip,) - ) - last_post = cursor.fetchone() - - can_post = True - if last_post: - time_since_last = datetime.datetime.now() - last_post['created_at'] - if time_since_last.total_seconds() < 60: - error = "Hé ho ! Une minute entre chaque post svp." - can_post = False - - if can_post: - content = request.form.get('content', '').strip() - file = request.files.get('image') - - # Validation du contenu - if content and len(content) > 5000: - error = "Message trop long (max 5000 caractères)" - can_post = False - - filename = None - if can_post and file and file.filename: - original_filename = sanitize_filename(file.filename) - - if allowed_file(original_filename): - # VALIDATION STRICTE DU CONTENU - is_valid, validation_result = validate_image_file(file.stream) - - if is_valid: - ext = original_filename.rsplit('.', 1)[1].lower() - # Nom de fichier sécurisé avec UUID - filename = f"{uuid.uuid4()}.{ext}" - filepath = UPLOAD_FOLDER / filename - - try: - file.save(str(filepath)) - # Vérifie que le fichier a bien été écrit - if not filepath.exists(): - error = "Erreur lors de la sauvegarde du fichier" - can_post = False - filename = None - except Exception as e: - app.logger.error(f"Erreur sauvegarde fichier: {e}") - error = "Erreur lors de l'upload" - can_post = False - filename = None - else: - error = f"Fichier non valide: {validation_result}" - can_post = False - else: - error = "Extension de fichier non autorisée" - can_post = False - - if can_post and (content or filename): - try: - cursor.execute( - "INSERT INTO posts (content, image_filename, ip_address) VALUES (%s, %s, %s)", - (content, filename, client_ip) - ) - conn.commit() - success = "Message posté avec succès !" - except Exception as e: - app.logger.error(f"Erreur insertion DB: {e}") - error = "Erreur lors de la publication" - # Nettoie le fichier uploadé en cas d'erreur DB - if filename: - try: - (UPLOAD_FOLDER / filename).unlink(missing_ok=True) - except: - pass - elif can_post: - error = "Veuillez saisir un message ou une image" - - # Récupération des posts avec limite - cursor.execute("SELECT * FROM posts ORDER BY created_at DESC LIMIT 50") - posts = cursor.fetchall() - cursor.close() - conn.close() - - # Template avec auto-escape activé par défaut - html = """ - - - - - -- Aucun message pour le moment. Soyez le premier à poster ! 🎉 -
-