FastAPI日志配置完全指南从基础到进阶掌握高效日志管理技巧提升应用性能与调试能力
引言
在现代Web应用开发中,日志管理是不可或缺的关键环节。良好的日志系统不仅能够帮助开发者追踪和调试问题,还能提供系统运行状态的宝贵数据,为性能优化和故障排查提供有力支持。FastAPI作为现代高性能Web框架,其异步特性和类型提示功能为开发者提供了极佳的开发体验,但正确配置和管理日志系统才能充分发挥其潜力。本文将从基础到进阶,全面介绍FastAPI日志配置的各个方面,帮助你掌握高效日志管理技巧,提升应用性能与调试能力。
FastAPI日志基础
日志级别和基本配置
Python标准库中的logging
模块是FastAPI日志系统的基础。了解不同日志级别对于有效管理日志至关重要:
- DEBUG: 详细信息,通常只在诊断问题时使用
- INFO: 确认事情按预期工作
- WARNING: 表示发生了意外情况,或者在不久的将来可能出现问题(例如”磁盘空间不足”),但软件仍在正常工作
- ERROR: 由于更严重的问题,软件已不能执行一些功能
- CRITICAL: 严重错误,表明程序本身可能无法继续运行
在FastAPI应用中,可以通过以下方式设置基本日志配置:
import logging from fastapi import FastAPI # 创建FastAPI应用 app = FastAPI() # 配置日志级别 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 创建路由 @app.get("/") async def read_root(): logging.info("访问根路径") return {"Hello": "World"}
FastAPI内置日志系统
FastAPI使用Uvicorn作为ASGI服务器,Uvicorn有自己的日志系统。要全面配置FastAPI应用的日志,需要同时考虑应用日志和Uvicorn服务器日志。
import logging from fastapi import FastAPI import uvicorn app = FastAPI() # 配置应用日志 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) # 创建控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) # 添加处理器到日志记录器 logger.addHandler(console_handler) @app.get("/") async def read_root(): logger.info("访问根路径") return {"Hello": "World"} if __name__ == "__main__": # 配置Uvicorn日志 uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
FastAPI日志进阶配置
自定义日志格式
自定义日志格式可以使日志信息更具可读性和实用性。FastAPI允许你通过配置日志格式化器来实现这一点:
import logging from fastapi import FastAPI import uvicorn import sys from datetime import datetime app = FastAPI() # 创建自定义格式化器 class CustomFormatter(logging.Formatter): def formatTime(self, record, datefmt=None): now = datetime.now() if datefmt: return now.strftime(datefmt) return now.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3] # 创建格式化器实例 formatter = CustomFormatter( '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' ) # 配置根日志记录器 root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # 创建控制台处理器 console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.INFO) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) @app.get("/") async def read_root(): logging.info("访问根路径") return {"Hello": "World"}
日志处理器和过滤器
日志处理器决定了日志信息的输出目的地,而过滤器则允许你对日志记录进行更精细的控制。以下示例展示了如何配置文件处理器和自定义过滤器:
import logging from fastapi import FastAPI import uvicorn import os app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 创建自定义过滤器 class EndpointFilter(logging.Filter): def filter(self, record): # 只记录特定端点的日志 return "/api/v1/" in record.getMessage() # 配置文件处理器 file_handler = logging.FileHandler("logs/app.log") file_handler.setLevel(logging.INFO) # 配置另一个文件处理器用于错误日志 error_handler = logging.FileHandler("logs/error.log") error_handler.setLevel(logging.ERROR) error_handler.addFilter(EndpointFilter()) # 添加过滤器 # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) error_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) logger.addHandler(error_handler) @app.get("/") async def read_root(): logger.info("访问根路径") return {"Hello": "World"} @app.get("/api/v1/users") async def get_users(): logger.info("获取用户列表") logger.error("这是一个错误消息示例") return {"users": []}
日志轮转和归档
为了避免日志文件过大,需要实现日志轮转功能。Python的logging.handlers
模块提供了RotatingFileHandler
和TimedRotatingFileHandler
来实现这一功能:
import logging from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler from fastapi import FastAPI import os app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置按大小轮转的日志处理器 # 每个日志文件最大10MB,保留5个备份 rotating_handler = RotatingFileHandler( "logs/app.log", maxBytes=10*1024*1024, # 10MB backupCount=5 ) rotating_handler.setLevel(logging.INFO) # 配置按时间轮转的日志处理器 # 每天午夜轮转,保留7天的日志 timed_handler = TimedRotatingFileHandler( "logs/app_timed.log", when="midnight", interval=1, backupCount=7 ) timed_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') rotating_handler.setFormatter(formatter) timed_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(rotating_handler) logger.addHandler(timed_handler) @app.get("/") async def read_root(): logger.info("访问根路径") return {"Hello": "World"}
高效日志管理技巧
结构化日志
结构化日志(如JSON格式)使日志更易于机器解析和分析,特别适合与日志聚合系统集成。以下是使用Python的python-json-logger
库实现结构化日志的示例:
import logging from fastapi import FastAPI from pythonjsonlogger import jsonlogger import os app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 创建JSON格式化器 json_formatter = jsonlogger.JsonFormatter( '%(asctime)s %(name)s %(levelname)s %(message)s %(filename)s %(lineno)d' ) # 配置文件处理器 json_handler = logging.FileHandler("logs/app.json.log") json_handler.setLevel(logging.INFO) json_handler.setFormatter(json_formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(json_handler) @app.get("/") async def read_root(): logger.info("访问根路径", extra={"endpoint": "/", "method": "GET"}) return {"Hello": "World"} @app.get("/users/{user_id}") async def get_user(user_id: int): logger.info( "获取用户信息", extra={ "endpoint": f"/users/{user_id}", "method": "GET", "user_id": user_id } ) return {"user_id": user_id, "name": "John Doe"}
异步日志处理
在FastAPI这样的异步框架中,使用同步日志记录可能会阻塞事件循环,影响性能。以下是实现异步日志处理的方法:
import logging from fastapi import FastAPI import asyncio import os from concurrent.futures import ThreadPoolExecutor app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 创建线程池用于日志处理 log_executor = ThreadPoolExecutor(max_workers=1) # 配置文件处理器 file_handler = logging.FileHandler("logs/app.log") file_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) # 创建异步日志记录函数 async def async_log(level, message, *args, **kwargs): def _log(): if level == "info": logger.info(message, *args, **kwargs) elif level == "warning": logger.warning(message, *args, **kwargs) elif level == "error": logger.error(message, *args, **kwargs) elif level == "debug": logger.debug(message, *args, **kwargs) # 在线程池中执行日志记录 loop = asyncio.get_event_loop() await loop.run_in_executor(log_executor, _log) @app.get("/") async def read_root(): await async_log("info", "访问根路径") return {"Hello": "World"} @app.get("/users/{user_id}") async def get_user(user_id: int): await async_log("info", f"获取用户信息: {user_id}") return {"user_id": user_id, "name": "John Doe"}
日志聚合和集中管理
对于分布式系统,将日志集中管理非常重要。以下是使用Logstash和Elasticsearch实现日志聚合的示例配置:
import logging from fastapi import FastAPI import os from logstash_async.handler import AsynchronousLogstashHandler from logstash_async.formatter import LogstashFormatter app = FastAPI() # 配置Logstash处理器 logstash_handler = AsynchronousLogstashHandler( host='localhost', port=5959, database_path=None # 使用内存队列 ) logstash_handler.setFormatter(LogstashFormatter()) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(logstash_handler) @app.get("/") async def read_root(): logger.info("访问根路径", extra={"endpoint": "/", "method": "GET"}) return {"Hello": "World"} @app.get("/users/{user_id}") async def get_user(user_id: int): logger.info( "获取用户信息", extra={ "endpoint": f"/users/{user_id}", "method": "GET", "user_id": user_id } ) return {"user_id": user_id, "name": "John Doe"}
日志与性能优化
日志对性能的影响
日志记录虽然重要,但不当的日志配置可能会对应用性能产生负面影响。以下是几个关键考虑因素:
- 同步日志记录:同步日志记录会阻塞请求处理线程,特别是在高I/O操作时。
- 日志级别:在生产环境中使用DEBUG级别日志会产生大量日志数据,影响性能。
- 日志格式化:复杂的日志格式化会增加CPU开销。
- 日志目的地:写入远程日志系统或数据库比写入本地文件慢。
优化日志记录的技巧
以下是一些优化日志记录的技巧:
import logging from fastapi import FastAPI import os from logging.handlers import RotatingFileHandler from functools import wraps import time app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置文件处理器 file_handler = RotatingFileHandler( "logs/app.log", maxBytes=10*1024*1024, # 10MB backupCount=5 ) file_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) # 性能监控装饰器 def monitor_performance(func): @wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() result = await func(*args, **kwargs) duration = (time.time() - start_time) * 1000 # 转换为毫秒 # 只记录执行时间超过阈值的请求 if duration > 100: # 100毫秒 logger.warning(f"慢请求: {func.__name__} 执行时间: {duration:.2f}ms") return result return wrapper # 条件日志记录函数 def log_if(condition, level, message, *args, **kwargs): if condition: getattr(logger, level)(message, *args, **kwargs) @app.get("/") @monitor_performance async def read_root(): # 使用条件日志记录 log_if(app.debug, "debug", "访问根路径 (调试模式)") logger.info("访问根路径") return {"Hello": "World"} @app.get("/users/{user_id}") @monitor_performance async def get_user(user_id: int): # 使用条件日志记录 log_if(user_id > 1000, "warning", f"请求大ID用户: {user_id}") logger.info(f"获取用户信息: {user_id}") # 模拟耗时操作 import asyncio await asyncio.sleep(0.1) return {"user_id": user_id, "name": "John Doe"}
条件日志记录
条件日志记录可以减少不必要的日志输出,提高性能:
import logging from fastapi import FastAPI, Request import time import os app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置文件处理器 file_handler = logging.FileHandler("logs/app.log") file_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) # 请求日志中间件 @app.middleware("http") async def log_requests(request: Request, call_next): start_time = time.time() # 处理请求 response = await call_next(request) # 计算处理时间 process_time = (time.time() - start_time) * 1000 # 只记录慢请求或错误请求 if process_time > 100 or response.status_code >= 400: logger.info( f"请求: {request.method} {request.url.path} - " f"状态码: {response.status_code} - " f"处理时间: {process_time:.2f}ms" ) return response @app.get("/") async def read_root(): # 条件日志记录 if app.debug: logger.debug("访问根路径 (调试模式)") # 总是记录重要事件 logger.info("访问根路径") return {"Hello": "World"} @app.get("/users/{user_id}") async def get_user(user_id: int): # 条件日志记录 if user_id > 1000: logger.warning(f"请求大ID用户: {user_id}") logger.info(f"获取用户信息: {user_id}") # 模拟不同处理时间 import asyncio if user_id % 10 == 0: await asyncio.sleep(0.2) # 模拟慢请求 return {"user_id": user_id, "name": "John Doe"}
日志在调试中的应用
错误追踪和调试
良好的日志系统可以大大简化错误追踪和调试过程。以下是实现错误追踪和调试的示例:
import logging from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse import traceback import os from datetime import datetime app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置文件处理器 file_handler = logging.FileHandler("logs/app.log") file_handler.setLevel(logging.INFO) # 配置错误日志处理器 error_handler = logging.FileHandler("logs/error.log") error_handler.setLevel(logging.ERROR) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) error_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) logger.addHandler(error_handler) # 全局异常处理 @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): # 记录异常详细信息 error_id = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(request)}" logger.error( f"未处理的异常 [ID: {error_id}]: {str(exc)}n" f"请求路径: {request.url.path}n" f"请求方法: {request.method}n" f"客户端IP: {request.client.host}n" f"堆栈跟踪:n{traceback.format_exc()}", extra={"error_id": error_id} ) # 返回友好的错误响应 return JSONResponse( status_code=500, content={ "message": "内部服务器错误", "error_id": error_id, "documentation_url": "https://example.com/docs/errors" } ) # 自定义异常 class UserNotFoundError(HTTPException): def __init__(self, user_id: int): super().__init__( status_code=404, detail=f"用户ID {user_id} 未找到" ) @app.exception_handler(UserNotFoundError) async def user_not_found_handler(request: Request, exc: UserNotFoundError): logger.warning( f"用户未找到: {exc.detail}n" f"请求路径: {request.url.path}n" f"请求方法: {request.method}n" f"客户端IP: {request.client.host}" ) return JSONResponse( status_code=exc.status_code, content={ "message": exc.detail, "error_code": "USER_NOT_FOUND" } ) @app.get("/") async def read_root(): logger.info("访问根路径") return {"Hello": "World"} @app.get("/users/{user_id}") async def get_user(user_id: int): logger.info(f"获取用户信息: {user_id}") # 模拟用户查找 if user_id <= 0: logger.error(f"无效的用户ID: {user_id}") raise HTTPException(status_code=400, detail="无效的用户ID") if user_id > 1000: logger.warning(f"用户ID不存在: {user_id}") raise UserNotFoundError(user_id) # 模拟意外错误 if user_id == 999: logger.error(f"模拟意外错误,用户ID: {user_id}") raise ValueError("模拟意外错误") return {"user_id": user_id, "name": "John Doe"}
请求日志记录
详细记录请求信息对于调试API问题非常有帮助。以下是实现请求日志记录的示例:
import logging from fastapi import FastAPI, Request, Response import time import json import os from typing import Dict, Any app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置请求日志处理器 request_handler = logging.FileHandler("logs/requests.log") request_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') request_handler.setFormatter(formatter) # 配置请求日志记录器 request_logger = logging.getLogger("requests") request_logger.setLevel(logging.INFO) request_logger.addHandler(request_handler) # 请求日志中间件 @app.middleware("http") async def log_requests(request: Request, call_next): # 记录请求开始时间 start_time = time.time() # 获取请求信息 request_info = { "method": request.method, "url": str(request.url), "path": request.url.path, "query_params": dict(request.query_params), "client": { "host": request.client.host, "port": request.client.port }, "headers": dict(request.headers) } # 记录请求体(对于非表单数据) try: if request.method in ["POST", "PUT", "PATCH"]: body = await request.body() if body: try: request_info["body"] = json.loads(body) except json.JSONDecodeError: request_info["body"] = body.decode("utf-8") except Exception: request_info["body"] = "无法解析请求体" # 处理请求 response = await call_next(request) # 计算处理时间 process_time = (time.time() - start_time) * 1000 # 记录响应信息 response_info = { "status_code": response.status_code, "headers": dict(response.headers), "process_time_ms": round(process_time, 2) } # 记录完整请求-响应信息 request_logger.info( f"请求-响应日志: {request.method} {request.url.path} - " f"状态码: {response.status_code} - " f"处理时间: {process_time:.2f}ms", extra={ "request": request_info, "response": response_info } ) return response @app.get("/") async def read_root(): return {"Hello": "World"} @app.post("/users") async def create_user(user: Dict[str, Any]): return {"id": 1, **user} @app.get("/users/{user_id}") async def get_user(user_id: int): return {"user_id": user_id, "name": "John Doe"}
性能监控日志
性能监控日志可以帮助识别性能瓶颈和优化机会。以下是实现性能监控日志的示例:
import logging from fastapi import FastAPI, Request, Response import time import os import asyncio from functools import wraps from collections import defaultdict import statistics app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置性能日志处理器 perf_handler = logging.FileHandler("logs/performance.log") perf_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') perf_handler.setFormatter(formatter) # 配置性能日志记录器 perf_logger = logging.getLogger("performance") perf_logger.setLevel(logging.INFO) perf_logger.addHandler(perf_handler) # 性能数据存储 performance_data = defaultdict(list) # 性能监控装饰器 def monitor_performance(endpoint_name: str = None): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() # 获取端点名称 ep_name = endpoint_name or func.__name__ try: # 执行函数 result = await func(*args, **kwargs) # 计算执行时间 execution_time = (time.time() - start_time) * 1000 # 存储性能数据 performance_data[ep_name].append(execution_time) # 记录性能日志 perf_logger.info( f"端点性能: {ep_name} - 执行时间: {execution_time:.2f}ms", extra={ "endpoint": ep_name, "execution_time_ms": round(execution_time, 2), "status": "success" } ) return result except Exception as e: # 计算执行时间(包括异常处理) execution_time = (time.time() - start_time) * 1000 # 存储性能数据 performance_data[ep_name].append(execution_time) # 记录性能日志 perf_logger.error( f"端点性能: {ep_name} - 执行时间: {execution_time:.2f}ms - 错误: {str(e)}", extra={ "endpoint": ep_name, "execution_time_ms": round(execution_time, 2), "status": "error", "error": str(e) } ) # 重新抛出异常 raise return wrapper return decorator # 性能统计端点 @app.get("/admin/performance") @monitor_performance("admin_performance_stats") async def get_performance_stats(): stats = {} for endpoint, times in performance_data.items(): if times: stats[endpoint] = { "count": len(times), "min_ms": round(min(times), 2), "max_ms": round(max(times), 2), "avg_ms": round(statistics.mean(times), 2), "median_ms": round(statistics.median(times), 2), "p95_ms": round(statistics.quantiles(times, n=20)[18], 2) if len(times) > 1 else times[0] } return {"performance_stats": stats} @app.get("/") @monitor_performance("root_endpoint") async def read_root(): return {"Hello": "World"} @app.get("/users/{user_id}") @monitor_performance("get_user_endpoint") async def get_user(user_id: int): # 模拟不同处理时间 if user_id % 10 == 0: await asyncio.sleep(0.1) # 模拟慢请求 return {"user_id": user_id, "name": "John Doe"} @app.get("/slow") @monitor_performance("slow_endpoint") async def slow_endpoint(): # 模拟慢请求 await asyncio.sleep(0.2) return {"message": "这是一个慢端点"}
日志分析工具集成
ELK Stack集成
ELK Stack(Elasticsearch, Logstash, Kibana)是流行的日志分析解决方案。以下是如何将FastAPI应用与ELK Stack集成的示例:
import logging from fastapi import FastAPI, Request import time import json import os from logstash_async.handler import AsynchronousLogstashHandler from logstash_async.formatter import LogstashFormatter app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置Logstash处理器 logstash_handler = AsynchronousLogstashHandler( host='localhost', port=5959, database_path=None # 使用内存队列 ) logstash_handler.setFormatter(LogstashFormatter()) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(logstash_handler) # 请求日志中间件 @app.middleware("http") async def log_requests(request: Request, call_next): start_time = time.time() # 获取请求信息 request_body = None if request.method in ["POST", "PUT", "PATCH"]: try: request_body = await request.json() except Exception: pass # 处理请求 response = await call_next(request) # 计算处理时间 process_time = (time.time() - start_time) * 1000 # 记录请求-响应信息到Logstash logger.info( f"请求: {request.method} {request.url.path} - " f"状态码: {response.status_code} - " f"处理时间: {process_time:.2f}ms", extra={ "request_method": request.method, "request_path": request.url.path, "request_query": dict(request.query_params), "request_body": request_body, "response_status": response.status_code, "response_time_ms": round(process_time, 2), "client_ip": request.client.host } ) return response @app.get("/") async def read_root(): logger.info("访问根路径", extra={"endpoint": "/", "method": "GET"}) return {"Hello": "World"} @app.post("/users") async def create_user(user: dict): logger.info( "创建用户", extra={ "endpoint": "/users", "method": "POST", "user_data": user } ) return {"id": 1, **user} @app.get("/users/{user_id}") async def get_user(user_id: int): logger.info( "获取用户信息", extra={ "endpoint": f"/users/{user_id}", "method": "GET", "user_id": user_id } ) return {"user_id": user_id, "name": "John Doe"}
其他日志分析工具
除了ELK Stack,还有其他日志分析工具可以与FastAPI集成。以下是使用Sentry进行错误跟踪的示例:
import logging from fastapi import FastAPI, Request import sentry_sdk from sentry_sdk.integrations.fastapi import FastApiIntegration from sentry_sdk.integrations.starlette import StarletteIntegration import os # 初始化Sentry sentry_sdk.init( dsn="YOUR_SENTRY_DSN", integrations=[ FastApiIntegration(), StarletteIntegration(), ], traces_sample_rate=1.0, ) app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置文件处理器 file_handler = logging.FileHandler("logs/app.log") file_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) # Sentry错误处理中间件 @app.middleware("http") async def sentry_exception_middleware(request: Request, call_next): try: response = await call_next(request) return response except Exception as e: # 记录异常到Sentry sentry_sdk.capture_exception(e) # 记录异常到本地日志 logger.error( f"未处理的异常: {str(e)}", extra={ "request_path": request.url.path, "request_method": request.method, "client_ip": request.client.host } ) # 重新抛出异常 raise @app.get("/") async def read_root(): logger.info("访问根路径") return {"Hello": "World"} @app.get("/users/{user_id}") async def get_user(user_id: int): logger.info(f"获取用户信息: {user_id}") # 模拟错误 if user_id == 999: raise ValueError("模拟错误") return {"user_id": user_id, "name": "John Doe"}
最佳实践和常见问题
日志安全考虑
在配置日志系统时,安全性是一个重要考虑因素。以下是一些日志安全的最佳实践:
import logging from fastapi import FastAPI, Request, HTTPException from typing import Dict, Any import re import os import json app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 配置文件处理器 file_handler = logging.FileHandler("logs/app.log") file_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) # 敏感信息模式 SENSITIVE_PATTERNS = [ re.compile(r'password["']?s*[:=]s*["']?([^"'s]+)', re.IGNORECASE), re.compile(r'token["']?s*[:=]s*["']?([^"'s]+)', re.IGNORECASE), re.compile(r'api_key["']?s*[:=]s*["']?([^"'s]+)', re.IGNORECASE), re.compile(r'secret["']?s*[:=]s*["']?([^"'s]+)', re.IGNORECASE), re.compile(r'credit_card["']?s*[:=]s*["']?(d{4}[s-]?d{4}[s-]?d{4}[s-]?d{4})', re.IGNORECASE), ] # 敏感信息脱敏函数 def sanitize_data(data: Any) -> Any: if isinstance(data, dict): return {k: sanitize_data(v) for k, v in data.items()} elif isinstance(data, list): return [sanitize_data(item) for item in data] elif isinstance(data, str): # 应用所有敏感信息模式 for pattern in SENSITIVE_PATTERNS: data = pattern.sub('[REDACTED]', data) return data else: return data # 安全日志记录函数 def log_safely(level: str, message: str, extra: Dict[str, Any] = None): # 脱敏额外数据 if extra: extra = sanitize_data(extra) # 记录日志 getattr(logger, level)(message, extra=extra) @app.post("/login") async def login(credentials: Dict[str, str]): username = credentials.get("username", "") password = credentials.get("password", "") # 安全记录登录尝试(不记录密码) log_safely( "info", f"用户登录尝试: {username}", extra={ "username": username, "endpoint": "/login", "method": "POST" } ) # 模拟验证 if username == "admin" and password == "secret": log_safely( "info", f"用户登录成功: {username}", extra={ "username": username, "endpoint": "/login", "method": "POST" } ) return {"message": "登录成功", "token": "fake-jwt-token"} else: log_safely( "warning", f"用户登录失败: {username}", extra={ "username": username, "endpoint": "/login", "method": "POST" } ) raise HTTPException(status_code=401, detail="用户名或密码错误") @app.post("/payment") async def process_payment(payment_data: Dict[str, Any]): # 安全记录支付尝试(脱敏信用卡信息) log_safely( "info", f"处理支付请求", extra={ "payment_data": payment_data, "endpoint": "/payment", "method": "POST" } ) # 模拟处理支付 return {"message": "支付处理成功", "transaction_id": "fake-transaction-id"}
常见问题和解决方案
以下是FastAPI日志配置中常见的问题和解决方案:
问题1:日志不输出
解决方案:
import logging from fastapi import FastAPI app = FastAPI() # 确保正确配置日志级别 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 确保获取正确的日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) # 确保级别设置正确 # 确保添加了处理器 if not logger.handlers: handler = logging.StreamHandler() handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) @app.get("/") async def read_root(): logger.info("访问根路径") # 使用正确的日志记录器 return {"Hello": "World"}
问题2:Uvicorn日志与应用日志混合
解决方案:
import logging from fastapi import FastAPI import uvicorn import sys app = FastAPI() # 配置应用日志 app_logger = logging.getLogger("myapp") app_logger.setLevel(logging.INFO) # 创建应用日志处理器 app_handler = logging.StreamHandler(sys.stdout) app_handler.setLevel(logging.INFO) app_formatter = logging.Formatter('%(asctime)s - [APP] - %(name)s - %(levelname)s - %(message)s') app_handler.setFormatter(app_formatter) app_logger.addHandler(app_handler) # 配置Uvicorn日志 uvicorn_logger = logging.getLogger("uvicorn") uvicorn_logger.setLevel(logging.INFO) # 创建Uvicorn日志处理器 uvicorn_handler = logging.StreamHandler(sys.stdout) uvicorn_handler.setLevel(logging.INFO) uvicorn_formatter = logging.Formatter('%(asctime)s - [UVICORN] - %(name)s - %(levelname)s - %(message)s') uvicorn_handler.setFormatter(uvicorn_formatter) uvicorn_logger.addHandler(uvicorn_handler) @app.get("/") async def read_root(): app_logger.info("访问根路径") return {"Hello": "World"} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
问题3:异步应用中的日志性能问题
解决方案:
import logging from fastapi import FastAPI import asyncio import os from logging.handlers import RotatingFileHandler from concurrent.futures import ThreadPoolExecutor import time app = FastAPI() # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 创建线程池用于日志处理 log_executor = ThreadPoolExecutor(max_workers=1) # 配置文件处理器 file_handler = RotatingFileHandler( "logs/app.log", maxBytes=10*1024*1024, # 10MB backupCount=5 ) file_handler.setLevel(logging.INFO) # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) # 配置应用日志记录器 logger = logging.getLogger("myapp") logger.setLevel(logging.INFO) logger.addHandler(file_handler) # 异步日志记录函数 async def async_log(level, message, *args, **kwargs): def _log(): getattr(logger, level)(message, *args, **kwargs) # 在线程池中执行日志记录 loop = asyncio.get_event_loop() await loop.run_in_executor(log_executor, _log) # 批量日志记录器 class BatchLogger: def __init__(self, max_size=100, flush_interval=5): self.max_size = max_size self.flush_interval = flush_interval self.logs = [] self.last_flush = time.time() self.flush_task = None async def log(self, level, message, *args, **kwargs): self.logs.append((level, message, args, kwargs)) # 检查是否需要刷新 if len(self.logs) >= self.max_size or time.time() - self.last_flush >= self.flush_interval: await self.flush() async def flush(self): if not self.logs: return # 复制日志并清空列表 logs_to_flush = self.logs.copy() self.logs = [] self.last_flush = time.time() # 批量记录日志 def _batch_log(): for level, message, args, kwargs in logs_to_flush: getattr(logger, level)(message, *args, **kwargs) # 在线程池中执行批量日志记录 loop = asyncio.get_event_loop() await loop.run_in_executor(log_executor, _batch_log) # 创建批量日志记录器 batch_logger = BatchLogger() @app.get("/") async def read_root(): await batch_logger.log("info", "访问根路径") return {"Hello": "World"} @app.get("/users/{user_id}") async def get_user(user_id: int): await batch_logger.log("info", f"获取用户信息: {user_id}") return {"user_id": user_id, "name": "John Doe"} # 定期刷新批量日志 @app.on_event("startup") async def startup_event(): async def periodic_flush(): while True: await batch_logger.flush() await asyncio.sleep(1) # 每秒检查一次 # 启动定期刷新任务 loop = asyncio.get_event_loop() loop.create_task(periodic_flush()) # 应用关闭时刷新剩余日志 @app.on_event("shutdown") async def shutdown_event(): await batch_logger.flush()
总结
FastAPI作为现代高性能Web框架,其日志系统的正确配置和管理对于应用的可维护性、性能优化和问题排查至关重要。本文从基础到进阶,全面介绍了FastAPI日志配置的各个方面,包括:
- 基础配置:介绍了日志级别、基本配置和FastAPI内置日志系统。
- 进阶配置:详细讲解了自定义日志格式、日志处理器和过滤器、日志轮转和归档等高级配置。
- 高效日志管理技巧:探讨了结构化日志、异步日志处理和日志聚合等技巧。
- 日志与性能优化:分析了日志对性能的影响,提供了优化日志记录的技巧和条件日志记录方法。
- 日志在调试中的应用:展示了如何利用日志进行错误追踪、请求记录和性能监控。
- 日志分析工具集成:介绍了如何将FastAPI应用与ELK Stack和其他日志分析工具集成。
- 最佳实践和常见问题:提供了日志安全考虑和常见问题的解决方案。
通过合理应用这些技术和方法,你可以构建一个高效、可靠且易于维护的日志系统,为FastAPI应用的开发、部署和运维提供强有力的支持。记住,良好的日志系统不仅是调试工具,更是应用健康监控和性能优化的重要手段。