1. 引言

在现代Web应用开发中,用户认证和授权是不可或缺的功能。随着前后端分离架构的普及,传统的基于会话的认证方式逐渐被基于令牌的认证方式所取代。JWT(JSON Web Token)作为一种开放标准(RFC 7519),已成为实现无状态认证的首选方案之一。

Flask作为Python世界中最受欢迎的轻量级Web框架之一,以其简洁、灵活和易于扩展的特性赢得了开发者的青睐。将JWT认证与Flask框架结合,可以为Web应用提供安全、高效且可扩展的用户认证解决方案。

本文将全面介绍如何在Flask框架中实现JWT认证,包括基本概念、实现步骤、高级功能以及常见问题的解决方案。无论您是Flask新手还是有经验的开发者,都能从本文中获得实用的知识和技巧。

2. JWT基础知识

2.1 什么是JWT

JWT(JSON Web Token)是一种开放标准(RFC 7519),它定义了一种紧凑的、自包含的方式,用于在各方之间以JSON对象安全地传输信息。该信息可以被验证和信任,因为它是数字签名的。

2.2 JWT的结构

JWT由三部分组成,用点(.)分隔:

  1. Header(头部):通常包含两部分信息:令牌的类型(即JWT)和所使用的签名算法(如HMAC SHA256或RSA)。

示例:

 { "alg": "HS256", "typ": "JWT" } 
  1. Payload(载荷):包含声明(claims)。声明是关于实体(通常是用户)和其他数据的声明。有三种类型的声明:注册声明、公共声明和私有声明。

示例:

 { "sub": "1234567890", "name": "John Doe", "iat": 1516239022 } 
  1. Signature(签名):用于验证消息在传输过程中没有被更改,并且对于使用私钥签名的令牌,它还可以验证发送者的身份。

签名的创建方式如下:

 HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), secret) 

最终的JWT由这三部分组成,用点(.)分隔:

header.payload.signature 

2.3 JWT的工作原理

JWT认证的基本流程如下:

  1. 用户使用凭据(如用户名和密码)登录。
  2. 服务器验证凭据,如果有效,则创建一个JWT并签名。
  3. 服务器将JWT发送回客户端。
  4. 客户端存储JWT(通常在localStorage或cookie中),并在后续请求的Authorization头中包含它。
  5. 服务器验证JWT的签名和有效性,如果有效,则处理请求。

2.4 JWT的优势

  • 无状态:服务器不需要存储会话信息,使得应用更容易扩展。
  • 跨域/跨服务:JWT可以在不同的域和服务之间轻松传递。
  • 移动友好:JWT在移动应用和Web应用中同样有效。
  • 安全性:JWT可以使用数字签名来验证完整性。
  • 自包含:JWT可以包含用户信息,减少数据库查询。

3. Flask中实现JWT认证的准备工作

在开始实现JWT认证之前,我们需要进行一些准备工作,包括安装必要的库和配置基本环境。

3.1 创建Flask应用

首先,让我们创建一个基本的Flask应用:

from flask import Flask app = Flask(__name__) @app.route('/') def hello(): return "Hello, World!" if __name__ == '__main__': app.run(debug=True) 

3.2 安装必要的库

为了在Flask中实现JWT认证,我们需要安装以下库:

pip install flask flask-jwt-extended flask-sqlalchemy flask-migrate flask-cors 

这些库的作用分别是:

  • flask:Flask框架本身。
  • flask-jwt-extended:提供JWT支持的Flask扩展。
  • flask-sqlalchemy:Flask的SQLAlchemy扩展,用于数据库操作。
  • flask-migrate:数据库迁移工具。
  • flask-cors:处理跨域请求。

3.3 配置Flask应用

接下来,我们需要配置Flask应用,包括数据库配置和JWT配置:

from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from flask_jwt_extended import JWTManager from flask_cors import CORS app = Flask(__name__) # 配置数据库 app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 配置JWT app.config['JWT_SECRET_KEY'] = 'your-secret-key' # 在生产环境中使用强密钥 app.config['JWT_ACCESS_TOKEN_EXPIRES'] = 3600 # 访问令牌过期时间(秒) app.config['JWT_REFRESH_TOKEN_EXPIRES'] = 604800 # 刷新令牌过期时间(秒) # 初始化扩展 db = SQLAlchemy(app) migrate = Migrate(app, db) jwt = JWTManager(app) cors = CORS(app) # 定义用户模型 class User(db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) password_hash = db.Column(db.String(128)) def __repr__(self): return f'<User {self.username}>' if __name__ == '__main__': app.run(debug=True) 

4. Flask-JWT扩展的安装与基本配置

在上一节中,我们已经安装了flask-jwt-extended扩展并进行了基本配置。现在,让我们深入了解这个扩展的功能和配置选项。

4.1 Flask-JWT-Extended简介

Flask-JWT-Extended是一个为Flask应用提供JWT支持的扩展,它提供了完整的JWT实现,包括令牌创建、验证和刷新等功能。该扩展的主要特点包括:

  • 支持访问令牌和刷新令牌
  • 提供装饰器保护路由
  • 支持令牌黑名单
  • 可配置的令牌过期时间
  • 支持多种令牌位置(头部、Cookie、查询参数等)

4.2 基本配置选项

Flask-JWT-Extended提供了多种配置选项,以下是一些常用的配置:

app.config['JWT_SECRET_KEY'] = 'your-secret-key' # 用于签名JWT的密钥 app.config['JWT_ACCESS_TOKEN_EXPIRES'] = False # 访问令牌过期时间,False表示永不过期 app.config['JWT_REFRESH_TOKEN_EXPIRES'] = False # 刷新令牌过期时间,False表示永不过期 app.config['JWT_TOKEN_LOCATION'] = ['headers'] # 令牌位置,可以是headers, cookies, json, query_string app.config['JWT_HEADER_NAME'] = 'Authorization' # 包含令牌的HTTP头名称 app.config['JWT_HEADER_TYPE'] = 'Bearer' # 令牌类型,通常是Bearer app.config['JWT_ACCESS_COOKIE_NAME'] = 'access_token_cookie' # 访问令牌Cookie名称 app.config['JWT_REFRESH_COOKIE_NAME'] = 'refresh_token_cookie' # 刷新令牌Cookie名称 app.config['JWT_ACCESS_COOKIE_PATH'] = '/' # 访问令牌Cookie路径 app.config['JWT_REFRESH_COOKIE_PATH'] = '/refresh' # 刷新令牌Cookie路径 app.config['JWT_COOKIE_SECURE'] = False # 是否只通过HTTPS发送Cookie app.config['JWT_COOKIE_CSRF_PROTECT'] = True # 是否启用Cookie CSRF保护 app.config['JWT_JSON_KEY'] = 'access_token' # JSON中的令牌键名 app.config['JWT_REFRESH_JSON_KEY'] = 'refresh_token' # JSON中的刷新令牌键名 app.config['JWT_ERROR_MESSAGE_KEY'] = 'msg' # 错误消息的JSON键名 

4.3 初始化JWT管理器

在配置完成后,我们需要初始化JWT管理器:

from flask_jwt_extended import JWTManager jwt = JWTManager(app) 

JWT管理器提供了多种回调函数,可以用于自定义令牌处理行为:

# 当令牌无效时调用 @jwt.invalid_token_loader def invalid_token_callback(callback): return jsonify({ 'message': 'Invalid token' }), 401 # 当令牌过期时调用 @jwt.expired_token_loader def expired_token_callback(callback): return jsonify({ 'message': 'Token has expired' }), 401 # 当令牌不存在时调用 @jwt.unauthorized_loader def unauthorized_callback(callback): return jsonify({ 'message': 'No authorization token provided' }), 401 # 当令牌被拒绝时调用(例如不在黑名单中) @jwt.revoked_token_loader def revoked_token_callback(callback): return jsonify({ 'message': 'Token has been revoked' }), 401 

5. 实现基本的JWT认证流程

现在我们已经完成了准备工作,接下来让我们实现基本的JWT认证流程,包括用户认证、JWT生成、JWT验证和刷新令牌机制。

5.1 用户认证和JWT生成

首先,我们需要实现用户注册和登录功能:

from flask import request, jsonify from werkzeug.security import generate_password_hash, check_password_hash from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity # 用户注册 @app.route('/register', methods=['POST']) def register(): data = request.get_json() # 验证输入 if not data or not data.get('username') or not data.get('password') or not data.get('email'): return jsonify({'message': 'Missing required fields'}), 400 # 检查用户是否已存在 if User.query.filter_by(username=data['username']).first(): return jsonify({'message': 'Username already exists'}), 400 if User.query.filter_by(email=data['email']).first(): return jsonify({'message': 'Email already exists'}), 400 # 创建新用户 hashed_password = generate_password_hash(data['password'], method='sha256') new_user = User( username=data['username'], email=data['email'], password_hash=hashed_password ) db.session.add(new_user) db.session.commit() return jsonify({'message': 'User created successfully'}), 201 # 用户登录 @app.route('/login', methods=['POST']) def login(): data = request.get_json() # 验证输入 if not data or not data.get('username') or not data.get('password'): return jsonify({'message': 'Missing username or password'}), 400 # 查找用户 user = User.query.filter_by(username=data['username']).first() if not user or not check_password_hash(user.password_hash, data['password']): return jsonify({'message': 'Invalid username or password'}), 401 # 创建JWT access_token = create_access_token(identity=user.id) refresh_token = create_refresh_token(identity=user.id) return jsonify({ 'access_token': access_token, 'refresh_token': refresh_token, 'user': { 'id': user.id, 'username': user.username, 'email': user.email } }), 200 

5.2 JWT验证和受保护路由

接下来,让我们创建一些需要JWT验证的受保护路由:

# 获取当前用户信息 @app.route('/me', methods=['GET']) @jwt_required() def get_current_user(): current_user_id = get_jwt_identity() user = User.query.get(current_user_id) if not user: return jsonify({'message': 'User not found'}), 404 return jsonify({ 'user': { 'id': user.id, 'username': user.username, 'email': user.email } }), 200 # 受保护的路由示例 @app.route('/protected', methods=['GET']) @jwt_required() def protected(): current_user_id = get_jwt_identity() return jsonify({'message': f'Hello, user {current_user_id}! This is a protected endpoint.'}), 200 

5.3 刷新令牌机制

为了在访问令牌过期后能够获取新的访问令牌,我们需要实现刷新令牌机制:

# 刷新访问令牌 @app.route('/refresh', methods=['POST']) @jwt_required(refresh=True) def refresh(): current_user_id = get_jwt_identity() new_token = create_access_token(identity=current_user_id) return jsonify({'access_token': new_token}), 200 # 注销(将令牌加入黑名单) @app.route('/logout', methods=['DELETE']) @jwt_required() def logout(): jti = get_jwt()['jti'] # 这里应该将jti添加到黑名单中 # 实际实现需要使用数据库或Redis等存储黑名单 return jsonify({'message': 'Successfully logged out'}), 200 

6. 高级JWT认证功能

在实现了基本的JWT认证流程后,我们可以进一步探索一些高级功能,如自定义令牌载荷、令牌过期和自动刷新、基于角色的访问控制等。

6.1 自定义令牌载荷

默认情况下,JWT只包含用户标识(identity)作为载荷。但有时我们可能需要在令牌中包含更多信息,如用户角色、权限等。我们可以通过自定义令牌载荷来实现这一点:

from flask_jwt_extended import create_access_token, get_jwt # 自定义令牌载荷 @app.route('/login', methods=['POST']) def login(): data = request.get_json() # 验证用户凭据... user = User.query.filter_by(username=data['username']).first() if not user or not check_password_hash(user.password_hash, data['password']): return jsonify({'message': 'Invalid username or password'}), 401 # 创建包含额外信息的JWT additional_claims = { "role": user.role, # 假设用户模型有role字段 "email": user.email } access_token = create_access_token(identity=user.id, additional_claims=additional_claims) refresh_token = create_refresh_token(identity=user.id) return jsonify({ 'access_token': access_token, 'refresh_token': refresh_token }), 200 # 访问自定义令牌载荷 @app.route('/admin', methods=['GET']) @jwt_required() def admin(): claims = get_jwt() if claims['role'] != 'admin': return jsonify({'message': 'Admin access required'}), 403 return jsonify({'message': 'Welcome, admin!'}), 200 

6.2 令牌过期和自动刷新

为了提高用户体验,我们可以实现令牌自动刷新机制,在令牌即将过期时自动获取新的令牌:

from datetime import datetime, timedelta from flask_jwt_extended import get_jwt, create_access_token, set_access_cookies # 配置令牌自动刷新 app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1) app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=30) # 使用Cookie存储令牌 @app.after_request def refresh_expiring_jwts(response): try: exp_timestamp = get_jwt()["exp"] now = datetime.now() target_timestamp = datetime.timestamp(now + timedelta(minutes=30)) if target_timestamp > exp_timestamp: access_token = create_access_token(identity=get_jwt_identity()) set_access_cookies(response, access_token) return response except (RuntimeError, KeyError): # Case where there is not a valid JWT. Just return the original response return response 

6.3 基于角色的访问控制

基于角色的访问控制(RBAC)是一种常见的权限管理方式。我们可以通过自定义装饰器来实现基于角色的访问控制:

from functools import wraps from flask_jwt_extended import get_jwt # 角色检查装饰器 def role_required(required_role): def decorator(f): @wraps(f) @jwt_required() def decorated_function(*args, **kwargs): claims = get_jwt() if 'role' not in claims or claims['role'] != required_role: return jsonify({'message': f'{required_role} role required'}), 403 return f(*args, **kwargs) return decorated_function return decorator # 使用角色装饰器 @app.route('/admin/dashboard') @role_required('admin') def admin_dashboard(): return jsonify({'message': 'Welcome to admin dashboard'}), 200 @app.route('/moderator/dashboard') @role_required('moderator') def moderator_dashboard(): return jsonify({'message': 'Welcome to moderator dashboard'}), 200 

6.4 令牌黑名单

为了实现令牌注销功能,我们需要实现令牌黑名单机制。这里我们使用数据库来存储黑名单令牌:

# 创建令牌黑名单模型 class TokenBlacklist(db.Model): id = db.Column(db.Integer, primary_key=True) jti = db.Column(db.String(36), nullable=False, index=True) token_type = db.Column(db.String(10), nullable=False) user_identity = db.Column(db.String(50), nullable=False) revoked = db.Column(db.Boolean, nullable=False) expires = db.Column(db.DateTime, nullable=False) def to_dict(self): return { 'id': self.id, 'jti': self.jti, 'token_type': self.token_type, 'user_identity': self.user_identity, 'revoked': self.revoked, 'expires': self.expires } # 配置JWT黑名单 @jwt.token_in_blacklist_loader def check_if_token_revoked(decrypted_token): jti = decrypted_token['jti'] token = TokenBlacklist.query.filter_by(jti=jti).first() if token is None: return False # 令牌不在黑名单中 return token.revoked # 返回令牌是否被撤销 # 注销端点 @app.route('/logout', methods=['DELETE']) @jwt_required() def logout(): jti = get_jwt()['jti'] token_type = get_jwt()['type'] user_identity = get_jwt_identity() expires = datetime.fromtimestamp(get_jwt()['exp']) revoked_token = TokenBlacklist( jti=jti, token_type=token_type, user_identity=user_identity, revoked=True, expires=expires ) db.session.add(revoked_token) db.session.commit() return jsonify({'message': 'Successfully logged out'}), 200 

7. 常见问题及解决方案

在实现JWT认证时,我们可能会遇到各种问题。本节将介绍一些常见问题及其解决方案。

7.1 令牌存储和管理问题

7.1.1 令牌存储位置选择

问题:应该在哪里存储JWT令牌?

解决方案: JWT令牌可以存储在以下位置:

  • localStorage:易于使用,但容易受到XSS攻击。
  • sessionStorage:与localStorage类似,但在浏览器关闭后会被清除。
  • Cookie:可以设置HttpOnly和Secure标志来提高安全性,但容易受到CSRF攻击。
  • 内存:最安全,但页面刷新后会丢失。

最佳实践是使用HttpOnly Cookie存储令牌,并启用CSRF保护:

# 配置JWT使用Cookie app.config['JWT_TOKEN_LOCATION'] = ['cookies'] app.config['JWT_COOKIE_SECURE'] = False # 在生产环境中设置为True app.config['JWT_COOKIE_CSRF_PROTECT'] = True app.config['JWT_ACCESS_COOKIE_NAME'] = 'access_token' app.config['JWT_REFRESH_COOKIE_NAME'] = 'refresh_token' # 设置CSRF保护 @app.after_request def set_csrf_cookie(response): if 'access_token' in request.cookies: response.set_cookie( 'csrf_access_token', get_csrf_token(encoded_token=request.cookies['access_token']), secure=app.config['JWT_COOKIE_SECURE'], httponly=False, samesite='Strict' ) return response 

7.1.2 令牌刷新策略

问题:如何处理令牌过期?

解决方案: 实现令牌自动刷新机制,在令牌即将过期时自动获取新的令牌:

from datetime import datetime, timedelta from flask_jwt_extended import get_jwt, create_access_token, set_access_cookies @app.after_request def refresh_expiring_jwts(response): try: exp_timestamp = get_jwt()["exp"] now = datetime.now() target_timestamp = datetime.timestamp(now + timedelta(minutes=30)) if target_timestamp > exp_timestamp: access_token = create_access_token(identity=get_jwt_identity()) set_access_cookies(response, access_token) return response except (RuntimeError, KeyError): # Case where there is not a valid JWT. Just return the original response return response 

7.2 跨域请求处理

7.2.1 CORS配置

问题:如何处理前后端分离架构中的跨域请求?

解决方案: 使用Flask-CORS扩展处理跨域请求:

from flask_cors import CORS # 全局CORS配置 CORS(app, resources={r"/*": {"origins": "*"}}) # 或者针对特定路由配置 @app.route('/api/login', methods=['POST']) @cross_origin(origin='*', headers=['Content-Type', 'Authorization']) def login(): # 登录逻辑 pass 

7.2.2 预检请求处理

问题:浏览器发送OPTIONS预检请求导致认证失败。

解决方案: 为OPTIONS请求提供特殊处理:

@app.before_request def handle_preflight(): if request.method == "OPTIONS": response = jsonify() response.headers.add("Access-Control-Allow-Origin", "*") response.headers.add("Access-Control-Allow-Headers", "Content-Type, Authorization") response.headers.add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") return response 

7.3 安全性问题和最佳实践

7.3.1 令牌劫持防护

问题:如何防止令牌被劫持?

解决方案

  1. 使用HTTPS传输令牌。
  2. 设置较短的令牌过期时间。
  3. 使用HttpOnly Cookie存储令牌。
  4. 实现令牌黑名单机制。
  5. 在令牌中包含用户代理和IP地址信息,并在验证时检查:
# 在创建令牌时包含用户信息 @app.route('/login', methods=['POST']) def login(): # 验证用户凭据... # 获取用户代理和IP地址 user_agent = request.headers.get('User-Agent') ip_address = request.remote_addr # 创建包含额外信息的JWT additional_claims = { "user_agent": user_agent, "ip_address": ip_address } access_token = create_access_token(identity=user.id, additional_claims=additional_claims) return jsonify({'access_token': access_token}), 200 # 在验证时检查用户信息 @app.route('/protected', methods=['GET']) @jwt_required() def protected(): claims = get_jwt() current_user_agent = request.headers.get('User-Agent') current_ip_address = request.remote_addr if claims.get('user_agent') != current_user_agent or claims.get('ip_address') != current_ip_address: return jsonify({'message': 'Invalid token'}), 401 return jsonify({'message': 'Access granted'}), 200 

7.3.2 密钥管理

问题:如何安全地管理JWT密钥?

解决方案

  1. 不要将密钥硬编码在代码中。
  2. 使用环境变量或配置文件存储密钥。
  3. 定期更换密钥。
  4. 使用强密钥(至少32个字符的随机字符串)。
import os from dotenv import load_dotenv load_dotenv() # 从.env文件加载环境变量 app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'default-secret-key') 

7.3.3 令牌过期策略

问题:如何设置合理的令牌过期时间?

解决方案: 根据应用的安全需求和使用场景设置令牌过期时间:

  • 访问令牌:通常设置为15分钟到1小时。
  • 刷新令牌:通常设置为几天到几周。
from datetime import timedelta app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(minutes=30) app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=7) 

7.4 性能优化

7.4.1 数据库查询优化

问题:频繁的数据库查询影响性能。

解决方案

  1. 使用缓存存储用户信息。
  2. 在令牌中包含必要的用户信息,减少数据库查询。
from flask_caching import Cache # 配置缓存 cache = Cache(app, config={'CACHE_TYPE': 'simple'}) # 缓存用户信息 @cache.memoize(timeout=60) def get_user(user_id): return User.query.get(user_id) # 在路由中使用缓存 @app.route('/me', methods=['GET']) @jwt_required() def get_current_user(): current_user_id = get_jwt_identity() user = get_user(current_user_id) if not user: return jsonify({'message': 'User not found'}), 404 return jsonify({ 'user': { 'id': user.id, 'username': user.username, 'email': user.email } }), 200 

7.4.2 令牌验证优化

问题:JWT验证过程影响性能。

解决方案

  1. 使用高效的签名算法(如HS256)。
  2. 避免在令牌中包含过多信息。
  3. 使用异步处理令牌验证。
from concurrent.futures import ThreadPoolExecutor # 创建线程池 executor = ThreadPoolExecutor(4) # 异步验证令牌 def verify_token_async(token): with app.app_context(): try: decode_token(token) return True except: return False # 在路由中使用异步验证 @app.route('/protected', methods=['GET']) def protected(): token = request.headers.get('Authorization') if not token: return jsonify({'message': 'Token is missing'}), 401 # 异步验证令牌 future = executor.submit(verify_token_async, token) is_valid = future.result() if not is_valid: return jsonify({'message': 'Token is invalid'}), 401 return jsonify({'message': 'Access granted'}), 200 

8. 完整示例项目

下面是一个完整的Flask JWT认证示例项目,包含用户注册、登录、受保护路由和令牌刷新等功能。

8.1 项目结构

flask_jwt_example/ ├── app.py ├── config.py ├── models.py ├── auth.py ├── routes.py ├── requirements.txt └── .env 

8.2 配置文件 (config.py)

import os from datetime import timedelta class Config: SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key' SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///app.db' SQLALCHEMY_TRACK_MODIFICATIONS = False # JWT配置 JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-key' JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=30) JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=7) JWT_TOKEN_LOCATION = ['headers'] JWT_HEADER_NAME = 'Authorization' JWT_HEADER_TYPE = 'Bearer' 

8.3 数据模型 (models.py)

from datetime import datetime from flask_sqlalchemy import SQLAlchemy from werkzeug.security import generate_password_hash, check_password_hash db = SQLAlchemy() class User(db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) password_hash = db.Column(db.String(128)) role = db.Column(db.String(20), default='user') created_at = db.Column(db.DateTime, default=datetime.utcnow) def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.password_hash, password) def to_dict(self): return { 'id': self.id, 'username': self.username, 'email': self.email, 'role': self.role, 'created_at': self.created_at.isoformat() } class TokenBlacklist(db.Model): id = db.Column(db.Integer, primary_key=True) jti = db.Column(db.String(36), nullable=False, index=True) token_type = db.Column(db.String(10), nullable=False) user_identity = db.Column(db.String(50), nullable=False) revoked = db.Column(db.Boolean, nullable=False) expires = db.Column(db.DateTime, nullable=False) 

8.4 认证模块 (auth.py)

from functools import wraps from flask import jsonify from flask_jwt_extended import get_jwt, verify_jwt_in_request def role_required(required_role): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): verify_jwt_in_request() claims = get_jwt() if 'role' not in claims or claims['role'] != required_role: return jsonify({'message': f'{required_role} role required'}), 403 return f(*args, **kwargs) return decorated_function return decorator 

8.5 路由模块 (routes.py)

from flask import request, jsonify from flask_jwt_extended import ( create_access_token, create_refresh_token, get_jwt_identity, jwt_required, get_jwt, set_access_cookies, unset_jwt_cookies ) from datetime import datetime from models import db, User, TokenBlacklist from auth import role_required def register_routes(app, jwt): # 用户注册 @app.route('/register', methods=['POST']) def register(): data = request.get_json() if not data or not data.get('username') or not data.get('password') or not data.get('email'): return jsonify({'message': 'Missing required fields'}), 400 if User.query.filter_by(username=data['username']).first(): return jsonify({'message': 'Username already exists'}), 400 if User.query.filter_by(email=data['email']).first(): return jsonify({'message': 'Email already exists'}), 400 user = User( username=data['username'], email=data['email'], role=data.get('role', 'user') ) user.set_password(data['password']) db.session.add(user) db.session.commit() return jsonify({'message': 'User created successfully'}), 201 # 用户登录 @app.route('/login', methods=['POST']) def login(): data = request.get_json() if not data or not data.get('username') or not data.get('password'): return jsonify({'message': 'Missing username or password'}), 400 user = User.query.filter_by(username=data['username']).first() if not user or not user.check_password(data['password']): return jsonify({'message': 'Invalid username or password'}), 401 additional_claims = { "role": user.role, "email": user.email } access_token = create_access_token(identity=user.id, additional_claims=additional_claims) refresh_token = create_refresh_token(identity=user.id) return jsonify({ 'access_token': access_token, 'refresh_token': refresh_token, 'user': user.to_dict() }), 200 # 刷新令牌 @app.route('/refresh', methods=['POST']) @jwt_required(refresh=True) def refresh(): current_user_id = get_jwt_identity() user = User.query.get(current_user_id) if not user: return jsonify({'message': 'User not found'}), 404 additional_claims = { "role": user.role, "email": user.email } access_token = create_access_token(identity=current_user_id, additional_claims=additional_claims) return jsonify({'access_token': access_token}), 200 # 获取当前用户信息 @app.route('/me', methods=['GET']) @jwt_required() def get_current_user(): current_user_id = get_jwt_identity() user = User.query.get(current_user_id) if not user: return jsonify({'message': 'User not found'}), 404 return jsonify({'user': user.to_dict()}), 200 # 受保护路由 @app.route('/protected', methods=['GET']) @jwt_required() def protected(): current_user_id = get_jwt_identity() return jsonify({'message': f'Hello, user {current_user_id}! This is a protected endpoint.'}), 200 # 管理员路由 @app.route('/admin', methods=['GET']) @role_required('admin') def admin(): return jsonify({'message': 'Welcome, admin!'}), 200 # 注销 @app.route('/logout', methods=['DELETE']) @jwt_required() def logout(): jti = get_jwt()['jti'] token_type = get_jwt()['type'] user_identity = get_jwt_identity() expires = datetime.fromtimestamp(get_jwt()['exp']) revoked_token = TokenBlacklist( jti=jti, token_type=token_type, user_identity=user_identity, revoked=True, expires=expires ) db.session.add(revoked_token) db.session.commit() return jsonify({'message': 'Successfully logged out'}), 200 # JWT错误处理 @jwt.expired_token_loader def expired_token_callback(callback): return jsonify({'message': 'Token has expired'}), 401 @jwt.invalid_token_loader def invalid_token_callback(callback): return jsonify({'message': 'Invalid token'}), 401 @jwt.unauthorized_loader def unauthorized_callback(callback): return jsonify({'message': 'No authorization token provided'}), 401 @jwt.revoked_token_loader def revoked_token_callback(callback): return jsonify({'message': 'Token has been revoked'}), 401 # 令牌黑名单检查 @jwt.token_in_blacklist_loader def check_if_token_revoked(decrypted_token): jti = decrypted_token['jti'] token = TokenBlacklist.query.filter_by(jti=jti).first() if token is None: return False return token.revoked 

8.6 主应用文件 (app.py)

from flask import Flask from flask_migrate import Migrate from flask_jwt_extended import JWTManager from flask_cors import CORS from models import db from routes import register_routes from config import Config def create_app(config_class=Config): app = Flask(__name__) app.config.from_object(config_class) # 初始化扩展 db.init_app(app) migrate = Migrate(app, db) jwt = JWTManager(app) CORS(app) # 注册路由 register_routes(app, jwt) # 创建数据库表 with app.app_context(): db.create_all() return app app = create_app() if __name__ == '__main__': app.run(debug=True) 

8.7 依赖文件 (requirements.txt)

Flask==2.0.1 Flask-SQLAlchemy==2.5.1 Flask-Migrate==3.1.0 Flask-JWT-Extended==4.3.1 Flask-CORS==3.0.10 python-dotenv==0.19.0 Werkzeug==2.0.1 

8.8 环境变量文件 (.env)

SECRET_KEY=your-secret-key JWT_SECRET_KEY=your-jwt-secret-key DATABASE_URL=sqlite:///app.db 

9. 总结

本文全面介绍了在Flask框架下实现JWT认证的方法和常见问题的解决方案。我们从JWT的基础知识开始,逐步深入到实际实现,包括基本认证流程、高级功能和常见问题的解决方案。

9.1 关键要点回顾

  1. JWT基础知识:JWT由三部分组成(头部、载荷和签名),它是一种无状态的认证机制,适用于现代Web应用。

  2. Flask-JWT-Extended扩展:这个扩展提供了完整的JWT实现,包括令牌创建、验证和刷新等功能。

  3. 基本认证流程:包括用户注册、登录、受保护路由和令牌刷新等功能。

  4. 高级功能:包括自定义令牌载荷、令牌过期和自动刷新、基于角色的访问控制和令牌黑名单等。

  5. 常见问题解决方案:包括令牌存储和管理、跨域请求处理、安全性问题和性能优化等。

9.2 最佳实践

  1. 安全性

    • 使用强密钥和安全的签名算法。
    • 设置合理的令牌过期时间。
    • 使用HTTPS传输令牌。
    • 实现令牌黑名单机制。
  2. 性能

    • 使用缓存减少数据库查询。
    • 优化令牌验证过程。
    • 避免在令牌中包含过多信息。
  3. 用户体验

    • 实现令牌自动刷新机制。
    • 提供清晰的错误信息。
    • 支持多种令牌存储方式。

9.3 未来发展方向

随着Web应用的发展,JWT认证也在不断演进。未来可能的发展方向包括:

  1. 更安全的令牌存储:如使用硬件安全模块(HSM)存储密钥。
  2. 更细粒度的权限控制:如基于属性的访问控制(ABAC)。
  3. 更好的令牌管理:如令牌绑定、令牌吊销列表等。
  4. 与其他认证机制的集成:如OAuth 2.0、OpenID Connect等。

总之,JWT认证是现代Web应用中不可或缺的一部分。通过本文的介绍,希望读者能够掌握在Flask框架下实现JWT认证的方法,并能够根据实际需求进行定制和优化。