From 851a99ebacc8dcf32aa471fba5988930b6a413df Mon Sep 17 00:00:00 2001 From: los_gringos Date: Wed, 7 Jan 2026 19:53:32 +0000 Subject: [PATCH] V1.3 --- app.py | 60 ++++- old.py | 724 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 774 insertions(+), 10 deletions(-) create mode 100644 old.py diff --git a/app.py b/app.py index a4693a8..2604951 100644 --- a/app.py +++ b/app.py @@ -44,7 +44,7 @@ DB_PASSWORD = load_secret('DB_DW_PASS') app.config.update( SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, - SESSION_COOKIE_SAMESITE='Lax', + SESSION_COOKIE_SAMESITE='Strict', # ← FIX: Strict au lieu de Lax MAX_CONTENT_LENGTH=16 * 1024 * 1024, # 16MB max PERMANENT_SESSION_LIFETIME=datetime.timedelta(hours=1), # Protection contre les attaques de session @@ -83,12 +83,31 @@ limiter = Limiter( storage_uri="memory://" ) +# --- SÉCURITÉ : HEADERS --- +@app.after_request +def set_security_headers(response): + """Ajoute les headers de sécurité""" + response.headers['Content-Security-Policy'] = ( + "default-src 'self'; " + "img-src 'self'; " + "style-src 'self' 'unsafe-inline'; " + "script-src 'self'" + ) + response.headers['X-Content-Type-Options'] = 'nosniff' + response.headers['X-Frame-Options'] = 'DENY' + response.headers['X-XSS-Protection'] = '1; mode=block' + return response + # --- FONCTIONS UTILITAIRES SÉCURISÉES --- def get_real_ip(): """Récupère l'IP réelle derrière le proxy Traefik""" return request.remote_addr or '0.0.0.0' +def hash_ip(ip): + """Hash l'IP pour anonymisation (RGPD-friendly)""" + return hashlib.sha256(f"{ip}{app.secret_key}".encode()).hexdigest()[:8] + def validate_image_file(file_stream): """ Validation stricte du fichier image avec python-magic @@ -212,11 +231,12 @@ def index(): if request.method == 'POST': client_ip = get_real_ip() + hashed_ip = hash_ip(client_ip) # ← FIX: Hash l'IP # Vérification Anti-Spam cursor.execute( "SELECT created_at FROM posts WHERE ip_address = %s ORDER BY created_at DESC LIMIT 1", - (client_ip,) + (hashed_ip,) ) last_post = cursor.fetchone() @@ -273,7 +293,7 @@ def index(): try: cursor.execute( "INSERT INTO posts (content, image_filename, ip_address) VALUES (%s, %s, %s)", - (content, filename, client_ip) + (content, filename, hashed_ip) # ← FIX: Utilise hashed_ip ) conn.commit() success = "Message posté avec succès !" @@ -330,6 +350,23 @@ def index(): font-weight: 500; } nav a:hover { text-decoration: underline; } + nav form { + display: inline; + margin: 0; + } + nav button { + background: none; + border: none; + color: #0066cc; + cursor: pointer; + font-weight: 500; + font-size: inherit; + font-family: inherit; + padding: 0; + } + nav button:hover { + text-decoration: underline; + } .flag { background: #fff3cd; padding: 8px 12px; @@ -378,7 +415,7 @@ def index(): resize: vertical; min-height: 100px; } - button { + button[type="submit"] { background: #0066cc; color: white; border: none; @@ -389,8 +426,8 @@ def index(): font-weight: 500; transition: background 0.2s; } - button:hover { background: #0052a3; } - button:active { transform: scale(0.98); } + button[type="submit"]:hover { background: #0052a3; } + button[type="submit"]:active { transform: scale(0.98); } .post { background: white; padding: 20px; @@ -424,7 +461,10 @@ def index():
🏠 Accueil {% if session.get('logged_in') %} - 🚪 Déconnexion +
+ + +
{% else %} 🔐 Connexion Admin {% endif %} @@ -469,11 +509,11 @@ def index(): {% for post in posts %}
- 👤 {{ post.ip_address }} • + 👤 User-{{ post.ip_address }} • 📅 {{ post.created_at.strftime('%d/%m/%Y à %H:%M') }}
{% if post.content %} -
{{ post.content }}
+
{{ post.content | e }}
{% endif %} {% if post.image_filename %} + +

Erreur de sécurité

+

Votre session a expiré. Veuillez rafraîchir la page.

+ + '''), 400 + +@app.errorhandler(429) +def ratelimit_handler(e): + """Gestion du rate limiting""" + return render_template_string(''' + + +

Trop de requêtes

+

Vous avez dépassé la limite. Réessayez dans quelques minutes.

+ + '''), 429 + +# --- ROUTES --- + +@app.route('/', methods=['GET', 'POST']) +@limiter.limit("10 per minute", methods=["POST"]) +def index(): + """Page principale avec formulaire de post""" + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + error = None + success = None + + if request.method == 'POST': + client_ip = get_real_ip() + + # Vérification Anti-Spam + cursor.execute( + "SELECT created_at FROM posts WHERE ip_address = %s ORDER BY created_at DESC LIMIT 1", + (client_ip,) + ) + last_post = cursor.fetchone() + + can_post = True + if last_post: + time_since_last = datetime.datetime.now() - last_post['created_at'] + if time_since_last.total_seconds() < 60: + error = "Hé ho ! Une minute entre chaque post svp." + can_post = False + + if can_post: + content = request.form.get('content', '').strip() + file = request.files.get('image') + + # Validation du contenu + if content and len(content) > 5000: + error = "Message trop long (max 5000 caractères)" + can_post = False + + filename = None + if can_post and file and file.filename: + original_filename = sanitize_filename(file.filename) + + if allowed_file(original_filename): + # VALIDATION STRICTE DU CONTENU + is_valid, validation_result = validate_image_file(file.stream) + + if is_valid: + ext = original_filename.rsplit('.', 1)[1].lower() + # Nom de fichier sécurisé avec UUID + filename = f"{uuid.uuid4()}.{ext}" + filepath = UPLOAD_FOLDER / filename + + try: + file.save(str(filepath)) + # Vérifie que le fichier a bien été écrit + if not filepath.exists(): + error = "Erreur lors de la sauvegarde du fichier" + can_post = False + filename = None + except Exception as e: + app.logger.error(f"Erreur sauvegarde fichier: {e}") + error = "Erreur lors de l'upload" + can_post = False + filename = None + else: + error = f"Fichier non valide: {validation_result}" + can_post = False + else: + error = "Extension de fichier non autorisée" + can_post = False + + if can_post and (content or filename): + try: + cursor.execute( + "INSERT INTO posts (content, image_filename, ip_address) VALUES (%s, %s, %s)", + (content, filename, client_ip) + ) + conn.commit() + success = "Message posté avec succès !" + except Exception as e: + app.logger.error(f"Erreur insertion DB: {e}") + error = "Erreur lors de la publication" + # Nettoie le fichier uploadé en cas d'erreur DB + if filename: + try: + (UPLOAD_FOLDER / filename).unlink(missing_ok=True) + except: + pass + elif can_post: + error = "Veuillez saisir un message ou une image" + + # Récupération des posts avec limite + cursor.execute("SELECT * FROM posts ORDER BY created_at DESC LIMIT 50") + posts = cursor.fetchall() + cursor.close() + conn.close() + + # Template avec auto-escape activé par défaut + html = """ + + + + + + Team M - Mini Forum + + + + + +

📝 Le Forum de la Team M

+ + {% if error %} +
❌ {{ error }}
+ {% endif %} + + {% if success %} +
✅ {{ success }}
+ {% endif %} + +
+
+ + + + +
+
+ +
+ + {% if posts %} + {% for post in posts %} +
+
+ 👤 {{ post.ip_address }} • + 📅 {{ post.created_at.strftime('%d/%m/%Y à %H:%M') }} +
+ {% if post.content %} +
{{ post.content }}
+ {% endif %} + {% if post.image_filename %} + Image postée + {% endif %} +
+ {% endfor %} + {% else %} +
+

+ Aucun message pour le moment. Soyez le premier à poster ! 🎉 +

+
+ {% endif %} + + + """ + + return render_template_string( + html, + posts=posts, + error=error, + success=success, + session=session, + flag=FLAG_DEVWEB if session.get('logged_in') else "" + ) + +@app.route('/uploads/') +@limiter.limit("100 per minute") +def uploaded_file(filename): + """Sert les fichiers uploadés de manière sécurisée""" + # Sécurité : empêche le path traversal + safe_filename = sanitize_filename(filename) + if safe_filename != filename or '..' in filename or '/' in filename: + abort(400) + + filepath = UPLOAD_FOLDER / safe_filename + if not filepath.exists() or not filepath.is_file(): + abort(404) + + # Vérifie que le fichier est bien dans le dossier uploads + try: + filepath.resolve().relative_to(UPLOAD_FOLDER.resolve()) + except ValueError: + abort(403) + + return send_from_directory(UPLOAD_FOLDER, safe_filename) + +@app.route('/login', methods=['GET', 'POST']) +@limiter.limit("5 per minute", methods=["POST"]) +def login(): + """Page de connexion admin avec rate limiting strict""" + if session.get('logged_in'): + return redirect(url_for('index')) + + error = None + + if request.method == 'POST': + username = request.form.get('username', '') + password = request.form.get('password', '') + + # Comparaison à temps constant pour éviter les timing attacks + username_match = hmac.compare_digest(username, ADMIN_USER) + password_match = hmac.compare_digest(password, ADMIN_PASS) + + if username_match and password_match: + # Régénère l'ID de session pour éviter la fixation + session.clear() + session['logged_in'] = True + session.permanent = True + + # Log de connexion réussie + app.logger.info(f"Connexion admin réussie depuis {get_real_ip()}") + + return redirect(url_for('index')) + else: + error = "Identifiants incorrects" + # Log de tentative échouée + app.logger.warning(f"Tentative de connexion échouée depuis {get_real_ip()}") + + html = """ + + + + + + Connexion Admin + + + + + + + """ + + return render_template_string(html, error=error) + +@app.route('/logout') +def logout(): + """Déconnexion sécurisée""" + session.clear() + return redirect(url_for('index')) + +@app.route('/health') +@csrf.exempt # Le healthcheck n'a pas besoin de CSRF +def health(): + """Endpoint de healthcheck pour Docker""" + try: + # Vérifie que la DB est accessible + conn = get_db_connection() + conn.close() + return {'status': 'healthy'}, 200 + except Exception as e: + app.logger.error(f"Healthcheck failed: {e}") + return {'status': 'unhealthy', 'error': str(e)}, 503 + +# --- INITIALISATION --- + +if __name__ == '__main__': + # Initialise la DB au démarrage + init_db() + + # En production, utiliser gunicorn ou uwsgi, pas le serveur de dev Flask + # Mais pour le CTF, le serveur de dev peut suffire avec debug=False + app.run( + host='0.0.0.0', + port=80, + debug=False, + threaded=True + )