认证与授权机制实践教程:JWT、OAuth2.0与权限管理¶
学习目标¶
完成本教程后,你将能够:
- 理解认证(Authentication)和授权(Authorization)的区别
- 掌握JWT令牌的工作原理和使用方法
- 了解OAuth 2.0授权流程和应用场景
- 实现基于JWT的用户认证系统
- 设计和实现角色权限管理(RBAC)
- 掌握会话管理和安全最佳实践
- 实现密码加密和安全存储
- 处理常见的安全问题和攻击
前置要求¶
在开始本教程之前,你需要:
知识要求: - 了解HTTP协议和RESTful API基础 - 熟悉Python或Node.js编程 - 了解基本的数据库操作 - 理解Web应用的基本架构
技能要求: - 能够使用命令行工具 - 会使用Postman或curl测试API - 了解JSON数据格式 - 熟悉基本的加密概念
概述¶
认证和授权是Web应用安全的核心组成部分。无论是用户登录、API访问控制,还是第三方应用集成,都需要可靠的认证授权机制。
为什么学习认证与授权?
- 安全需求:保护用户数据和系统资源
- 行业标准:JWT和OAuth 2.0是主流标准
- 职业必备:后端开发的核心技能
- 实际应用:几乎所有应用都需要用户认证
- 合规要求:满足数据保护法规要求
背景知识¶
认证 vs 授权¶
认证(Authentication): - 验证用户身份:"你是谁?" - 确认用户是否是其声称的那个人 - 通常通过用户名密码、生物识别等方式
授权(Authorization): - 验证用户权限:"你能做什么?" - 确认用户是否有权限访问资源或执行操作 - 通常基于角色、权限或策略
示例:
常见认证方式¶
| 认证方式 | 说明 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 基本认证 | HTTP Basic Auth | 简单 | 不安全 | 内部系统 |
| Session-Cookie | 服务器存储会话 | 成熟稳定 | 不适合分布式 | 传统Web应用 |
| Token认证 | JWT等令牌 | 无状态、可扩展 | 无法撤销 | 现代Web/移动应用 |
| OAuth 2.0 | 第三方授权 | 标准化、安全 | 复杂 | 第三方登录 |
| 生物识别 | 指纹、面部识别 | 便捷、安全 | 需要硬件支持 | 移动应用 |
安全威胁¶
常见攻击方式: - 暴力破解:尝试大量密码组合 - SQL注入:通过输入恶意SQL代码 - XSS攻击:跨站脚本攻击 - CSRF攻击:跨站请求伪造 - 中间人攻击:拦截通信数据 - 会话劫持:窃取会话令牌
核心概念¶
JWT (JSON Web Token)¶
JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息。
JWT结构¶
JWT由三部分组成,用点(.)分隔:
示例JWT:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
1. Header (头部)¶
包含令牌类型和签名算法:
alg: 签名算法(HS256、RS256等)typ: 令牌类型(JWT)
2. Payload (载荷)¶
包含声明(Claims),即要传输的数据:
标准声明:
- iss (issuer): 签发者
- sub (subject): 主题(通常是用户ID)
- aud (audience): 接收方
- exp (expiration): 过期时间
- nbf (not before): 生效时间
- iat (issued at): 签发时间
- jti (JWT ID): 唯一标识符
自定义声明:
{
"user_id": 123,
"username": "zhangsan",
"role": "admin",
"permissions": ["read", "write", "delete"]
}
3. Signature (签名)¶
用于验证令牌的完整性:
JWT工作流程¶
sequenceDiagram
participant Client as 客户端
participant Server as 服务器
Client->>Server: 1. 发送登录请求(用户名+密码)
Server->>Server: 2. 验证用户身份
Server->>Server: 3. 生成JWT令牌
Server->>Client: 4. 返回JWT令牌
Client->>Client: 5. 存储JWT令牌
Client->>Server: 6. 请求资源(携带JWT)
Server->>Server: 7. 验证JWT签名
Server->>Server: 8. 检查过期时间
Server->>Client: 9. 返回资源
JWT的优缺点¶
优点: - ✅ 无状态:服务器不需要存储会话 - ✅ 可扩展:适合分布式系统 - ✅ 跨域:支持跨域认证 - ✅ 移动友好:适合移动应用 - ✅ 性能好:减少数据库查询
缺点: - ❌ 无法撤销:令牌签发后无法主动失效 - ❌ 令牌较大:比Session ID大 - ❌ 安全风险:令牌泄露风险 - ❌ 续期复杂:需要刷新令牌机制
OAuth 2.0¶
OAuth 2.0是一个授权框架,允许第三方应用访问用户资源,而无需获取用户密码。
OAuth 2.0角色¶
- Resource Owner(资源所有者):用户
- Client(客户端):第三方应用
- Authorization Server(授权服务器):认证服务器
- Resource Server(资源服务器):API服务器
OAuth 2.0授权流程¶
授权码模式(最常用):
sequenceDiagram
participant User as 用户
participant Client as 客户端应用
participant AuthServer as 授权服务器
participant API as 资源服务器
User->>Client: 1. 点击"使用XX登录"
Client->>AuthServer: 2. 重定向到授权页面
AuthServer->>User: 3. 显示授权页面
User->>AuthServer: 4. 同意授权
AuthServer->>Client: 5. 返回授权码
Client->>AuthServer: 6. 用授权码换取令牌
AuthServer->>Client: 7. 返回访问令牌
Client->>API: 8. 使用令牌访问API
API->>Client: 9. 返回用户数据
OAuth 2.0授权模式¶
- 授权码模式(Authorization Code)
- 最安全的模式
- 适用于有后端的Web应用
-
授权码通过前端传递,令牌通过后端获取
-
隐式模式(Implicit)
- 简化的授权码模式
- 适用于纯前端应用
- 直接返回令牌,不返回授权码
-
安全性较低,不推荐使用
-
密码模式(Password)
- 用户直接提供用户名密码
- 适用于高度信任的应用
-
不推荐使用
-
客户端模式(Client Credentials)
- 客户端以自己的名义访问资源
- 适用于服务器间通信
- 不涉及用户授权
权限管理模型¶
RBAC (基于角色的访问控制)¶
用户 → 角色 → 权限 → 资源
graph LR
User[用户] --> Role[角色]
Role --> Permission[权限]
Permission --> Resource[资源]
User1[张三] --> Admin[管理员]
User2[李四] --> Editor[编辑]
User3[王五] --> Viewer[访客]
Admin --> P1[创建]
Admin --> P2[编辑]
Admin --> P3[删除]
Editor --> P2
Viewer --> P4[查看]
示例:
角色:管理员
权限:
- 用户管理:创建、编辑、删除用户
- 内容管理:创建、编辑、删除、发布文章
- 系统设置:修改系统配置
角色:编辑
权限:
- 内容管理:创建、编辑自己的文章
- 查看用户列表
角色:访客
权限:
- 查看已发布的文章
- 发表评论
ABAC (基于属性的访问控制)¶
基于用户属性、资源属性、环境属性等进行访问控制。
示例:
步骤1:实现用户注册和登录¶
1.1 准备工作¶
安装必要的依赖:
# 创建虚拟环境
python -m venv venv
# 激活虚拟环境
# Windows:
venv\Scripts\activate
# Linux/Mac:
source venv/bin/activate
# 安装依赖
pip install flask flask-cors pyjwt bcrypt python-dotenv
1.2 创建用户模型¶
创建 models.py 文件:
import bcrypt
from datetime import datetime
class User:
def __init__(self, id, username, email, password_hash, role='user', created_at=None):
self.id = id
self.username = username
self.email = email
self.password_hash = password_hash
self.role = role
self.created_at = created_at or datetime.now().isoformat()
@staticmethod
def hash_password(password):
"""哈希密码"""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(self, password):
"""验证密码"""
return bcrypt.checkpw(
password.encode('utf-8'),
self.password_hash.encode('utf-8')
)
def to_dict(self, include_sensitive=False):
"""转换为字典"""
data = {
'id': self.id,
'username': self.username,
'email': self.email,
'role': self.role,
'created_at': self.created_at
}
if include_sensitive:
data['password_hash'] = self.password_hash
return data
代码说明:
- hash_password: 使用bcrypt加密密码
- verify_password: 验证密码是否正确
- to_dict: 转换为字典,默认不包含敏感信息
1.3 创建JWT工具类¶
创建 auth_utils.py 文件:
import jwt
from datetime import datetime, timedelta
from functools import wraps
from flask import request, jsonify
import os
# 从环境变量读取密钥
SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-in-production')
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
def create_access_token(user_id, username, role):
"""创建访问令牌"""
payload = {
'user_id': user_id,
'username': username,
'role': role,
'type': 'access',
'exp': datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
'iat': datetime.utcnow()
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id):
"""创建刷新令牌"""
payload = {
'user_id': user_id,
'type': 'refresh',
'exp': datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
'iat': datetime.utcnow()
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token):
"""解码令牌"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise Exception('令牌已过期')
except jwt.InvalidTokenError:
raise Exception('无效的令牌')
def token_required(f):
"""装饰器:要求提供有效的访问令牌"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# 从请求头获取令牌
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(' ')[1] # Bearer <token>
except IndexError:
return jsonify({
'success': False,
'error': {
'code': 'INVALID_TOKEN',
'message': '令牌格式错误'
}
}), 401
if not token:
return jsonify({
'success': False,
'error': {
'code': 'MISSING_TOKEN',
'message': '缺少认证令牌'
}
}), 401
try:
# 解码令牌
payload = decode_token(token)
# 检查令牌类型
if payload.get('type') != 'access':
return jsonify({
'success': False,
'error': {
'code': 'INVALID_TOKEN_TYPE',
'message': '令牌类型错误'
}
}), 401
# 将用户信息传递给路由函数
return f(payload, *args, **kwargs)
except Exception as e:
return jsonify({
'success': False,
'error': {
'code': 'INVALID_TOKEN',
'message': str(e)
}
}), 401
return decorated
def role_required(required_role):
"""装饰器:要求特定角色"""
def decorator(f):
@wraps(f)
@token_required
def decorated(current_user, *args, **kwargs):
if current_user['role'] != required_role:
return jsonify({
'success': False,
'error': {
'code': 'INSUFFICIENT_PERMISSIONS',
'message': '权限不足'
}
}), 403
return f(current_user, *args, **kwargs)
return decorated
return decorator
代码说明:
- create_access_token: 创建短期访问令牌(30分钟)
- create_refresh_token: 创建长期刷新令牌(7天)
- decode_token: 解码并验证令牌
- token_required: 装饰器,保护需要认证的路由
- role_required: 装饰器,保护需要特定角色的路由
1.4 实现注册和登录API¶
创建 app.py 文件:
from flask import Flask, request, jsonify
from flask_cors import CORS
from models import User
from auth_utils import (
create_access_token,
create_refresh_token,
decode_token,
token_required,
role_required
)
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
app = Flask(__name__)
CORS(app)
# 模拟数据库
users = []
user_id_counter = 1
# 统一响应格式
def success_response(data, message="操作成功", status_code=200):
return jsonify({
"success": True,
"data": data,
"message": message
}), status_code
def error_response(message, code="ERROR", details=None, status_code=400):
response = {
"success": False,
"error": {
"code": code,
"message": message
}
}
if details:
response["error"]["details"] = details
return jsonify(response), status_code
# 用户注册
@app.route('/auth/register', methods=['POST'])
def register():
global user_id_counter
data = request.get_json()
# 验证必需字段
required_fields = ['username', 'email', 'password']
missing_fields = [f for f in required_fields if f not in data]
if missing_fields:
return error_response(
"缺少必需字段",
"VALIDATION_ERROR",
[{"field": f, "message": f"字段{f}是必需的"} for f in missing_fields],
status_code=400
)
# 验证密码强度
password = data['password']
if len(password) < 8:
return error_response(
"密码强度不足",
"WEAK_PASSWORD",
[{"field": "password", "message": "密码至少需要8个字符"}],
status_code=400
)
# 检查用户名是否已存在
if any(u.username == data['username'] for u in users):
return error_response(
"用户名已存在",
"USERNAME_EXISTS",
status_code=409
)
# 检查邮箱是否已存在
if any(u.email == data['email'] for u in users):
return error_response(
"邮箱已被注册",
"EMAIL_EXISTS",
status_code=409
)
# 创建新用户
password_hash = User.hash_password(password)
new_user = User(
id=user_id_counter,
username=data['username'],
email=data['email'],
password_hash=password_hash,
role=data.get('role', 'user')
)
users.append(new_user)
user_id_counter += 1
# 生成令牌
access_token = create_access_token(
new_user.id,
new_user.username,
new_user.role
)
refresh_token = create_refresh_token(new_user.id)
return success_response({
'user': new_user.to_dict(),
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer'
}, "注册成功", status_code=201)
# 用户登录
@app.route('/auth/login', methods=['POST'])
def login():
data = request.get_json()
# 验证必需字段
if 'username' not in data or 'password' not in data:
return error_response(
"缺少用户名或密码",
"VALIDATION_ERROR",
status_code=400
)
# 查找用户
user = next((u for u in users if u.username == data['username']), None)
if not user:
return error_response(
"用户名或密码错误",
"INVALID_CREDENTIALS",
status_code=401
)
# 验证密码
if not user.verify_password(data['password']):
return error_response(
"用户名或密码错误",
"INVALID_CREDENTIALS",
status_code=401
)
# 生成令牌
access_token = create_access_token(user.id, user.username, user.role)
refresh_token = create_refresh_token(user.id)
return success_response({
'user': user.to_dict(),
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer'
}, "登录成功")
# 刷新令牌
@app.route('/auth/refresh', methods=['POST'])
def refresh():
data = request.get_json()
if 'refresh_token' not in data:
return error_response(
"缺少刷新令牌",
"MISSING_TOKEN",
status_code=400
)
try:
# 解码刷新令牌
payload = decode_token(data['refresh_token'])
# 检查令牌类型
if payload.get('type') != 'refresh':
return error_response(
"令牌类型错误",
"INVALID_TOKEN_TYPE",
status_code=401
)
# 查找用户
user = next((u for u in users if u.id == payload['user_id']), None)
if not user:
return error_response(
"用户不存在",
"USER_NOT_FOUND",
status_code=404
)
# 生成新的访问令牌
access_token = create_access_token(user.id, user.username, user.role)
return success_response({
'access_token': access_token,
'token_type': 'Bearer'
}, "令牌刷新成功")
except Exception as e:
return error_response(
str(e),
"INVALID_TOKEN",
status_code=401
)
# 获取当前用户信息
@app.route('/auth/me', methods=['GET'])
@token_required
def get_current_user(current_user):
# 查找用户
user = next((u for u in users if u.id == current_user['user_id']), None)
if not user:
return error_response(
"用户不存在",
"USER_NOT_FOUND",
status_code=404
)
return success_response(user.to_dict())
# 登出(客户端删除令牌即可)
@app.route('/auth/logout', methods=['POST'])
@token_required
def logout(current_user):
# JWT是无状态的,服务器端无需处理
# 客户端应该删除存储的令牌
return success_response(None, "登出成功")
if __name__ == '__main__':
app.run(debug=True, port=5000)
代码说明:
- /auth/register: 用户注册,返回用户信息和令牌
- /auth/login: 用户登录,验证密码并返回令牌
- /auth/refresh: 使用刷新令牌获取新的访问令牌
- /auth/me: 获取当前登录用户信息(需要认证)
- /auth/logout: 登出(客户端删除令牌)
步骤2:实现权限管理¶
2.1 定义权限系统¶
创建 permissions.py 文件:
# 定义权限
PERMISSIONS = {
'user': {
'articles': ['read', 'create_own', 'edit_own', 'delete_own'],
'comments': ['read', 'create', 'edit_own', 'delete_own'],
'profile': ['read_own', 'edit_own']
},
'editor': {
'articles': ['read', 'create', 'edit_own', 'delete_own', 'publish_own'],
'comments': ['read', 'create', 'edit_own', 'delete_own', 'moderate'],
'profile': ['read_own', 'edit_own', 'read_others']
},
'admin': {
'articles': ['read', 'create', 'edit', 'delete', 'publish'],
'comments': ['read', 'create', 'edit', 'delete', 'moderate'],
'users': ['read', 'create', 'edit', 'delete', 'manage_roles'],
'profile': ['read', 'edit']
}
}
def has_permission(role, resource, action):
"""检查角色是否有权限"""
if role not in PERMISSIONS:
return False
if resource not in PERMISSIONS[role]:
return False
return action in PERMISSIONS[role][resource]
def check_resource_owner(user_id, resource_user_id):
"""检查是否是资源所有者"""
return user_id == resource_user_id
2.2 创建权限装饰器¶
在 auth_utils.py 中添加:
def permission_required(resource, action):
"""装饰器:要求特定权限"""
def decorator(f):
@wraps(f)
@token_required
def decorated(current_user, *args, **kwargs):
from permissions import has_permission
if not has_permission(current_user['role'], resource, action):
return jsonify({
'success': False,
'error': {
'code': 'INSUFFICIENT_PERMISSIONS',
'message': f'没有{resource}的{action}权限'
}
}), 403
return f(current_user, *args, **kwargs)
return decorated
return decorator
2.3 应用权限控制¶
在 app.py 中添加受保护的路由:
from auth_utils import permission_required
from permissions import check_resource_owner
# 模拟文章数据
articles = []
article_id_counter = 1
# 创建文章(需要create权限)
@app.route('/articles', methods=['POST'])
@permission_required('articles', 'create')
def create_article(current_user):
global article_id_counter
data = request.get_json()
# 验证必需字段
if 'title' not in data or 'content' not in data:
return error_response(
"缺少标题或内容",
"VALIDATION_ERROR",
status_code=400
)
# 创建文章
new_article = {
'id': article_id_counter,
'title': data['title'],
'content': data['content'],
'author_id': current_user['user_id'],
'author_username': current_user['username'],
'status': 'draft',
'created_at': datetime.now().isoformat()
}
articles.append(new_article)
article_id_counter += 1
return success_response(new_article, "文章创建成功", status_code=201)
# 编辑文章(需要edit权限或是作者)
@app.route('/articles/<int:article_id>', methods=['PUT'])
@token_required
def update_article(current_user, article_id):
from permissions import has_permission
# 查找文章
article = next((a for a in articles if a['id'] == article_id), None)
if not article:
return error_response(
"文章不存在",
"NOT_FOUND",
status_code=404
)
# 检查权限
is_owner = check_resource_owner(current_user['user_id'], article['author_id'])
has_edit_all = has_permission(current_user['role'], 'articles', 'edit')
has_edit_own = has_permission(current_user['role'], 'articles', 'edit_own')
if not (has_edit_all or (has_edit_own and is_owner)):
return error_response(
"没有编辑权限",
"INSUFFICIENT_PERMISSIONS",
status_code=403
)
# 更新文章
data = request.get_json()
if 'title' in data:
article['title'] = data['title']
if 'content' in data:
article['content'] = data['content']
article['updated_at'] = datetime.now().isoformat()
return success_response(article, "文章更新成功")
# 删除文章(需要delete权限或是作者)
@app.route('/articles/<int:article_id>', methods=['DELETE'])
@token_required
def delete_article(current_user, article_id):
global articles
from permissions import has_permission
# 查找文章
article = next((a for a in articles if a['id'] == article_id), None)
if not article:
return error_response(
"文章不存在",
"NOT_FOUND",
status_code=404
)
# 检查权限
is_owner = check_resource_owner(current_user['user_id'], article['author_id'])
has_delete_all = has_permission(current_user['role'], 'articles', 'delete')
has_delete_own = has_permission(current_user['role'], 'articles', 'delete_own')
if not (has_delete_all or (has_delete_own and is_owner)):
return error_response(
"没有删除权限",
"INSUFFICIENT_PERMISSIONS",
status_code=403
)
# 删除文章
articles = [a for a in articles if a['id'] != article_id]
return success_response(None, "文章删除成功", status_code=204)
# 发布文章(需要publish权限)
@app.route('/articles/<int:article_id>/publish', methods=['POST'])
@permission_required('articles', 'publish')
def publish_article(current_user, article_id):
# 查找文章
article = next((a for a in articles if a['id'] == article_id), None)
if not article:
return error_response(
"文章不存在",
"NOT_FOUND",
status_code=404
)
# 发布文章
article['status'] = 'published'
article['published_at'] = datetime.now().isoformat()
return success_response(article, "文章发布成功")
# 用户管理(仅管理员)
@app.route('/admin/users', methods=['GET'])
@role_required('admin')
def get_all_users(current_user):
return success_response([u.to_dict() for u in users])
@app.route('/admin/users/<int:user_id>/role', methods=['PUT'])
@permission_required('users', 'manage_roles')
def update_user_role(current_user, user_id):
data = request.get_json()
if 'role' not in data:
return error_response(
"缺少角色字段",
"VALIDATION_ERROR",
status_code=400
)
# 查找用户
user = next((u for u in users if u.id == user_id), None)
if not user:
return error_response(
"用户不存在",
"NOT_FOUND",
status_code=404
)
# 更新角色
valid_roles = ['user', 'editor', 'admin']
if data['role'] not in valid_roles:
return error_response(
"无效的角色",
"INVALID_ROLE",
status_code=400
)
user.role = data['role']
return success_response(user.to_dict(), "角色更新成功")
代码说明:
- 使用 @permission_required 装饰器检查权限
- 对于"自己的资源",需要额外检查所有权
- 管理员拥有所有权限
- 编辑可以管理自己的内容
- 普通用户只能管理自己的内容
步骤3:测试认证系统¶
3.1 创建环境变量文件¶
创建 .env 文件:
⚠️ 安全提示: - 生产环境必须使用强密钥 - 不要将密钥提交到版本控制 - 定期更换密钥
3.2 启动服务器¶
3.3 测试注册¶
curl -X POST http://localhost:5000/auth/register \
-H "Content-Type: application/json" \
-d '{
"username": "zhangsan",
"email": "zhangsan@example.com",
"password": "password123"
}'
响应示例:
{
"success": true,
"data": {
"user": {
"id": 1,
"username": "zhangsan",
"email": "zhangsan@example.com",
"role": "user",
"created_at": "2024-01-01T00:00:00"
},
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer"
},
"message": "注册成功"
}
3.4 测试登录¶
curl -X POST http://localhost:5000/auth/login \
-H "Content-Type: application/json" \
-d '{
"username": "zhangsan",
"password": "password123"
}'
3.5 测试受保护的路由¶
# 获取当前用户信息
curl -X GET http://localhost:5000/auth/me \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
# 创建文章
curl -X POST http://localhost:5000/articles \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"title": "我的第一篇文章",
"content": "这是文章内容..."
}'
3.6 测试令牌刷新¶
curl -X POST http://localhost:5000/auth/refresh \
-H "Content-Type: application/json" \
-d '{
"refresh_token": "YOUR_REFRESH_TOKEN"
}'
3.7 测试权限控制¶
# 普通用户尝试访问管理员接口(应该失败)
curl -X GET http://localhost:5000/admin/users \
-H "Authorization: Bearer USER_ACCESS_TOKEN"
# 管理员访问管理员接口(应该成功)
curl -X GET http://localhost:5000/admin/users \
-H "Authorization: Bearer ADMIN_ACCESS_TOKEN"
步骤4:实现OAuth 2.0第三方登录¶
4.1 OAuth 2.0集成示例(GitHub登录)¶
安装依赖:
在 app.py 中添加:
import requests
# GitHub OAuth配置
GITHUB_CLIENT_ID = os.getenv('GITHUB_CLIENT_ID')
GITHUB_CLIENT_SECRET = os.getenv('GITHUB_CLIENT_SECRET')
GITHUB_REDIRECT_URI = 'http://localhost:5000/auth/github/callback'
# GitHub OAuth登录 - 步骤1:重定向到GitHub
@app.route('/auth/github', methods=['GET'])
def github_login():
# 构建GitHub授权URL
github_auth_url = (
f"https://github.com/login/oauth/authorize"
f"?client_id={GITHUB_CLIENT_ID}"
f"&redirect_uri={GITHUB_REDIRECT_URI}"
f"&scope=user:email"
)
return success_response({
'auth_url': github_auth_url
}, "请访问auth_url进行授权")
# GitHub OAuth回调 - 步骤2:处理回调
@app.route('/auth/github/callback', methods=['GET'])
def github_callback():
# 获取授权码
code = request.args.get('code')
if not code:
return error_response(
"缺少授权码",
"MISSING_CODE",
status_code=400
)
# 用授权码换取访问令牌
token_response = requests.post(
'https://github.com/login/oauth/access_token',
data={
'client_id': GITHUB_CLIENT_ID,
'client_secret': GITHUB_CLIENT_SECRET,
'code': code,
'redirect_uri': GITHUB_REDIRECT_URI
},
headers={'Accept': 'application/json'}
)
if token_response.status_code != 200:
return error_response(
"获取访问令牌失败",
"TOKEN_ERROR",
status_code=500
)
token_data = token_response.json()
github_access_token = token_data.get('access_token')
# 使用访问令牌获取用户信息
user_response = requests.get(
'https://api.github.com/user',
headers={
'Authorization': f'Bearer {github_access_token}',
'Accept': 'application/json'
}
)
if user_response.status_code != 200:
return error_response(
"获取用户信息失败",
"USER_INFO_ERROR",
status_code=500
)
github_user = user_response.json()
# 查找或创建用户
global user_id_counter
user = next((u for u in users if u.email == github_user.get('email')), None)
if not user:
# 创建新用户
user = User(
id=user_id_counter,
username=github_user['login'],
email=github_user.get('email', f"{github_user['login']}@github.com"),
password_hash='', # OAuth用户不需要密码
role='user'
)
users.append(user)
user_id_counter += 1
# 生成JWT令牌
access_token = create_access_token(user.id, user.username, user.role)
refresh_token = create_refresh_token(user.id)
return success_response({
'user': user.to_dict(),
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer'
}, "GitHub登录成功")
使用流程:
1. 前端调用 /auth/github 获取授权URL
2. 用户访问授权URL,在GitHub上授权
3. GitHub重定向回 /auth/github/callback?code=xxx
4. 后端用授权码换取访问令牌
5. 后端用访问令牌获取用户信息
6. 后端创建或查找用户,返回JWT令牌
安全最佳实践¶
1. 密码安全¶
1.1 密码强度要求¶
import re
def validate_password_strength(password):
"""验证密码强度"""
errors = []
# 长度检查
if len(password) < 8:
errors.append("密码至少需要8个字符")
# 包含大写字母
if not re.search(r'[A-Z]', password):
errors.append("密码需要包含至少一个大写字母")
# 包含小写字母
if not re.search(r'[a-z]', password):
errors.append("密码需要包含至少一个小写字母")
# 包含数字
if not re.search(r'\d', password):
errors.append("密码需要包含至少一个数字")
# 包含特殊字符
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
errors.append("密码需要包含至少一个特殊字符")
return errors
# 在注册时使用
@app.route('/auth/register', methods=['POST'])
def register():
data = request.get_json()
password = data.get('password', '')
# 验证密码强度
password_errors = validate_password_strength(password)
if password_errors:
return error_response(
"密码强度不足",
"WEAK_PASSWORD",
password_errors,
status_code=400
)
# 继续注册流程...
1.2 防止暴力破解¶
from datetime import datetime, timedelta
# 登录尝试记录
login_attempts = {}
def check_rate_limit(username, max_attempts=5, window_minutes=15):
"""检查登录频率限制"""
now = datetime.now()
if username not in login_attempts:
login_attempts[username] = []
# 清理过期记录
login_attempts[username] = [
attempt for attempt in login_attempts[username]
if now - attempt < timedelta(minutes=window_minutes)
]
# 检查尝试次数
if len(login_attempts[username]) >= max_attempts:
return False, f"登录尝试过多,请{window_minutes}分钟后再试"
return True, None
def record_login_attempt(username):
"""记录登录尝试"""
if username not in login_attempts:
login_attempts[username] = []
login_attempts[username].append(datetime.now())
# 在登录时使用
@app.route('/auth/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
# 检查频率限制
allowed, error_msg = check_rate_limit(username)
if not allowed:
return error_response(
error_msg,
"TOO_MANY_ATTEMPTS",
status_code=429
)
# 验证用户...
user = next((u for u in users if u.username == username), None)
if not user or not user.verify_password(data.get('password', '')):
# 记录失败尝试
record_login_attempt(username)
return error_response(
"用户名或密码错误",
"INVALID_CREDENTIALS",
status_code=401
)
# 登录成功,清除尝试记录
if username in login_attempts:
del login_attempts[username]
# 继续登录流程...
2. 令牌安全¶
2.1 令牌黑名单¶
# 令牌黑名单(实际应使用Redis)
token_blacklist = set()
def blacklist_token(token):
"""将令牌加入黑名单"""
token_blacklist.add(token)
def is_token_blacklisted(token):
"""检查令牌是否在黑名单中"""
return token in token_blacklist
# 修改token_required装饰器
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(' ')[1]
except IndexError:
return jsonify({'error': '令牌格式错误'}), 401
if not token:
return jsonify({'error': '缺少令牌'}), 401
# 检查黑名单
if is_token_blacklisted(token):
return jsonify({'error': '令牌已失效'}), 401
try:
payload = decode_token(token)
return f(payload, *args, **kwargs)
except Exception as e:
return jsonify({'error': str(e)}), 401
return decorated
# 登出时加入黑名单
@app.route('/auth/logout', methods=['POST'])
@token_required
def logout(current_user):
# 获取令牌
token = request.headers['Authorization'].split(' ')[1]
# 加入黑名单
blacklist_token(token)
return success_response(None, "登出成功")
2.2 令牌轮换¶
# 实现令牌轮换机制
@app.route('/auth/refresh', methods=['POST'])
def refresh():
data = request.get_json()
refresh_token = data.get('refresh_token')
if not refresh_token:
return error_response("缺少刷新令牌", "MISSING_TOKEN", status_code=400)
try:
# 解码刷新令牌
payload = decode_token(refresh_token)
# 查找用户
user = next((u for u in users if u.id == payload['user_id']), None)
if not user:
return error_response("用户不存在", "USER_NOT_FOUND", status_code=404)
# 将旧的刷新令牌加入黑名单
blacklist_token(refresh_token)
# 生成新的访问令牌和刷新令牌
new_access_token = create_access_token(user.id, user.username, user.role)
new_refresh_token = create_refresh_token(user.id)
return success_response({
'access_token': new_access_token,
'refresh_token': new_refresh_token,
'token_type': 'Bearer'
}, "令牌刷新成功")
except Exception as e:
return error_response(str(e), "INVALID_TOKEN", status_code=401)
3. HTTPS和安全头¶
3.1 强制HTTPS¶
from flask import redirect, url_for
@app.before_request
def before_request():
# 生产环境强制HTTPS
if not request.is_secure and app.config.get('ENV') == 'production':
url = request.url.replace('http://', 'https://', 1)
return redirect(url, code=301)
3.2 设置安全响应头¶
@app.after_request
def set_security_headers(response):
# 防止XSS攻击
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
# 内容安全策略
response.headers['Content-Security-Policy'] = "default-src 'self'"
# HTTPS严格传输安全
if request.is_secure:
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
return response
4. 输入验证和清理¶
import html
import re
def sanitize_input(text):
"""清理用户输入"""
# HTML转义
text = html.escape(text)
# 移除危险字符
text = re.sub(r'[<>]', '', text)
return text.strip()
def validate_email(email):
"""验证邮箱格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def validate_username(username):
"""验证用户名格式"""
# 只允许字母、数字、下划线,3-20个字符
pattern = r'^[a-zA-Z0-9_]{3,20}$'
return re.match(pattern, username) is not None
# 在注册时使用
@app.route('/auth/register', methods=['POST'])
def register():
data = request.get_json()
# 验证用户名
username = data.get('username', '')
if not validate_username(username):
return error_response(
"用户名格式不正确",
"INVALID_USERNAME",
["用户名只能包含字母、数字、下划线,长度3-20个字符"],
status_code=400
)
# 验证邮箱
email = data.get('email', '')
if not validate_email(email):
return error_response(
"邮箱格式不正确",
"INVALID_EMAIL",
status_code=400
)
# 清理输入
username = sanitize_input(username)
email = sanitize_input(email)
# 继续注册流程...
深入理解¶
JWT vs Session¶
| 特性 | JWT | Session |
|---|---|---|
| 存储位置 | 客户端 | 服务器 |
| 状态 | 无状态 | 有状态 |
| 扩展性 | 易于扩展 | 需要共享存储 |
| 撤销 | 困难 | 容易 |
| 大小 | 较大 | 较小(只存ID) |
| 安全性 | 需要HTTPS | 相对安全 |
| 适用场景 | 微服务、移动应用 | 传统Web应用 |
令牌存储位置¶
客户端存储选项:
- LocalStorage
- 优点:简单易用,持久化
-
缺点:容易受XSS攻击
-
SessionStorage
- 优点:标签页关闭后自动清除
-
缺点:仍然容易受XSS攻击
-
Cookie (HttpOnly)
- 优点:HttpOnly防止XSS,Secure防止中间人攻击
-
缺点:需要处理CSRF
-
内存存储
- 优点:最安全,刷新页面自动清除
- 缺点:用户体验差
推荐方案: - 访问令牌:HttpOnly Cookie或内存 - 刷新令牌:HttpOnly Cookie - 避免使用LocalStorage存储敏感令牌
多因素认证(MFA)¶
实现基于TOTP的双因素认证:
import pyotp
import qrcode
from io import BytesIO
import base64
# 生成MFA密钥
@app.route('/auth/mfa/setup', methods=['POST'])
@token_required
def setup_mfa(current_user):
# 查找用户
user = next((u for u in users if u.id == current_user['user_id']), None)
if not user:
return error_response("用户不存在", "USER_NOT_FOUND", status_code=404)
# 生成密钥
secret = pyotp.random_base32()
# 生成二维码
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=user.email,
issuer_name="MyApp"
)
# 生成二维码图片
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
# 转换为base64
buffer = BytesIO()
img.save(buffer, format='PNG')
img_str = base64.b64encode(buffer.getvalue()).decode()
# 保存密钥(实际应加密存储)
user.mfa_secret = secret
user.mfa_enabled = False # 验证后才启用
return success_response({
'secret': secret,
'qr_code': f'data:image/png;base64,{img_str}'
}, "请使用认证器应用扫描二维码")
# 验证并启用MFA
@app.route('/auth/mfa/verify', methods=['POST'])
@token_required
def verify_mfa(current_user):
data = request.get_json()
code = data.get('code')
if not code:
return error_response("缺少验证码", "MISSING_CODE", status_code=400)
# 查找用户
user = next((u for u in users if u.id == current_user['user_id']), None)
if not user or not hasattr(user, 'mfa_secret'):
return error_response("MFA未设置", "MFA_NOT_SETUP", status_code=400)
# 验证代码
totp = pyotp.TOTP(user.mfa_secret)
if not totp.verify(code):
return error_response("验证码错误", "INVALID_CODE", status_code=400)
# 启用MFA
user.mfa_enabled = True
return success_response(None, "MFA已启用")
# 登录时验证MFA
@app.route('/auth/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
mfa_code = data.get('mfa_code')
# 验证用户名密码
user = next((u for u in users if u.username == username), None)
if not user or not user.verify_password(password):
return error_response("用户名或密码错误", "INVALID_CREDENTIALS", status_code=401)
# 如果启用了MFA,验证MFA代码
if hasattr(user, 'mfa_enabled') and user.mfa_enabled:
if not mfa_code:
return jsonify({
'success': False,
'mfa_required': True,
'message': '需要MFA验证码'
}), 401
totp = pyotp.TOTP(user.mfa_secret)
if not totp.verify(mfa_code):
return error_response("MFA验证码错误", "INVALID_MFA_CODE", status_code=401)
# 生成令牌
access_token = create_access_token(user.id, user.username, user.role)
refresh_token = create_refresh_token(user.id)
return success_response({
'user': user.to_dict(),
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer'
}, "登录成功")
常见问题¶
Q1: JWT令牌泄露了怎么办?
A: JWT的主要问题是无法主动撤销。解决方案:
- 使用短期令牌:访问令牌设置较短的过期时间(如30分钟)
- 令牌黑名单:维护一个黑名单,检查令牌是否被撤销
- 令牌版本号:在用户表中存储令牌版本号,令牌中包含版本号
- 强制重新登录:发现泄露后,增加用户的令牌版本号
# 令牌版本号方案
class User:
def __init__(self, ...):
# ...
self.token_version = 0
def invalidate_tokens(self):
"""使所有令牌失效"""
self.token_version += 1
def create_access_token(user_id, username, role, token_version):
payload = {
'user_id': user_id,
'username': username,
'role': role,
'token_version': token_version, # 添加版本号
'exp': datetime.utcnow() + timedelta(minutes=30)
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
# ... 解码令牌 ...
# 检查令牌版本
user = next((u for u in users if u.id == payload['user_id']), None)
if not user or user.token_version != payload.get('token_version', 0):
return jsonify({'error': '令牌已失效'}), 401
return f(payload, *args, **kwargs)
return decorated
Q2: 如何实现"记住我"功能?
A: 使用长期刷新令牌:
def create_refresh_token(user_id, remember_me=False):
"""创建刷新令牌"""
# 记住我:30天,否则7天
expire_days = 30 if remember_me else 7
payload = {
'user_id': user_id,
'type': 'refresh',
'exp': datetime.utcnow() + timedelta(days=expire_days),
'iat': datetime.utcnow()
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
@app.route('/auth/login', methods=['POST'])
def login():
data = request.get_json()
remember_me = data.get('remember_me', False)
# ... 验证用户 ...
# 生成令牌
access_token = create_access_token(user.id, user.username, user.role)
refresh_token = create_refresh_token(user.id, remember_me)
return success_response({
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer'
})
Q3: 如何处理并发登录?
A: 几种策略:
- 允许多设备登录:不做限制
- 限制设备数量:记录活跃令牌,超过限制时踢出最早的
- 单设备登录:新登录时使旧令牌失效
# 单设备登录方案
class User:
def __init__(self, ...):
# ...
self.current_refresh_token = None
@app.route('/auth/login', methods=['POST'])
def login():
# ... 验证用户 ...
# 生成新令牌
refresh_token = create_refresh_token(user.id)
# 保存当前刷新令牌
user.current_refresh_token = refresh_token
return success_response({
'access_token': access_token,
'refresh_token': refresh_token
})
@app.route('/auth/refresh', methods=['POST'])
def refresh():
data = request.get_json()
refresh_token = data.get('refresh_token')
# ... 解码令牌 ...
user = next((u for u in users if u.id == payload['user_id']), None)
# 检查是否是当前有效的刷新令牌
if user.current_refresh_token != refresh_token:
return error_response("令牌已失效", "INVALID_TOKEN", status_code=401)
# 生成新令牌...
Q4: 如何实现权限的动态更新?
A: 不要在JWT中存储详细权限,而是存储角色或权限版本号:
# 方案1:每次请求时查询权限
@token_required
def protected_route(current_user):
# 从数据库查询最新权限
user = get_user_from_db(current_user['user_id'])
permissions = get_user_permissions(user.role)
# 检查权限...
# 方案2:使用权限版本号
def create_access_token(user_id, username, role, permission_version):
payload = {
'user_id': user_id,
'role': role,
'permission_version': permission_version,
'exp': datetime.utcnow() + timedelta(minutes=30)
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# 权限更新时增加版本号
def update_user_permissions(user_id):
user = get_user(user_id)
user.permission_version += 1
save_user(user)
Q5: 如何防止CSRF攻击?
A: 使用CSRF令牌:
import secrets
# 生成CSRF令牌
def generate_csrf_token():
return secrets.token_urlsafe(32)
# 在登录时返回CSRF令牌
@app.route('/auth/login', methods=['POST'])
def login():
# ... 验证用户 ...
csrf_token = generate_csrf_token()
# 存储CSRF令牌(实际应使用Redis)
# csrf_tokens[user.id] = csrf_token
return success_response({
'access_token': access_token,
'csrf_token': csrf_token
})
# 验证CSRF令牌
@app.before_request
def check_csrf():
# 对于修改数据的请求,验证CSRF令牌
if request.method in ['POST', 'PUT', 'DELETE', 'PATCH']:
csrf_token = request.headers.get('X-CSRF-Token')
# 验证令牌...
总结¶
通过本教程,你学习了:
- ✅ 认证和授权的基本概念和区别
- ✅ JWT令牌的结构、工作原理和使用方法
- ✅ OAuth 2.0的授权流程和应用场景
- ✅ 如何实现用户注册、登录和令牌刷新
- ✅ 如何设计和实现基于角色的权限管理(RBAC)
- ✅ 如何保护API接口和检查权限
- ✅ 密码加密、令牌安全等安全最佳实践
- ✅ 如何防御常见的安全攻击
- ✅ 多因素认证(MFA)的实现方法
认证和授权是Web应用安全的基石。建议在实际项目中: - 始终使用HTTPS传输敏感数据 - 实施多层安全防护 - 定期审查和更新安全策略 - 关注最新的安全漏洞和补丁
进阶挑战¶
尝试以下挑战来巩固学习:
- 挑战1:实现邮箱验证
- 注册时发送验证邮件
- 用户点击链接验证邮箱
-
未验证用户限制功能
-
挑战2:实现密码重置
- 忘记密码功能
- 发送重置链接到邮箱
-
验证重置令牌并更新密码
-
挑战3:实现社交登录
- 集成Google OAuth
- 集成Facebook登录
-
支持账号绑定
-
挑战4:实现细粒度权限
- 从RBAC升级到ABAC
- 支持资源级权限控制
-
实现权限继承
-
挑战5:实现审计日志
- 记录所有认证事件
- 记录权限变更
- 实现日志查询和分析
完整代码¶
完整的项目代码可以在这里下载:[GitHub链接]
项目结构:
auth-system/
├── app.py # 主应用
├── models.py # 数据模型
├── auth_utils.py # 认证工具
├── permissions.py # 权限定义
├── requirements.txt # 依赖列表
├── .env # 环境变量
├── .env.example # 环境变量示例
└── README.md # 项目文档
延伸阅读¶
推荐资源¶
- JWT官方文档
- JWT.io
-
OAuth 2.0
- OAuth 2.0官方文档
-
安全最佳实践
- OWASP Top 10
-
Python安全库
- PyJWT
- bcrypt
- cryptography
相关技术¶
- 身份提供商(IdP)
- Auth0
- Okta
- AWS Cognito
-
Firebase Authentication
-
单点登录(SSO)
- SAML 2.0
- OpenID Connect
-
CAS
-
API网关
- Kong
- AWS API Gateway
- Azure API Management
下一步学习¶
建议继续学习以下内容:
- 微服务架构设计 - 学习分布式系统中的认证授权
- API网关 - 学习集中式认证和授权
- 服务网格 - 学习Istio等服务网格的安全特性
- 零信任架构 - 学习现代安全架构模式
- 身份联邦 - 学习跨组织的身份管理
反馈与支持: - 如果你在学习过程中遇到问题,欢迎在评论区留言 - 发现文档错误或有改进建议,请提交Issue - 想要分享你的项目,欢迎投稿
版权声明: 本文采用 CC BY-NC-SA 4.0 许可协议,欢迎分享和改编,但请注明出处。