Files
devweb/old.py
2026-01-07 19:53:32 +00:00

725 lines
24 KiB
Python

import os
import uuid
import datetime
import hashlib
import hmac
from pathlib import Path
from functools import wraps
import mysql.connector
from flask import Flask, request, render_template_string, redirect, url_for, session, send_from_directory, abort, g
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_wtf.csrf import CSRFProtect, CSRFError
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.middleware.proxy_fix import ProxyFix
import magic
app = Flask(__name__)
# --- CONFIGURATION SÉCURISÉE ---
def load_secret(env_var_name, file_suffix='', default=None):
"""Charge un secret depuis un fichier ou une variable d'environnement"""
file_var = f"{env_var_name}_FILE"
if file_var in os.environ:
secret_file = os.environ[file_var]
if os.path.exists(secret_file):
with open(secret_file, 'r') as f:
return f.read().strip()
if env_var_name in os.environ:
return os.environ[env_var_name]
if default:
return default
raise ValueError(f"Secret {env_var_name} non trouvé")
# Secrets
app.secret_key = load_secret('SECRET_KEY')
ADMIN_USER = os.environ.get('ADMIN_DW_USER')
ADMIN_PASS = load_secret('ADMIN_DW_PASS')
FLAG_DEVWEB = load_secret('FLAG_DEVWEB')
DB_PASSWORD = load_secret('DB_DW_PASS')
# Configuration sécurisée
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_SAMESITE='Lax',
MAX_CONTENT_LENGTH=16 * 1024 * 1024, # 16MB max
PERMANENT_SESSION_LIFETIME=datetime.timedelta(hours=1),
# Protection contre les attaques de session
SESSION_REFRESH_EACH_REQUEST=True
)
# Configuration upload
UPLOAD_FOLDER = Path('/app/uploads')
UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
ALLOWED_MIMETYPES = {
'image/png',
'image/jpeg',
'image/gif',
'image/webp'
}
# --- SÉCURITÉ : MIDDLEWARE TRAEFIK ---
# Configure Flask pour faire confiance à Traefik comme proxy
app.wsgi_app = ProxyFix(
app.wsgi_app,
x_for=1, # Nombre de proxies pour X-Forwarded-For
x_proto=1, # Pour X-Forwarded-Proto
x_host=1, # Pour X-Forwarded-Host
x_prefix=1 # Pour X-Forwarded-Prefix
)
# --- SÉCURITÉ : CSRF PROTECTION ---
csrf = CSRFProtect(app)
# --- SÉCURITÉ : RATE LIMITING ---
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"],
storage_uri="memory://"
)
# --- FONCTIONS UTILITAIRES SÉCURISÉES ---
def get_real_ip():
"""Récupère l'IP réelle derrière le proxy Traefik"""
return request.remote_addr or '0.0.0.0'
def validate_image_file(file_stream):
"""
Validation stricte du fichier image avec python-magic
Vérifie le MIME type réel du fichier, pas juste l'extension
"""
try:
# Lit les premiers 2048 bytes pour la détection
header = file_stream.read(2048)
file_stream.seek(0) # Rewind
# Utilise python-magic pour détecter le vrai type MIME
mime = magic.from_buffer(header, mime=True)
# Vérifie que c'est bien une image autorisée
if mime not in ALLOWED_MIMETYPES:
return False, f"Type MIME non autorisé: {mime}"
# Vérification supplémentaire des magic bytes
if mime == 'image/jpeg' and not header.startswith(b'\xFF\xD8\xFF'):
return False, "Magic bytes JPEG invalides"
elif mime == 'image/png' and not header.startswith(b'\x89PNG\r\n\x1a\n'):
return False, "Magic bytes PNG invalides"
elif mime == 'image/gif' and not (header.startswith(b'GIF87a') or header.startswith(b'GIF89a')):
return False, "Magic bytes GIF invalides"
elif mime == 'image/webp' and not header.startswith(b'RIFF') and b'WEBP' not in header[:20]:
return False, "Magic bytes WEBP invalides"
return True, mime
except Exception as e:
return False, f"Erreur validation: {str(e)}"
def allowed_file(filename):
"""Vérifie que l'extension est autorisée"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def sanitize_filename(filename):
"""Nettoie le nom de fichier pour éviter les attaques path traversal"""
# Garde seulement les caractères alphanumériques et le point
return "".join(c for c in filename if c.isalnum() or c in "._-").rstrip()
def get_db_connection():
"""Crée une connexion à la base de données"""
return mysql.connector.connect(
host=os.environ.get('DB_HOST'),
user=os.environ.get('DB_DW_USER'),
password=DB_PASSWORD,
database='ForumDB',
charset='utf8mb4',
collation='utf8mb4_unicode_ci',
# Sécurité : timeout pour éviter les connexions qui traînent
connection_timeout=10
)
def init_db():
"""Initialise la base de données"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS posts (
id INT AUTO_INCREMENT PRIMARY KEY,
content TEXT,
image_filename VARCHAR(255),
ip_address VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_ip_created (ip_address, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
''')
conn.commit()
cursor.close()
conn.close()
except Exception as e:
app.logger.error(f"Erreur init DB: {e}")
# --- DÉCORATEURS DE SÉCURITÉ ---
def login_required(f):
"""Décorateur pour protéger les routes admin"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
# --- GESTION DES ERREURS ---
@app.errorhandler(CSRFError)
def handle_csrf_error(e):
"""Gestion des erreurs CSRF"""
return render_template_string('''
<!doctype html>
<html><body>
<h1>Erreur de sécurité</h1>
<p>Votre session a expiré. Veuillez <a href="/">rafraîchir la page</a>.</p>
</body></html>
'''), 400
@app.errorhandler(429)
def ratelimit_handler(e):
"""Gestion du rate limiting"""
return render_template_string('''
<!doctype html>
<html><body>
<h1>Trop de requêtes</h1>
<p>Vous avez dépassé la limite. Réessayez dans quelques minutes.</p>
</body></html>
'''), 429
# --- ROUTES ---
@app.route('/', methods=['GET', 'POST'])
@limiter.limit("10 per minute", methods=["POST"])
def index():
"""Page principale avec formulaire de post"""
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
error = None
success = None
if request.method == 'POST':
client_ip = get_real_ip()
# Vérification Anti-Spam
cursor.execute(
"SELECT created_at FROM posts WHERE ip_address = %s ORDER BY created_at DESC LIMIT 1",
(client_ip,)
)
last_post = cursor.fetchone()
can_post = True
if last_post:
time_since_last = datetime.datetime.now() - last_post['created_at']
if time_since_last.total_seconds() < 60:
error = "Hé ho ! Une minute entre chaque post svp."
can_post = False
if can_post:
content = request.form.get('content', '').strip()
file = request.files.get('image')
# Validation du contenu
if content and len(content) > 5000:
error = "Message trop long (max 5000 caractères)"
can_post = False
filename = None
if can_post and file and file.filename:
original_filename = sanitize_filename(file.filename)
if allowed_file(original_filename):
# VALIDATION STRICTE DU CONTENU
is_valid, validation_result = validate_image_file(file.stream)
if is_valid:
ext = original_filename.rsplit('.', 1)[1].lower()
# Nom de fichier sécurisé avec UUID
filename = f"{uuid.uuid4()}.{ext}"
filepath = UPLOAD_FOLDER / filename
try:
file.save(str(filepath))
# Vérifie que le fichier a bien été écrit
if not filepath.exists():
error = "Erreur lors de la sauvegarde du fichier"
can_post = False
filename = None
except Exception as e:
app.logger.error(f"Erreur sauvegarde fichier: {e}")
error = "Erreur lors de l'upload"
can_post = False
filename = None
else:
error = f"Fichier non valide: {validation_result}"
can_post = False
else:
error = "Extension de fichier non autorisée"
can_post = False
if can_post and (content or filename):
try:
cursor.execute(
"INSERT INTO posts (content, image_filename, ip_address) VALUES (%s, %s, %s)",
(content, filename, client_ip)
)
conn.commit()
success = "Message posté avec succès !"
except Exception as e:
app.logger.error(f"Erreur insertion DB: {e}")
error = "Erreur lors de la publication"
# Nettoie le fichier uploadé en cas d'erreur DB
if filename:
try:
(UPLOAD_FOLDER / filename).unlink(missing_ok=True)
except:
pass
elif can_post:
error = "Veuillez saisir un message ou une image"
# Récupération des posts avec limite
cursor.execute("SELECT * FROM posts ORDER BY created_at DESC LIMIT 50")
posts = cursor.fetchall()
cursor.close()
conn.close()
# Template avec auto-escape activé par défaut
html = """
<!doctype html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Team M - Mini Forum</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
max-width: 900px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
color: #333;
}
nav {
background: white;
padding: 15px;
margin-bottom: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
display: flex;
justify-content: space-between;
align-items: center;
}
nav a {
color: #0066cc;
text-decoration: none;
margin-right: 15px;
font-weight: 500;
}
nav a:hover { text-decoration: underline; }
.flag {
background: #fff3cd;
padding: 8px 12px;
border-radius: 4px;
font-family: monospace;
color: #856404;
}
h1 {
color: #2c3e50;
margin-bottom: 20px;
font-size: 2em;
}
.alert {
padding: 12px 16px;
margin-bottom: 20px;
border-radius: 6px;
font-weight: 500;
}
.error {
background: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.success {
background: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.form-container {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 30px;
}
textarea, input[type="file"] {
width: 100%;
padding: 12px;
margin-bottom: 12px;
border: 1px solid #ddd;
border-radius: 6px;
font-family: inherit;
font-size: 14px;
}
textarea {
resize: vertical;
min-height: 100px;
}
button {
background: #0066cc;
color: white;
border: none;
padding: 12px 24px;
border-radius: 6px;
cursor: pointer;
font-size: 16px;
font-weight: 500;
transition: background 0.2s;
}
button:hover { background: #0052a3; }
button:active { transform: scale(0.98); }
.post {
background: white;
padding: 20px;
margin-bottom: 15px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.meta {
color: #666;
font-size: 0.85em;
margin-bottom: 12px;
padding-bottom: 8px;
border-bottom: 1px solid #eee;
}
.post-content {
line-height: 1.6;
margin-bottom: 12px;
word-wrap: break-word;
}
.post img {
max-width: 100%;
height: auto;
border-radius: 6px;
margin-top: 12px;
display: block;
}
</style>
</head>
<body>
<nav>
<div>
<a href="/">🏠 Accueil</a>
{% if session.get('logged_in') %}
<a href="/logout">🚪 Déconnexion</a>
{% else %}
<a href="/login">🔐 Connexion Admin</a>
{% endif %}
</div>
{% if session.get('logged_in') %}
<div class="flag">🚩 {{ flag }}</div>
{% endif %}
</nav>
<h1>📝 Le Forum de la Team M</h1>
{% if error %}
<div class="alert error">❌ {{ error }}</div>
{% endif %}
{% if success %}
<div class="alert success">✅ {{ success }}</div>
{% endif %}
<div class="form-container">
<form method="post" enctype="multipart/form-data">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<textarea
name="content"
placeholder="Partagez vos pensées avec la Team M..."
maxlength="5000"
aria-label="Contenu du message"
></textarea>
<input
type="file"
name="image"
accept=".png,.jpg,.jpeg,.gif,.webp"
aria-label="Image à uploader"
>
<button type="submit">📤 Envoyer</button>
</form>
</div>
<hr style="border: none; border-top: 2px solid #eee; margin: 30px 0;">
{% if posts %}
{% for post in posts %}
<div class="post">
<div class="meta">
👤 <b>{{ post.ip_address }}</b> •
📅 {{ post.created_at.strftime('%d/%m/%Y à %H:%M') }}
</div>
{% if post.content %}
<div class="post-content">{{ post.content }}</div>
{% endif %}
{% if post.image_filename %}
<img
src="{{ url_for('uploaded_file', filename=post.image_filename) }}"
alt="Image postée"
loading="lazy"
>
{% endif %}
</div>
{% endfor %}
{% else %}
<div class="post">
<p style="text-align: center; color: #999;">
Aucun message pour le moment. Soyez le premier à poster ! 🎉
</p>
</div>
{% endif %}
</body>
</html>
"""
return render_template_string(
html,
posts=posts,
error=error,
success=success,
session=session,
flag=FLAG_DEVWEB if session.get('logged_in') else ""
)
@app.route('/uploads/<path:filename>')
@limiter.limit("100 per minute")
def uploaded_file(filename):
"""Sert les fichiers uploadés de manière sécurisée"""
# Sécurité : empêche le path traversal
safe_filename = sanitize_filename(filename)
if safe_filename != filename or '..' in filename or '/' in filename:
abort(400)
filepath = UPLOAD_FOLDER / safe_filename
if not filepath.exists() or not filepath.is_file():
abort(404)
# Vérifie que le fichier est bien dans le dossier uploads
try:
filepath.resolve().relative_to(UPLOAD_FOLDER.resolve())
except ValueError:
abort(403)
return send_from_directory(UPLOAD_FOLDER, safe_filename)
@app.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute", methods=["POST"])
def login():
"""Page de connexion admin avec rate limiting strict"""
if session.get('logged_in'):
return redirect(url_for('index'))
error = None
if request.method == 'POST':
username = request.form.get('username', '')
password = request.form.get('password', '')
# Comparaison à temps constant pour éviter les timing attacks
username_match = hmac.compare_digest(username, ADMIN_USER)
password_match = hmac.compare_digest(password, ADMIN_PASS)
if username_match and password_match:
# Régénère l'ID de session pour éviter la fixation
session.clear()
session['logged_in'] = True
session.permanent = True
# Log de connexion réussie
app.logger.info(f"Connexion admin réussie depuis {get_real_ip()}")
return redirect(url_for('index'))
else:
error = "Identifiants incorrects"
# Log de tentative échouée
app.logger.warning(f"Tentative de connexion échouée depuis {get_real_ip()}")
html = """
<!doctype html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Connexion Admin</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}
.login-container {
background: white;
padding: 40px;
border-radius: 12px;
box-shadow: 0 10px 40px rgba(0,0,0,0.2);
width: 100%;
max-width: 400px;
}
h1 {
color: #333;
margin-bottom: 30px;
text-align: center;
}
.error {
background: #f8d7da;
color: #721c24;
padding: 12px;
border-radius: 6px;
margin-bottom: 20px;
border: 1px solid #f5c6cb;
}
.form-group {
margin-bottom: 20px;
}
label {
display: block;
margin-bottom: 8px;
color: #555;
font-weight: 500;
}
input[type="text"],
input[type="password"] {
width: 100%;
padding: 12px;
border: 1px solid #ddd;
border-radius: 6px;
font-size: 16px;
transition: border-color 0.2s;
}
input:focus {
outline: none;
border-color: #667eea;
}
button {
width: 100%;
background: #667eea;
color: white;
border: none;
padding: 14px;
border-radius: 6px;
font-size: 16px;
font-weight: 600;
cursor: pointer;
transition: background 0.2s;
}
button:hover {
background: #5568d3;
}
.back-link {
display: block;
text-align: center;
margin-top: 20px;
color: #667eea;
text-decoration: none;
}
.back-link:hover {
text-decoration: underline;
}
</style>
</head>
<body>
<div class="login-container">
<h1>🔐 Connexion Admin</h1>
{% if error %}
<div class="error">❌ {{ error }}</div>
{% endif %}
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="form-group">
<label for="username">Nom d'utilisateur</label>
<input
type="text"
id="username"
name="username"
required
autocomplete="username"
autofocus
>
</div>
<div class="form-group">
<label for="password">Mot de passe</label>
<input
type="password"
id="password"
name="password"
required
autocomplete="current-password"
>
</div>
<button type="submit">Se connecter</button>
</form>
<a href="/" class="back-link">← Retour à l'accueil</a>
</div>
</body>
</html>
"""
return render_template_string(html, error=error)
@app.route('/logout')
def logout():
"""Déconnexion sécurisée"""
session.clear()
return redirect(url_for('index'))
@app.route('/health')
@csrf.exempt # Le healthcheck n'a pas besoin de CSRF
def health():
"""Endpoint de healthcheck pour Docker"""
try:
# Vérifie que la DB est accessible
conn = get_db_connection()
conn.close()
return {'status': 'healthy'}, 200
except Exception as e:
app.logger.error(f"Healthcheck failed: {e}")
return {'status': 'unhealthy', 'error': str(e)}, 503
# --- INITIALISATION ---
if __name__ == '__main__':
# Initialise la DB au démarrage
init_db()
# En production, utiliser gunicorn ou uwsgi, pas le serveur de dev Flask
# Mais pour le CTF, le serveur de dev peut suffire avec debug=False
app.run(
host='0.0.0.0',
port=80,
debug=False,
threaded=True
)