MongoDB与Python集成开发实战指南:从连接配置到高效数据操作的完整教程
引言:为什么选择MongoDB与Python组合
MongoDB作为一个面向文档的NoSQL数据库,与Python这种灵活高效的编程语言结合,为现代应用开发提供了强大的数据存储和处理能力。Python的pymongo驱动程序提供了与MongoDB交互的完整接口,使得数据操作变得简单直观。
这种组合特别适合以下场景:
- 快速迭代的Web应用开发
- 大数据处理和分析
- 物联网数据存储
- 内容管理系统
- 实时数据分析平台
第一部分:环境准备与安装配置
1.1 安装MongoDB数据库
首先需要在系统中安装MongoDB数据库。以Ubuntu系统为例:
# 导入MongoDB公钥 wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | sudo apt-key add - # 创建列表文件 echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-6.0.list # 更新包管理器并安装 sudo apt-get update sudo apt-get install -y mongodb-org # 启动MongoDB服务 sudo systemctl start mongod sudo systemctl enable mongod 1.2 安装Python驱动程序pymongo
使用pip安装最新版本的pymongo:
# 基础安装 pip install pymongo # 如果需要支持MongoDB的SCRAM-SHA-256认证机制 pip install pymongo[srv] # 安装特定版本(推荐用于生产环境) pip install pymongo==4.5.0 1.3 验证安装
创建一个简单的Python脚本来验证安装:
import pymongo
from pymongo import MongoClient

# Connection smoke test
try:
    client = MongoClient('mongodb://localhost:27017/')
    # Issue a ping so that an unreachable server is reported here
    client.admin.command('ping')
    print("✅ MongoDB连接成功!")
    # Show server information
    server_info = client.server_info()
    print(f"MongoDB版本: {server_info['version']}")
except Exception as e:
    print(f"❌ 连接失败: {e}")
第二部分:连接配置详解
2.1 基本连接方式
2.1.1 单节点连接
from pymongo import MongoClient # 标准连接字符串格式 client = MongoClient('mongodb://username:password@host:port/database') # 本地默认连接 client = MongoClient('mongodb://localhost:27017/') # 带认证数据库的连接 client = MongoClient('mongodb://admin:secret@localhost:27017/admin') 2.1.2 连接池配置
MongoDB的Python驱动(pymongo)默认启用连接池,可以通过以下参数调整连接池大小和超时设置来优化性能:
from pymongo import MongoClient # 配置连接池参数 client = MongoClient( 'mongodb://localhost:27017/', maxPoolSize=100, # 最大连接数 minPoolSize=10, # 最小连接数 maxIdleTimeMS=60000, # 连接最大空闲时间(毫秒) socketTimeoutMS=20000, # Socket超时时间 connectTimeoutMS=10000, # 连接超时时间 retryWrites=True, # 重试写入操作 retryReads=True # 重试读取操作 ) 2.2 高级连接配置
2.2.1 副本集连接
# 连接到副本集 client = MongoClient([ 'mongodb://node1:27017/', 'mongodb://node2:27017/', 'mongodb://node3:27017/' ]) # 带认证的副本集连接 uri = "mongodb://user:pass@node1:27017,node2:27017,node3:27017/database?replicaSet=myReplSet" client = MongoClient(uri) 2.2.2 分片集群连接
# 连接到分片集群 uri = "mongodb://mongos1:27017,mongos2:27017/database" client = MongoClient(uri) # 带认证的分片集群 uri = "mongodb://user:pass@mongos1:27017,mongos2:27017/database?authSource=admin" client = MongoClient(uri) 2.3 连接管理最佳实践
import os
from contextlib import contextmanager

from pymongo import MongoClient


class MongoDBManager:
    """Own one lazily created MongoClient for a connection string.

    The connection string is taken from the constructor argument, then from
    the MONGODB_URI environment variable, then falls back to a local server.
    """

    def __init__(self, connection_string=None):
        self.connection_string = connection_string or os.getenv(
            'MONGODB_URI',
            'mongodb://localhost:27017/'
        )
        self._client = None

    def get_client(self):
        """Return the shared client, creating it on first use.

        The client is created only once. The previous version also rebuilt
        it whenever ``is_primary`` was false, which abandoned the old client
        without closing it and made a blocking server round trip on every
        call.
        """
        if self._client is None:
            self._client = MongoClient(
                self.connection_string,
                serverSelectionTimeoutMS=5000,
                retryWrites=True
            )
        return self._client

    @contextmanager
    def get_database(self, db_name=None):
        """Yield a database handle.

        Yields ``client[db_name]`` when *db_name* is given, otherwise the
        client's default database. The client is deliberately left open on
        exit; call :meth:`close` to release it.
        """
        client = self.get_client()
        yield client[db_name] if db_name else client.get_default_database()

    def close(self):
        """Close the client, if one was created, and forget it."""
        if self._client is not None:
            self._client.close()
            self._client = None


# Usage example
if __name__ == "__main__":
    # Configure the connection
    manager = MongoDBManager("mongodb://localhost:27017/myapp")
    try:
        # Use the context manager to obtain the default database
        with manager.get_database() as db:
            # Run a database operation
            result = db.test_collection.insert_one({"name": "test"})
            print(f"插入文档ID: {result.inserted_id}")
    finally:
        # Close the connection even if the operation failed
        manager.close()
第三部分:数据库与集合操作
3.1 数据库操作
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
# Obtain a database reference
# Option 1: attribute access
db = client.my_database
# Option 2: dictionary-style access
db = client['my_database']
# List all databases
databases = client.list_database_names()
print(f"可用数据库: {databases}")
# Drop a database
client.drop_database('my_database')
3.2 集合操作
# 获取集合引用 collection = db.my_collection # 或者 collection = db['my_collection'] # 创建集合(显式创建,可选) db.create_collection( "users", capped=True, # 创建固定大小的集合 size=1000000, # 大小(字节) max=1000 # 最大文档数 ) # 列出所有集合 collections = db.list_collection_names() print(f"数据库中的集合: {collections}") # 删除集合 db.drop_collection('my_collection') 第四部分:CRUD操作详解
4.1 创建操作(Create)
4.1.1 插入单个文档
# 基本插入操作 document = { "name": "张三", "age": 28, "email": "zhangsan@example.com", "skills": ["Python", "MongoDB", "Docker"], "address": { "city": "北京", "street": "朝阳路123号" }, "is_active": True, "created_at": datetime.utcnow() } # 插入文档 result = db.users.insert_one(document) print(f"插入的文档ID: {result.inserted_id}") # 插入时指定_id(如果未提供,MongoDB会自动生成) document_with_id = { "_id": "user_001", "name": "李四", "age": 32 } db.users.insert_one(document_with_id) 4.1.2 插入多个文档
# 批量插入文档(更高效) documents = [ {"name": "王五", "age": 25, "department": "技术部"}, {"name": "赵六", "age": 29, "department": "产品部"}, {"name": "钱七", "age": 31, "department": "市场部"} ] # 批量插入 result = db.users.insert_many(documents) print(f"插入的文档ID列表: {result.inserted_ids}") # 批量插入时的错误处理 try: result = db.users.insert_many(documents, ordered=False) # ordered=False允许部分成功 except pymongo.errors.BulkWriteError as e: print(f"批量插入错误: {e.details}") 4.1.3 插入时的选项
# 插入时的写关注设置 db.users.insert_one( {"name": "测试用户"}, write_concern=pymongo.WriteConcern(w=1, j=True) # w: 写关注级别, j: 日志确认 ) # 插入时忽略重复键错误 try: db.users.insert_one({"_id": "duplicate_key", "name": "测试"}) db.users.insert_one({"_id": "duplicate_key", "name": "测试2"}) except pymongo.errors.DuplicateKeyError: print("检测到重复键") 4.2 读取操作(Read)
4.2.1 查询单个文档
# 基本查询 user = db.users.find_one({"name": "张三"}) print(user) # 按_id查询 user = db.users.find_one({"_id": "user_001"}) print(user) # 投影(只返回特定字段) user = db.users.find_one( {"name": "张三"}, {"_id": 0, "name": 1, "email": 1} # 0表示不返回,1表示返回 ) print(user) # 输出: {'name': '张三', 'email': 'zhangsan@example.com'} 4.2.2 查询多个文档
# 查询所有文档 all_users = db.users.find() for user in all_users: print(user) # 条件查询 # 比较操作符: $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin young_users = db.users.find({"age": {"$lt": 30}}) for user in young_users: print(f"{user['name']} - {user['age']}岁") # 逻辑操作符: $and, $or, $not, $nor # 查询年龄在25-30岁之间或部门为技术部的用户 query = { "$or": [ {"age": {"$gte": 25, "$lte": 30}}, {"department": "技术部"} ] } users = db.users.find(query) # 数组查询 python_users = db.users.find({"skills": "Python"}) for user in python_users: print(f"Python开发者: {user['name']}") # 嵌套文档查询 beijing_users = db.users.find({"address.city": "北京"}) 4.2.3 高级查询技巧
# 正则表达式查询 import re pattern = re.compile('^张', re.IGNORECASE) zhang_users = db.users.find({"name": pattern}) # $where查询(谨慎使用,性能较差) # db.users.find({"$where": "this.age > 25"}) # 地理空间查询(需要2dsphere索引) db.places.insert_one({ "name": "故宫", "location": { "type": "Point", "coordinates": [116.397, 39.916] } }) # 查询附近地点 nearby_places = db.places.find({ "location": { "$near": { "$geometry": { "type": "Point", "coordinates": [116.40, 39.92] }, "$maxDistance": 5000 # 5公里内 } } }) 4.2.4 排序、分页和限制
# 排序 # 1: 升序, -1: 降序 sorted_users = db.users.find().sort("age", 1) # 按年龄升序 sorted_users = db.users.find().sort([("age", 1), ("name", -1)]) # 多字段排序 # 限制返回数量 limited_users = db.users.find().limit(10) # 分页查询 page = 2 page_size = 10 skip = (page - 1) * page_size paginated_users = db.users.find().skip(skip).limit(page_size) # 组合使用 results = db.users.find({"age": {"$gte": 25}}) .sort("age", -1) .skip(10) .limit(10) 4.3 更新操作(Update)
4.3.1 更新单个文档
# $set操作符:更新或添加字段 result = db.users.update_one( {"name": "张三"}, {"$set": {"age": 29, "updated_at": datetime.utcnow()}} ) print(f"匹配数: {result.matched_count}, 修改数: {result.modified_count}") # $inc操作符:递增字段值 result = db.users.update_one( {"name": "张三"}, {"$inc": {"age": 1}} # 年龄加1 ) # $push操作符:向数组添加元素 result = db.users.update_one( {"name": "张三"}, {"$push": {"skills": "JavaScript"}} ) # $addToSet操作符:添加不重复的数组元素 result = db.users.update_one( {"name": "张三"}, {"$addToSet": {"skills": "Python"}} # 如果已存在则不会重复添加 ) # $unset操作符:删除字段 result = db.users.update_one( {"name": "张三"}, {"$unset": {"temporary_field": ""}} ) # $rename操作符:重命名字段 result = db.users.update_one( {"name": "张三"}, {"$rename": {"old_field": "new_field"}} ) 4.3.2 更新多个文档
# 更新所有匹配的文档 result = db.users.update_many( {"age": {"$lt": 30}}, {"$set": {"is_young": True}} ) print(f"匹配数: {result.matched_count}, 修改数: {result.modified_count}") # 使用数组操作符批量更新 result = db.users.update_many( {"department": "技术部"}, {"$push": {"skills": "Docker"}} ) 4.3.3 更新选项
# upsert: 如果文档不存在则创建 result = db.users.update_one( {"name": "新用户"}, {"$set": {"age": 25, "department": "技术部"}}, upsert=True ) # collation: 指定排序规则(用于字符串比较) result = db.users.update_one( {"name": "张三"}, {"$set": {"status": "active"}}, collation=pymongo.Collation(locale="zh", strength=2) ) 4.4 删除操作(Delete)
4.4.1 删除单个文档
# 删除匹配的第一个文档 result = db.users.delete_one({"name": "测试用户"}) print(f"删除的文档数: {result.deleted_count}") # 按_id删除 result = db.users.delete_one({"_id": "user_001"}) 4.4.2 删除多个文档
# 删除所有匹配的文档 result = db.users.delete_many({"age": {"$lt": 25}}) print(f"删除的文档数: {result.deleted_count}") # 删除集合中的所有文档(但保留集合) result = db.users.delete_many({}) print(f"删除的文档数: {result.deleted_count}") # 删除整个集合(更高效) db.users.drop() 第五部分:索引与性能优化
5.1 索引类型与创建
# 单字段索引 db.users.create_index("email", unique=True) # 复合索引 db.users.create_index([("age", 1), ("name", -1)]) # age升序,name降序 # 文本索引(全文搜索) db.articles.create_index([("title", "text"), ("content", "text")]) # 地理空间索引 db.places.create_index([("location", "2dsphere")]) # 哈希索引(用于等值查询) db.users.create_index([("user_id", "hashed")]) # TTL索引(自动过期) db.sessions.create_index("created_at", expireAfterSeconds=3600) # 1小时后自动删除 # 多键索引(对数组字段) db.users.create_index("skills") # 部分索引(只对满足条件的文档创建索引) db.users.create_index( [("email", 1)], partialFilterExpression={"is_active": True} ) 5.2 索引管理
# 查看集合的所有索引 indexes = db.users.list_indexes() for index in indexes: print(index) # 获取索引统计信息 index_stats = db.users.aggregate([ {"$indexStats": {}} ]) # 删除索引 db.users.drop_index("email_1") # 按索引名删除 db.users.drop_indexes() # 删除所有索引(保留_id索引) # 重建索引(修复索引) db.users.reindex() 5.3 查询性能分析
# 使用explain()分析查询计划 query_plan = db.users.find({"age": {"$gte": 25}}).explain() print(query_plan) # 关键指标: # - executionStats.executionTimeMillis: 执行时间 # - executionStats.totalDocsExamined: 检查的文档数 # - executionStats.totalKeysExamined: 检查的索引键数 # - executionStages.stage: 执行阶段(COLLSCAN表示全表扫描,IXSCAN表示索引扫描) # 使用hint()强制使用特定索引 results = db.users.find({"age": {"$gte": 25}}).hint("age_1") 5.4 查询优化技巧
# 1. 覆盖查询(只查询索引字段) # 创建复合索引 db.users.create_index([("age", 1), ("name", 1), ("email", 1)]) # 查询只返回索引字段 results = db.users.find( {"age": {"$gte": 25}}, {"_id": 0, "age": 1, "name": 1, "email": 1} ).explain() # 2. 避免全表扫描 # 不好的做法 db.users.find({"$where": "this.age > 25"}) # 好的做法 db.users.find({"age": {"$gt": 25}}) # 3. 使用投影减少数据传输 # 只查询需要的字段 db.users.find({}, {"_id": 0, "name": 1, "email": 1}) # 4. 批量操作优化 # 使用bulk_write进行批量操作 from pymongo import InsertOne, UpdateOne, DeleteOne bulk_operations = [ InsertOne({"name": "用户1", "age": 25}), UpdateOne({"name": "用户2"}, {"$set": {"age": 26}}), DeleteOne({"name": "用户3"}) ] result = db.users.bulk_write(bulk_operations) print(f"插入: {result.inserted_count}, 修改: {result.modified_count}, 删除: {result.deleted_count}") 第六部分:聚合框架(Aggregation Framework)
6.1 聚合管道基础
# 基本聚合示例:统计各年龄段用户数量 pipeline = [ {"$match": {"age": {"$gte": 20, "$lte": 40}}}, # 筛选 {"$group": {"_id": "$age", "count": {"$sum": 1}}}, # 分组统计 {"$sort": {"count": -1}}, # 排序 {"$limit": 10} # 限制结果 ] results = db.users.aggregate(pipeline) for result in results: print(f"年龄 {result['_id']}: {result['count']}人") 6.2 常用聚合操作符
# 1. $project: 重塑文档结构 pipeline = [ { "$project": { "name": 1, "age": 1, "email_upper": {"$toUpper": "$email"}, # 转换为大写 "skills_count": {"$size": "$skills"} # 数组长度 } } ] # 2. $unwind: 展开数组 pipeline = [ {"$unwind": "$skills"}, {"$group": {"_id": "$skills", "users": {"$push": "$name"}}} ] # 3. $lookup: 关联查询(类似SQL JOIN) # 假设有orders集合和users集合 pipeline = [ { "$lookup": { "from": "users", "localField": "user_id", "foreignField": "_id", "as": "user_info" } }, {"$unwind": "$user_info"}, {"$project": {"order_id": 1, "amount": 1, "user_name": "$user_info.name"}} ] # 4. $bucket: 分桶统计 pipeline = [ { "$bucket": { "groupBy": "$age", "boundaries": [0, 20, 30, 40, 50, 100], "default": "其他", "output": { "count": {"$sum": 1}, "users": {"$push": "$name"} } } } ] # 5. $facet: 多维度聚合 pipeline = [ { "$facet": { "age_stats": [ {"$group": {"_id": None, "avg_age": {"$avg": "$age"}, "max_age": {"$max": "$age"}}} ], "dept_stats": [ {"$group": {"_id": "$department", "count": {"$sum": 1}}} ] } } ] 6.3 聚合性能优化
# 1. Filter with $match as early as possible
# Good: the filter on a stored field runs first, so $group sees fewer documents
pipeline = [
    {"$match": {"age": {"$gte": 25}}},  # filter first
    {"$group": {"_id": "$department", "count": {"$sum": 1}}}
]
# Filtering after $group: every document is grouped before anything is
# discarded. This is only justified when the filter targets a computed field
# such as "count" (the equivalent of SQL HAVING); filters on stored fields
# belong in an early $match.
pipeline = [
    {"$group": {"_id": "$department", "count": {"$sum": 1}}},
    {"$match": {"count": {"$gte": 5}}}  # filter afterwards
]
# 2. Use allowDiskUse for large data sets
results = db.users.aggregate(pipeline, allowDiskUse=True)
# 3. Analyse aggregation performance.
# The cursor returned by aggregate() has no explain() method; run the
# aggregate command with explain=True instead.
explain_result = db.command('aggregate', 'users', pipeline=pipeline, explain=True)
第七部分:事务处理
7.1 事务基础
from pymongo import MongoClient
from pymongo.errors import OperationFailure

client = MongoClient('mongodb://localhost:27017/')
db = client.myapp
# MongoDB 4.0+ supports multi-document transactions
try:
    with client.start_session() as session:
        with session.start_transaction():
            # Operations inside the transaction; each one is passed the session
            db.users.update_one(
                {"_id": "user1"},
                {"$inc": {"balance": -100}},
                session=session
            )
            db.users.update_one(
                {"_id": "user2"},
                {"$inc": {"balance": 100}},
                session=session
            )
            # Commit the transaction
            # NOTE(review): presumably redundant — the start_transaction()
            # context manager is expected to commit on normal exit; confirm
            session.commit_transaction()
            print("事务提交成功")
except OperationFailure as e:
    print(f"事务失败: {e}")
    # The transaction is rolled back automatically
7.2 事务最佳实践
# 1. Keep transactions short
# Bad: heavy computation or external API calls inside the transaction
# Good: prepare the data beforehand so the transaction holds only database operations

# 2. Bound how long the commit may take
try:
    with client.start_session() as session:
        # start_transaction() has no "timeout" argument; max_commit_time_ms
        # caps the duration of the commit (30 000 ms = 30 s)
        with session.start_transaction(max_commit_time_ms=30000):
            # transaction operations
            pass
except OperationFailure as e:
    print(f"事务超时或失败: {e}")


# 3. Retry logic
def execute_with_retry(func, max_retries=3):
    """Call *func* and retry it on transient transaction errors.

    Args:
        func: Zero-argument callable that runs one complete transaction.
        max_retries: Maximum number of attempts.

    Returns:
        Whatever *func* returns on the first successful attempt.

    Raises:
        OperationFailure: When the error is not transient, or when the last
            attempt also fails.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except OperationFailure as e:
            # Ask the driver for the error label instead of searching the
            # message text, which depends on how the error is rendered
            transient = e.has_error_label("TransientTransactionError")
            if transient and attempt < max_retries - 1:
                print(f"事务错误,重试 {attempt + 1}/{max_retries}")
                continue
            raise


# Usage example
def transfer_funds():
    """Move 100 from user1's balance to user2's inside one transaction."""
    with client.start_session() as session:
        with session.start_transaction():
            db.users.update_one({"_id": "user1"}, {"$inc": {"balance": -100}}, session=session)
            db.users.update_one({"_id": "user2"}, {"$inc": {"balance": 100}}, session=session)


execute_with_retry(transfer_funds)
第八部分:数据建模与模式设计
8.1 嵌入式 vs 引用式
# 嵌入式模式(适合一对少关系) # 用户和订单(一个用户少量订单) user_with_orders = { "_id": "user1", "name": "张三", "orders": [ {"order_id": "order1", "amount": 100, "date": datetime.utcnow()}, {"order_id": "order2", "amount": 200, "date": datetime.utcnow()} ] } # 引用式模式(适合一对多关系) # 用户和评论(一个用户多条评论) # users集合 user = {"_id": "user1", "name": "张三"} # comments集合 comment1 = {"_id": "comment1", "user_id": "user1", "content": "评论内容"} comment2 = {"_id": "comment2", "user_id": "user1", "content": "另一条评论"} # 8.2 范式化与反范式化 # 反范式化(冗余数据,读取快) product = { "_id": "product1", "name": "商品名称", "category": "电子产品", "category_name": "电子产品", # 冗余字段 "seller": { "_id": "seller1", "name": "卖家名称", "rating": 4.5 # 冗余数据 } } # 范式化(引用数据,更新快) # products集合 product = {"_id": "product1", "name": "商品名称", "seller_id": "seller1"} # sellers集合 seller = {"_id": "seller1", "name": "卖家名称", "rating": 4.5} 8.3 时间序列数据建模
# The snippet uses datetime, so it must be imported here
from datetime import datetime, timezone

# Document layout for time-series readings
sensor_data = {
    "sensor_id": "sensor_001",
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
    "timestamp": datetime.now(timezone.utc),
    "readings": {
        "temperature": 25.6,
        "humidity": 60.2,
        "pressure": 1013.25
    }
}
# TTL index that removes old data automatically
db.sensor_data.create_index("timestamp", expireAfterSeconds=86400)  # removed after 24 hours
第九部分:错误处理与日志记录
9.1 常见错误类型
from pymongo.errors import ( ConnectionFailure, OperationFailure, DuplicateKeyError, BulkWriteError, NetworkTimeout ) def safe_database_operation(): try: # 尝试连接 client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=2000) client.admin.command('ping') # 执行操作 db = client.testdb result = db.users.insert_one({"_id": "test1", "name": "测试"}) except ConnectionFailure as e: print(f"连接失败: {e}") # 尝试重连或使用备用服务器 except DuplicateKeyError as e: print(f"重复键错误: {e}") # 处理重复数据 except BulkWriteError as e: print(f"批量写入错误: {e.details}") # 分析错误详情 except OperationFailure as e: print(f"操作失败: {e}") # 检查权限、索引等 except NetworkTimeout as e: print(f"网络超时: {e}") # 实现重试逻辑 except Exception as e: print(f"未知错误: {e}") # 记录日志 9.2 重试机制实现
import time from functools import wraps def retry_on_failure(max_attempts=3, delay=1): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_attempts): try: return func(*args, **kwargs) except (NetworkTimeout, OperationFailure) as e: if attempt == max_attempts - 1: raise print(f"尝试 {attempt + 1} 失败,{delay}秒后重试: {e}") time.sleep(delay) delay *= 2 # 指数退避 return wrapper return decorator @retry_on_failure(max_attempts=3, delay=1) def insert_user(user_data): db.users.insert_one(user_data) 9.3 日志记录
import logging

from pymongo import MongoClient

# Configure logging: write to a file and to the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('mongodb_operations.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class LoggingMongoClient:
    """Thin MongoClient wrapper that logs connection and insert outcomes."""

    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.client = None
        self.logger = logger

    def connect(self):
        """Connect and verify the server is reachable.

        Returns:
            True when the server answered a ping, False otherwise.
        """
        try:
            self.client = MongoClient(self.connection_string)
            # MongoClient() connects lazily, so constructing it proves
            # nothing; ping before reporting success
            self.client.admin.command('ping')
            self.logger.info("成功连接到MongoDB")
            return True
        except Exception as e:
            self.logger.error(f"连接失败: {e}")
            return False

    def insert_with_logging(self, collection, document):
        """Insert *document* into *collection*, logging success or failure.

        Returns:
            The result object returned by ``collection.insert_one``.

        Raises:
            Exception: Any error raised by the insert is logged and re-raised.
        """
        try:
            result = collection.insert_one(document)
            self.logger.info(f"成功插入文档,ID: {result.inserted_id}")
            return result
        except Exception as e:
            self.logger.error(f"插入失败: {e}, 文档: {document}")
            raise
第十部分:高级主题与最佳实践
10.1 变更流(Change Streams)
# 监听数据库变化 def watch_changes(): with db.users.watch() as stream: for change in stream: print(f"变更类型: {change['operationType']}") print(f"文档ID: {change['documentKey']['_id']}") if 'fullDocument' in change: print(f"完整文档: {change['fullDocument']}") # 监听特定集合变化 pipeline = [ {"$match": {"operationType": {"$in": ["insert", "update", "delete"]}}} ] with db.users.watch(pipeline) as stream: for change in stream: # 处理变更 pass 10.2 GridFS文件存储
import gridfs from bson import ObjectId # 存储文件 def store_file(file_path, filename): fs = gridfs.GridFS(db) with open(file_path, 'rb') as f: file_id = fs.put(f, filename=filename, content_type="image/jpeg") return file_id # 读取文件 def read_file(file_id): fs = gridfs.GridFS(db) grid_out = fs.get(ObjectId(file_id)) return grid_out.read() # 删除文件 def delete_file(file_id): fs = gridfs.GridFS(db) fs.delete(ObjectId(file_id)) 10.3 连接字符串安全
import os from urllib.parse import quote_plus # 安全地构建连接字符串 username = quote_plus(os.getenv('MONGO_USER')) password = quote_plus(os.getenv('MONGO_PASSWORD')) host = os.getenv('MONGO_HOST', 'localhost') port = os.getenv('MONGO_PORT', '27017') database = os.getenv('MONGO_DB', 'myapp') connection_string = f"mongodb://{username}:{password}@{host}:{port}/{database}?authSource=admin" # 或者使用URI格式 uri = f"mongodb://{host}:{port}/?authSource=admin" client = MongoClient(uri, authMechanism='SCRAM-SHA-256', authSource='admin') 10.4 生产环境配置
# Recommended production configuration
production_config = {
    'host': 'mongodb://cluster0.example.com:27017,cluster1.example.com:27017/',
    'replicaSet': 'myReplSet',
    'readPreference': 'secondaryPreferred',  # prefer reading from secondaries
    'retryWrites': True,
    'retryReads': True,
    'connectTimeoutMS': 10000,
    'socketTimeoutMS': 60000,
    'maxPoolSize': 100,
    'minPoolSize': 10,
    'maxIdleTimeMS': 600000,
    'w': 'majority',  # write concern
    # MongoClient names these options "journal" and "readConcernLevel";
    # "j" and "readConcern" are not recognised keyword options
    'journal': True,  # wait for the journal before acknowledging writes
    'readConcernLevel': 'majority',  # read concern
}
client = MongoClient(**production_config)
总结
MongoDB与Python的集成为现代应用开发提供了强大的数据处理能力。通过本指南,您应该已经掌握了:
- 连接配置:从基础连接到高级集群配置
- CRUD操作:完整的增删改查操作及最佳实践
- 性能优化:索引策略、查询优化和聚合框架
- 事务处理:多文档事务的使用和注意事项
- 数据建模:嵌入式与引用式模式的选择
- 错误处理:健壮的错误处理和重试机制
- 高级特性:变更流、GridFS等高级功能
在实际项目中,建议:
- 始终使用连接池和适当的超时设置
- 为查询创建合适的索引
- 使用聚合框架进行复杂数据分析
- 实施适当的错误处理和日志记录
- 根据业务需求选择合适的数据模型
- 在生产环境中使用副本集和适当的写关注
通过这些实践,您可以构建高性能、可靠且可扩展的MongoDB应用。
支付宝扫一扫
微信扫一扫