An OpenCV-Based Solution for Video Surveillance Anomaly Detection and Real-Time Alerting
Introduction
With the rapid progress of artificial intelligence and computer vision, traditional video surveillance is evolving toward intelligent, automated systems. An OpenCV-based anomaly detection and real-time alerting system can automatically recognize abnormal behaviors in surveillance footage (such as falls, fights, loitering, and abandoned objects) and trigger an alarm the moment an anomaly is detected, greatly improving monitoring efficiency and safety. This article walks through building a complete anomaly detection and alerting system with OpenCV, from basic concepts and core algorithm implementations through to full system integration.
1. System Architecture Design
1.1 Overall Architecture
A complete anomaly detection system typically contains the following core modules:
- Video capture module: acquires video data from cameras, video files, or network streams.
- Preprocessing module: denoises, resizes, and grayscales frames in preparation for analysis.
- Detection and tracking module: detects moving targets (people, vehicles) and tracks them continuously.
- Behavior analysis module: recognizes abnormal behaviors from target trajectories, posture, and interactions.
- Alert module: triggers alarms (sound, email, SMS, on-screen popups, etc.) when an anomaly is detected.
- Storage and logging module: saves video clips, snapshots, and logs of anomalous events for later review.
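Before diving into the implementation, it helps to see how these modules fit together. The sketch below is a minimal, dependency-free outline of the processing loop; every class name and the toy fall rule are hypothetical placeholders for illustration, not OpenCV or project APIs:

```python
# Minimal sketch of the module composition; all names here are hypothetical.

class Preprocessor:
    def process(self, frame):
        # Real system: grayscale conversion, Gaussian blur, resizing.
        return frame

class DetectorTracker:
    def update(self, frame):
        # Real system: background subtraction or YOLO, plus KCF/CSRT tracking.
        # Returns tracked objects as (id, (x, y, w, h)) tuples; dummy value here.
        return [(0, (10, 10, 120, 50))]

class BehaviorAnalyzer:
    def analyze(self, tracks):
        # Real system: aspect-ratio, dwell-time, and motion-energy rules.
        alerts = []
        for obj_id, (x, y, w, h) in tracks:
            if w / float(h) > 0.5:  # toy fall rule: wide box => possible fall
                alerts.append(("FALL", obj_id))
        return alerts

class AlertDispatcher:
    def __init__(self):
        self.sent = []  # real system: email / SMS / webhook / popup

    def dispatch(self, alerts):
        self.sent.extend(alerts)

def run_pipeline(frames):
    pre, det, ana, out = Preprocessor(), DetectorTracker(), BehaviorAnalyzer(), AlertDispatcher()
    for frame in frames:
        tracks = det.update(pre.process(frame))
        out.dispatch(ana.analyze(tracks))
    return out.sent

print(run_pipeline(["frame1", "frame2"]))  # two frames, one FALL alert each
```

The sections that follow replace each placeholder with a real implementation.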
1.2 Technology Choices
- Core library: OpenCV (the computer vision foundation)
- Deep learning framework (optional): TensorFlow or PyTorch (for more complex behavior recognition)
- Object detection: background subtraction, YOLO, SSD, etc.
- Object tracking: KCF, CSRT, DeepSORT, etc.
- Alert integrations: SMTP (email), Twilio (SMS), webhooks (API calls)
2. Environment Setup and Installation
Before writing any code, we need a working development environment. The installation steps are as follows:
2.1 Installing Python and OpenCV
Python 3.8+ is recommended. Install OpenCV with:
```bash
# Install the core OpenCV package
pip install opencv-python

# Install the contrib modules (extra algorithms such as SIFT and additional background subtractors)
pip install opencv-contrib-python

# Install other helper libraries
pip install numpy    # numerical computing
pip install imutils  # image-processing utilities
```
2.2 Installing Deep Learning Libraries (Optional)
If you plan to use deep learning models for object detection or behavior recognition, install the following:
```bash
# Install TensorFlow (CPU build)
pip install tensorflow

# Install PyTorch (pick the build matching your CUDA version)
pip install torch torchvision torchaudio

# Install the yolov5 package
pip install yolov5
```
2.3 Verifying the Installation
Create a simple Python script to verify that OpenCV is installed correctly:
```python
import cv2

# Print the OpenCV version
print(f"OpenCV version: {cv2.__version__}")

# Read a test image (if one is available)
# img = cv2.imread('test.jpg')
# cv2.imshow('Test', img)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
```
3. Video Capture and Preprocessing
3.1 Acquiring Video Sources
OpenCV supports many kinds of video sources, including local cameras, video files, and network streams (RTSP/RTMP).
3.1.1 Capturing from a Local Camera
```python
import cv2

# Open the default camera (0 refers to the first camera)
cap = cv2.VideoCapture(0)

# Check that the camera opened successfully
if not cap.isOpened():
    print("Cannot open camera")
    exit()

# Read frames in a loop
while True:
    ret, frame = cap.read()
    if not ret:
        print("Cannot receive frame (camera disconnected?)")
        break

    # Display the frame
    cv2.imshow('Camera Feed', frame)

    # Press 'q' to quit
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()
```
3.1.2 Reading from a Video File
```python
import cv2

# Open a video file
cap = cv2.VideoCapture('video.mp4')

# Check that the file opened successfully
if not cap.isOpened():
    print("Cannot open video file")
    exit()

# Query video properties
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Video info: {fps} FPS, {width}x{height}")

while True:
    ret, frame = cap.read()
    if not ret:
        break
    cv2.imshow('Video File', frame)
    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
3.1.3 Reading a Network Stream (RTSP)
```python
import cv2

# RTSP URL (format: rtsp://username:password@ip:port/stream)
rtsp_url = "rtsp://admin:password@192.168.1.100:554/stream1"
cap = cv2.VideoCapture(rtsp_url)

if not cap.isOpened():
    print("Cannot connect to the RTSP stream")
    exit()

while True:
    ret, frame = cap.read()
    if not ret:
        print("Connection lost, trying to reconnect...")
        # Reconnection logic could be added here
        break
    cv2.imshow('RTSP Stream', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
3.2 Frame Preprocessing
Before detection and behavior analysis, frames are usually preprocessed to improve the accuracy and robustness of the downstream algorithms.
3.2.1 Grayscale Conversion and Gaussian Blur
```python
import cv2

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 1. Convert to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # 2. Gaussian blur (noise reduction)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    # Show the results
    cv2.imshow('Original', frame)
    cv2.imshow('Gray', gray)
    cv2.imshow('Blurred', blurred)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
3.2.2 Resizing and Normalization
```python
import cv2
import numpy as np

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Resize (e.g. down to 640x480)
    resized = cv2.resize(frame, (640, 480))

    # Normalize (map pixel values from 0-255 to 0-1)
    normalized = resized.astype('float32') / 255.0

    # Show the results (imshow multiplies 32-bit float pixels by 255
    # for display, so the normalized image shows normally)
    cv2.imshow('Resized', resized)
    cv2.imshow('Normalized', normalized)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
4. Moving-Object Detection
Moving-object detection is the foundation of anomaly analysis. This section covers two common approaches: background subtraction and deep-learning object detection.
4.1 Background Subtraction
Background subtraction suits fixed-camera scenes: it builds a background model and detects foreground (moving) objects against it.
4.1.1 The MOG2 Background Subtractor
```python
import cv2

# Create an MOG2 background subtractor
backSub = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16,
                                             detectShadows=True)

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Apply the background subtractor
    fgMask = backSub.apply(frame)

    cv2.imshow('Frame', frame)
    cv2.imshow('FG Mask', fgMask)

    # Inspect subtractor state if needed
    # print(f"Background ratio: {backSub.getBackgroundRatio()}")

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
4.1.2 The KNN Background Subtractor
```python
import cv2

# Create a KNN background subtractor
backSub = cv2.createBackgroundSubtractorKNN(history=500, dist2Threshold=400,
                                            detectShadows=True)

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    fgMask = backSub.apply(frame)

    # Post-processing: erosion and dilation
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    fgMask = cv2.erode(fgMask, kernel, iterations=2)
    fgMask = cv2.dilate(fgMask, kernel, iterations=2)

    cv2.imshow('Frame', frame)
    cv2.imshow('FG Mask', fgMask)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
4.2 Deep-Learning Object Detection (YOLOv5)
For more complex scenes, or when specific classes (people, vehicles) must be detected, a deep learning model is the better fit. We use YOLOv5 as the example here.
4.2.1 Installing YOLOv5
```bash
pip install yolov5
```
4.2.2 Running Object Detection with YOLOv5
```python
import cv2
import torch
import yolov5

# Load a YOLOv5 model (pretrained weights are downloaded automatically);
# 'yolov5s.pt' is the small, fast variant
model = yolov5.load('yolov5s.pt')

# Select the device (CPU or GPU)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # YOLOv5 expects RGB, while OpenCV delivers BGR
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Run inference
    results = model(frame_rgb)

    # Parse the results
    predictions = results.pred[0]
    boxes = predictions[:, :4]   # x1, y1, x2, y2
    scores = predictions[:, 4]   # confidence
    classes = predictions[:, 5]  # class IDs

    # Draw the detections
    for box, score, cls in zip(boxes, scores, classes):
        x1, y1, x2, y2 = map(int, box)
        label = f"{model.names[int(cls)]}: {score:.2f}"
        # Only show detections of class 'person' (class 0)
        if int(cls) == 0:
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    cv2.imshow('YOLOv5 Detection', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
5. Object Tracking
Once a target is detected, we track it over time to analyze its trajectory and behavior patterns.
5.1 The KCF Tracker
KCF (Kernelized Correlation Filters) is a fast and reasonably accurate tracker.
```python
import cv2

# Initialize the tracker (requires opencv-contrib-python)
tracker = cv2.TrackerKCF_create()

cap = cv2.VideoCapture(0)

# Read the first frame and select the region to track
ret, frame = cap.read()
if not ret:
    exit()

# Select the bounding box manually (or let object detection pick one)
bbox = cv2.selectROI("Select Object", frame, False)
tracker.init(frame, bbox)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Update the tracker
    success, bbox = tracker.update(frame)

    if success:
        # Tracking succeeded: draw the bounding box
        x, y, w, h = map(int, bbox)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(frame, "Tracking", (x, y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    else:
        # Tracking failed
        cv2.putText(frame, "Tracking Failure", (20, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    cv2.imshow('KCF Tracking', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
5.2 The CSRT Tracker
The CSRT tracker is more accurate than KCF but somewhat slower.
```python
import cv2

# Initialize the CSRT tracker
tracker = cv2.TrackerCSRT_create()

cap = cv2.VideoCapture(0)

ret, frame = cap.read()
if not ret:
    exit()

bbox = cv2.selectROI("Select Object", frame, False)
tracker.init(frame, bbox)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    success, bbox = tracker.update(frame)

    if success:
        x, y, w, h = map(int, bbox)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
        cv2.putText(frame, "CSRT Tracking", (x, y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
    else:
        cv2.putText(frame, "Tracking Failure", (20, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    cv2.imshow('CSRT Tracking', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
6. Anomalous Behavior Analysis
This is the heart of the system. We implement several common anomaly detection algorithms.
6.1 Fall Detection
Fall detection is commonly based on changes in the bounding box's aspect ratio (width to height). A standing person has an aspect ratio of roughly 1:3; after a fall it increases sharply.
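As a quick sanity check on this heuristic, here is the arithmetic with made-up bounding-box sizes (the 0.5 threshold matches the fall_threshold default used later in this section):

```python
# Illustrative bounding boxes as (width, height); the numbers are made up
standing = (60, 180)  # upright person: aspect ratio 60/180 ≈ 0.33, close to 1:3
fallen = (180, 60)    # person lying down: aspect ratio 180/60 = 3.0

def aspect_ratio(box):
    w, h = box
    return w / float(h)

FALL_THRESHOLD = 0.5

print(aspect_ratio(standing) > FALL_THRESHOLD)  # False: still standing
print(aspect_ratio(fallen) > FALL_THRESHOLD)    # True: likely a fall
```

The threshold sits well between the two regimes, which is what makes the rule usable despite its simplicity.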
6.1.1 Fall Detection with Background Subtraction
```python
import cv2
import numpy as np

class FallDetector:
    def __init__(self, min_width=50, min_height=100, fall_threshold=0.5):
        self.min_width = min_width
        self.min_height = min_height
        self.fall_threshold = fall_threshold  # aspect-ratio threshold
        self.fall_detected = False
        self.fall_counter = 0
        self.fall_duration = 5  # consecutive frames required to confirm a fall

    def detect(self, frame, fgMask):
        # Find contours in the foreground mask
        contours, _ = cv2.findContours(fgMask, cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)
        current_fall = False

        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)

            # Filter out contours that are too small
            if w < self.min_width or h < self.min_height:
                continue

            # Compute the aspect ratio
            aspect_ratio = w / float(h)

            # Draw the bounding box and aspect ratio
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
            cv2.putText(frame, f"AR: {aspect_ratio:.2f}", (x, y - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # Flag a fall when the aspect ratio exceeds the threshold
            if aspect_ratio > self.fall_threshold:
                current_fall = True
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 3)
                cv2.putText(frame, "FALL DETECTED!", (x, y - 25),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

        # Require several consecutive frames before confirming the fall
        if current_fall:
            self.fall_counter += 1
            if self.fall_counter >= self.fall_duration:
                self.fall_detected = True
        else:
            self.fall_counter = 0
            self.fall_detected = False

        return self.fall_detected

# Usage example
cap = cv2.VideoCapture('fall_test.mp4')  # requires a test video
backSub = cv2.createBackgroundSubtractorMOG2()
detector = FallDetector()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Foreground mask
    fgMask = backSub.apply(frame)

    # Fall detection
    is_fall = detector.detect(frame, fgMask)

    if is_fall:
        print("ALERT: fall detected!")

    cv2.imshow('Fall Detection', frame)
    cv2.imshow('FG Mask', fgMask)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
6.1.2 Keypoint-Based Fall Detection (with MediaPipe)
A more accurate approach uses a human-pose estimation library such as MediaPipe to obtain body keypoints.
```python
import cv2
import mediapipe as mp

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

class PoseFallDetector:
    def __init__(self):
        self.fall_detected = False
        self.fall_counter = 0
        self.fall_duration = 5

    def detect(self, frame, landmarks):
        if not landmarks:
            return False

        # Fetch the keypoints we need
        left_shoulder = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value]
        right_shoulder = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
        left_hip = landmarks[mp_pose.PoseLandmark.LEFT_HIP.value]
        right_hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value]
        left_ankle = landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value]
        right_ankle = landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value]

        # Torso center (normalized coordinates)
        center_y = (left_shoulder.y + right_shoulder.y +
                    left_hip.y + right_hip.y) / 4

        # Approximate body height
        height = abs((left_ankle.y + right_ankle.y) / 2 - center_y)

        # Shoulder width
        shoulder_width = abs(left_shoulder.x - right_shoulder.x)

        # Width-to-height ratio
        aspect_ratio = shoulder_width / height if height > 0 else 0

        # A fall: high aspect ratio and a low torso center
        if aspect_ratio > 2.0 and center_y > 0.5:
            self.fall_counter += 1
            if self.fall_counter >= self.fall_duration:
                self.fall_detected = True
        else:
            self.fall_counter = 0
            self.fall_detected = False

        return self.fall_detected

# Usage example
cap = cv2.VideoCapture(0)
detector = PoseFallDetector()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # MediaPipe expects RGB input
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    if results.pose_landmarks:
        # Draw the skeleton
        mp_drawing.draw_landmarks(frame, results.pose_landmarks,
                                  mp_pose.POSE_CONNECTIONS)

        # Check for a fall
        is_fall = detector.detect(frame, results.pose_landmarks.landmark)
        if is_fall:
            cv2.putText(frame, "FALL DETECTED!", (10, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
            print("ALERT: fall detected!")

    cv2.imshow('Pose Fall Detection', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
6.2 Loitering Detection
Loitering detection is based on how long a target stays inside a designated region.
6.2.1 Dwell-Time-Based Loitering Detection
```python
import cv2
import time

class LoiteringDetector:
    def __init__(self, region, stay_time_threshold=10):
        """
        region: (x, y, w, h) defining the detection zone
        stay_time_threshold: dwell-time threshold in seconds
        """
        self.region = region
        self.stay_time_threshold = stay_time_threshold
        self.tracked_objects = {}  # {object_id: {'start_time': ..., 'last_seen': ..., 'last_pos': ...}}
        self.next_id = 0

    def is_in_region(self, x, y, w, h):
        # Check whether the object's center lies inside the zone
        center_x = x + w / 2
        center_y = y + h / 2
        rx, ry, rw, rh = self.region
        return (rx <= center_x <= rx + rw) and (ry <= center_y <= ry + rh)

    def update(self, detections):
        current_time = time.time()
        current_ids = set()

        # Match new detections against tracked objects
        for det in detections:
            x, y, w, h = det['bbox']
            center_x = x + w / 2
            center_y = y + h / 2

            # Find the nearest tracked object
            matched_id = None
            min_dist = float('inf')
            for obj_id, obj_data in self.tracked_objects.items():
                last_x, last_y = obj_data['last_pos']
                dist = ((center_x - last_x) ** 2 + (center_y - last_y) ** 2) ** 0.5
                if dist < 50 and dist < min_dist:  # distance gate
                    min_dist = dist
                    matched_id = obj_id

            if matched_id is not None:
                # Update an existing object
                self.tracked_objects[matched_id]['last_pos'] = (center_x, center_y)
                self.tracked_objects[matched_id]['last_seen'] = current_time
                current_ids.add(matched_id)
            else:
                # Register a new object
                new_id = self.next_id
                self.next_id += 1
                self.tracked_objects[new_id] = {
                    'start_time': current_time,
                    'last_seen': current_time,
                    'last_pos': (center_x, center_y)
                }
                current_ids.add(new_id)

        # Drop objects that have not been seen for a while
        to_remove = []
        for obj_id, obj_data in self.tracked_objects.items():
            if current_time - obj_data['last_seen'] > 5:  # gone for 5 seconds
                to_remove.append(obj_id)
        for obj_id in to_remove:
            del self.tracked_objects[obj_id]

        # Check for loitering
        loitering_ids = []
        for obj_id in current_ids:
            obj_data = self.tracked_objects[obj_id]
            # Pass the center with a zero-size box so only the center is checked
            if self.is_in_region(*obj_data['last_pos'], 0, 0):
                stay_time = current_time - obj_data['start_time']
                if stay_time > self.stay_time_threshold:
                    loitering_ids.append(obj_id)

        return loitering_ids

# Usage example
cap = cv2.VideoCapture(0)
backSub = cv2.createBackgroundSubtractorMOG2()
loitering_detector = LoiteringDetector(region=(100, 100, 400, 400),
                                       stay_time_threshold=8)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Foreground mask
    fgMask = backSub.apply(frame)

    # Contours
    contours, _ = cv2.findContours(fgMask, cv2.RETR_EXTERNAL,
                                   cv2.CHAIN_APPROX_SIMPLE)

    detections = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        if w > 30 and h > 30:  # filter out small blobs
            detections.append({'bbox': (x, y, w, h)})
            cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)

    # Update the loitering detector
    loitering_ids = loitering_detector.update(detections)

    # Draw the detection zone
    rx, ry, rw, rh = loitering_detector.region
    cv2.rectangle(frame, (rx, ry), (rx + rw, ry + rh), (0, 255, 255), 2)
    cv2.putText(frame, "Loitering Zone", (rx, ry - 5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

    # Show the loitering alert
    if loitering_ids:
        cv2.putText(frame, f"LOITERING: {len(loitering_ids)} objects", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
        print(f"ALERT: {len(loitering_ids)} loitering object(s) detected!")

    cv2.imshow('Loitering Detection', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
6.3 Fight Detection
Fight detection is usually based on motion intensity and person-to-person interaction.
6.3.1 Motion-Vector-Based Fight Detection
```python
import cv2
import numpy as np

class FightDetector:
    def __init__(self, motion_threshold=20, area_threshold=1000):
        self.motion_threshold = motion_threshold
        self.area_threshold = area_threshold
        self.prev_frame = None

    def detect(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (5, 5), 0)

        if self.prev_frame is None:
            self.prev_frame = gray
            return False

        # Dense optical flow
        flow = cv2.calcOpticalFlowFarneback(self.prev_frame, gray, None,
                                            0.5, 3, 15, 3, 5, 1.2, 0)

        # Motion magnitude
        magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

        # Regions of intense motion
        motion_mask = magnitude > self.motion_threshold

        # Contours of those regions
        contours, _ = cv2.findContours(motion_mask.astype(np.uint8),
                                       cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)

        fight_detected = False
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > self.area_threshold:
                fight_detected = True
                x, y, w, h = cv2.boundingRect(contour)
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 3)
                cv2.putText(frame, "FIGHT?", (x, y - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

        # Keep the current frame for the next flow computation
        self.prev_frame = gray

        return fight_detected

# Usage example
cap = cv2.VideoCapture('fight_test.mp4')  # requires a test video
detector = FightDetector()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    is_fight = detector.detect(frame)
    if is_fight:
        print("ALERT: suspicious activity detected!")

    cv2.imshow('Fight Detection', frame)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
6.4 Abandoned-Object Detection (Left Object Detection)
Abandoned-object detection is based on how long an object remains motionless inside a designated region.
```python
import cv2
import time

class LeftObjectDetector:
    def __init__(self, region, static_time_threshold=10):
        self.region = region  # (x, y, w, h)
        self.static_time_threshold = static_time_threshold
        self.tracked_objects = {}  # {object_id: {'last_seen': ..., 'last_pos': ..., 'static_start': ...}}
        self.next_id = 0

    def is_in_region(self, x, y, w, h):
        center_x = x + w / 2
        center_y = y + h / 2
        rx, ry, rw, rh = self.region
        return (rx <= center_x <= rx + rw) and (ry <= center_y <= ry + rh)

    def update(self, detections):
        current_time = time.time()
        current_ids = set()

        for det in detections:
            x, y, w, h = det['bbox']
            center_x = x + w / 2
            center_y = y + h / 2

            # Match against tracked objects
            matched_id = None
            min_dist = float('inf')
            for obj_id, obj_data in self.tracked_objects.items():
                last_x, last_y = obj_data['last_pos']
                dist = ((center_x - last_x) ** 2 + (center_y - last_y) ** 2) ** 0.5
                if dist < 20 and dist < min_dist:
                    min_dist = dist
                    matched_id = obj_id

            if matched_id is not None:
                obj_data = self.tracked_objects[matched_id]
                obj_data['last_pos'] = (center_x, center_y)
                obj_data['last_seen'] = current_time
                # Consider the object static if it barely moved; use the
                # matched distance (min_dist), not the loop variable
                if min_dist < 5:
                    if obj_data['static_start'] is None:
                        obj_data['static_start'] = current_time
                else:
                    obj_data['static_start'] = None
                current_ids.add(matched_id)
            else:
                # New object
                new_id = self.next_id
                self.next_id += 1
                self.tracked_objects[new_id] = {
                    'start_time': current_time,
                    'last_seen': current_time,
                    'last_pos': (center_x, center_y),
                    'static_start': None
                }
                current_ids.add(new_id)

        # Clean up stale objects
        to_remove = []
        for obj_id, obj_data in self.tracked_objects.items():
            if current_time - obj_data['last_seen'] > 5:
                to_remove.append(obj_id)
        for obj_id in to_remove:
            del self.tracked_objects[obj_id]

        # Check for abandoned objects
        left_objects = []
        for obj_id in current_ids:
            obj_data = self.tracked_objects[obj_id]
            if self.is_in_region(*obj_data['last_pos'], 0, 0) and obj_data['static_start']:
                static_time = current_time - obj_data['static_start']
                if static_time > self.static_time_threshold:
                    left_objects.append(obj_id)

        return left_objects

# Usage example
cap = cv2.VideoCapture(0)
backSub = cv2.createBackgroundSubtractorMOG2()
left_detector = LeftObjectDetector(region=(200, 200, 300, 300),
                                   static_time_threshold=8)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    fgMask = backSub.apply(frame)
    contours, _ = cv2.findContours(fgMask, cv2.RETR_EXTERNAL,
                                   cv2.CHAIN_APPROX_SIMPLE)

    detections = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        if w > 20 and h > 20:
            detections.append({'bbox': (x, y, w, h)})
            cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)

    left_ids = left_detector.update(detections)

    # Draw the zone
    rx, ry, rw, rh = left_detector.region
    cv2.rectangle(frame, (rx, ry), (rx + rw, ry + rh), (0, 255, 255), 2)
    cv2.putText(frame, "Left Object Zone", (rx, ry - 5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

    if left_ids:
        cv2.putText(frame, "LEFT OBJECT!", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
        print("ALERT: abandoned object detected!")

    cv2.imshow('Left Object Detection', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```
7. Real-Time Alerting
When an anomaly is detected, an alarm must be raised. This section covers several alerting channels.
7.1 Email Alerts
Email alerts are sent over SMTP.
```python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
import cv2
import time
import os

class EmailAlert:
    def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.sender_email = sender_email
        self.sender_password = sender_password
        self.last_alert_time = 0
        self.alert_cooldown = 30  # cooldown in seconds

    def send_alert(self, subject, body, image=None):
        # Respect the cooldown
        current_time = time.time()
        if current_time - self.last_alert_time < self.alert_cooldown:
            remaining = self.alert_cooldown - (current_time - self.last_alert_time)
            print(f"Cooling down, skipping alert ({remaining:.1f}s remaining)")
            return False

        try:
            # Compose the message
            msg = MIMEMultipart()
            msg['From'] = self.sender_email
            msg['To'] = self.sender_email  # sent to self; any recipient works
            msg['Subject'] = subject

            # Body
            msg.attach(MIMEText(body, 'plain'))

            # Optional image attachment
            if image is not None:
                # Save a temporary snapshot
                temp_path = f"alert_{int(time.time())}.jpg"
                cv2.imwrite(temp_path, image)
                with open(temp_path, 'rb') as f:
                    img_data = f.read()
                image_part = MIMEImage(img_data, name='alert_image.jpg')
                msg.attach(image_part)
                # Remove the temporary file
                os.remove(temp_path)

            # Send
            server = smtplib.SMTP(self.smtp_server, self.smtp_port)
            server.starttls()
            server.login(self.sender_email, self.sender_password)
            server.send_message(msg)
            server.quit()

            self.last_alert_time = current_time
            print("Email alert sent!")
            return True
        except Exception as e:
            print(f"Failed to send email: {e}")
            return False

# Usage example
# Note: enable SMTP on the account and use an app-specific password
# (not the login password)
email_alert = EmailAlert(
    smtp_server='smtp.gmail.com',  # Gmail's SMTP server
    smtp_port=587,
    sender_email='your_email@gmail.com',
    sender_password='your_app_password'  # app-specific password
)

# Call this when an anomaly is detected
# frame = ...  # the current video frame
# email_alert.send_alert(
#     subject="[ALERT] Anomalous behavior detected",
#     body=f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}\nEvent: fall\nLocation: Zone A",
#     image=frame
# )
```
7.2 SMS Alerts (Twilio)
SMS alerts are sent through the Twilio service.
```python
from twilio.rest import Client
import time

class SMSAlert:
    def __init__(self, account_sid, auth_token, from_number, to_number):
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.from_number = from_number
        self.to_number = to_number
        self.last_alert_time = 0
        self.alert_cooldown = 30

    def send_alert(self, message):
        current_time = time.time()
        if current_time - self.last_alert_time < self.alert_cooldown:
            print("Cooling down, skipping SMS alert")
            return False

        try:
            client = Client(self.account_sid, self.auth_token)

            # Twilio SMS cannot carry an image directly, though a URL
            # could be included; here we send text only
            full_message = (f"[Surveillance Alert] {message}\n"
                            f"Time: {time.strftime('%H:%M:%S')}")

            sms = client.messages.create(
                body=full_message,
                from_=self.from_number,
                to=self.to_number
            )

            self.last_alert_time = current_time
            print(f"SMS alert sent! SID: {sms.sid}")
            return True
        except Exception as e:
            print(f"Failed to send SMS: {e}")
            return False

# Usage example
# Register with Twilio first to obtain the account_sid, auth_token,
# and a phone number
# sms_alert = SMSAlert(
#     account_sid='your_account_sid',
#     auth_token='your_auth_token',
#     from_number='+1234567890',  # your Twilio number
#     to_number='+0987654321'     # recipient number
# )

# Call this when an anomaly is detected
# sms_alert.send_alert("Fall detected, please check immediately!")
```
7.3 Webhook/API Alerts
Alerts can also be delivered by calling an external API over HTTP.
```python
import requests
import json
import cv2
import time
import base64

class WebhookAlert:
    def __init__(self, webhook_url, headers=None):
        self.webhook_url = webhook_url
        self.headers = headers or {'Content-Type': 'application/json'}
        self.last_alert_time = 0
        self.alert_cooldown = 30

    def send_alert(self, event_type, data, image=None):
        current_time = time.time()
        if current_time - self.last_alert_time < self.alert_cooldown:
            print("Cooling down, skipping webhook alert")
            return False

        # Build the payload
        payload = {
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S'),
            "event_type": event_type,
            "data": data
        }

        # Encode the image as base64 if provided
        if image is not None:
            _, buffer = cv2.imencode('.jpg', image)
            payload['image_base64'] = base64.b64encode(buffer).decode('utf-8')

        try:
            response = requests.post(
                self.webhook_url,
                data=json.dumps(payload),
                headers=self.headers,
                timeout=5
            )
            if response.status_code == 200:
                self.last_alert_time = current_time
                print("Webhook alert sent!")
                return True
            else:
                print(f"Webhook alert failed with status {response.status_code}")
                return False
        except Exception as e:
            print(f"Webhook alert error: {e}")
            return False

# Usage example
# webhook_alert = WebhookAlert(
#     webhook_url='https://your-webhook-endpoint.com/alert',
#     headers={'Authorization': 'Bearer your_token',
#              'Content-Type': 'application/json'}
# )

# Call this when an anomaly is detected
# webhook_alert.send_alert(
#     event_type="fall_detected",
#     data={"location": "Zone A", "confidence": 0.95},
#     image=frame
# )
```
7.4 On-Screen Popup Alerts
Alerts can also be shown in a local window.
```python
import cv2
import numpy as np
import time

class PopupAlert:
    def __init__(self, window_name="ALERT"):
        self.window_name = window_name
        self.last_alert_time = 0
        self.alert_cooldown = 5  # a popup can use a shorter cooldown

    def show_alert(self, message, image=None):
        current_time = time.time()
        if current_time - self.last_alert_time < self.alert_cooldown:
            return False

        # Build the alert window
        alert_frame = np.zeros((300, 500, 3), dtype=np.uint8)
        alert_frame[:] = (0, 0, 150)  # red background (BGR)

        # Add text
        cv2.putText(alert_frame, "ALERT!", (50, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 3)
        cv2.putText(alert_frame, message, (50, 120),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
        cv2.putText(alert_frame, "Press 'q' to close", (50, 200),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)

        # Show the window
        cv2.imshow(self.window_name, alert_frame)

        # Also show the triggering frame, if any
        if image is not None:
            cv2.imshow("Alert Image", image)

        # Wait briefly for user input (non-blocking)
        key = cv2.waitKey(100)  # display for 100 ms
        if key & 0xFF == ord('q'):
            cv2.destroyWindow(self.window_name)
            if image is not None:
                cv2.destroyWindow("Alert Image")

        self.last_alert_time = current_time
        return True

# Usage example
popup_alert = PopupAlert()

# Call this when an anomaly is detected
# popup_alert.show_alert("Fall detected!", frame)
```
8. System Integration and a Complete Example
This section assembles the modules above into a complete anomaly detection system.
8.1 Complete System Code
```python
import cv2
import time
from collections import deque

class VideoMonitorSystem:
    def __init__(self, video_source=0, config=None):
        self.video_source = video_source
        self.config = config or {}

        # Module handles
        self.cap = None
        self.backSub = None
        self.fall_detector = None
        self.loitering_detector = None
        self.left_object_detector = None

        # State
        self.running = False
        self.alert_queue = deque(maxlen=10)  # the most recent alerts

        self._init_modules()

    def _init_modules(self):
        # Background subtractor
        self.backSub = cv2.createBackgroundSubtractorMOG2(
            history=self.config.get('bg_history', 500),
            varThreshold=self.config.get('bg_threshold', 16)
        )

        # Fall detector (the class defined earlier, saved as fall_detector.py)
        from fall_detector import FallDetector
        self.fall_detector = FallDetector(
            min_width=self.config.get('fall_min_width', 50),
            min_height=self.config.get('fall_min_height', 100),
            fall_threshold=self.config.get('fall_threshold', 0.5)
        )

        # Loitering detector
        from loitering_detector import LoiteringDetector
        self.loitering_detector = LoiteringDetector(
            region=self.config.get('loitering_region', (100, 100, 400, 400)),
            stay_time_threshold=self.config.get('loitering_time', 10)
        )

        # Abandoned-object detector
        from left_object_detector import LeftObjectDetector
        self.left_object_detector = LeftObjectDetector(
            region=self.config.get('left_object_region', (200, 200, 300, 300)),
            static_time_threshold=self.config.get('left_object_time', 10)
        )

        # Alerting (popup here; email also available)
        from alert_system import PopupAlert, EmailAlert
        self.popup_alert = PopupAlert()
        # Email configuration (needs real credentials)
        # self.email_alert = EmailAlert(
        #     smtp_server='smtp.gmail.com',
        #     smtp_port=587,
        #     sender_email='your_email@gmail.com',
        #     sender_password='your_password'
        # )

    def process_frame(self, frame):
        """Analyze a single frame for anomalous behavior."""
        alerts = []

        # 1. Foreground mask
        fgMask = self.backSub.apply(frame)

        # 2. Fall detection
        is_fall = self.fall_detector.detect(frame, fgMask)
        if is_fall:
            alerts.append(("FALL", frame.copy()))

        # 3. Loitering detection
        contours, _ = cv2.findContours(fgMask, cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)
        detections = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            if w > 30 and h > 30:
                detections.append({'bbox': (x, y, w, h)})

        loitering_ids = self.loitering_detector.update(detections)
        if loitering_ids:
            alerts.append(("LOITERING", frame.copy()))

        # 4. Abandoned-object detection
        left_ids = self.left_object_detector.update(detections)
        if left_ids:
            alerts.append(("LEFT_OBJECT", frame.copy()))

        # 5. Draw the detection zones
        self._draw_regions(frame)

        return alerts

    def _draw_regions(self, frame):
        """Draw the configured detection zones."""
        # Loitering zone
        if hasattr(self.loitering_detector, 'region'):
            rx, ry, rw, rh = self.loitering_detector.region
            cv2.rectangle(frame, (rx, ry), (rx + rw, ry + rh), (0, 255, 255), 1)
            cv2.putText(frame, "Loitering", (rx, ry - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 255), 1)

        # Abandoned-object zone
        if hasattr(self.left_object_detector, 'region'):
            rx, ry, rw, rh = self.left_object_detector.region
            cv2.rectangle(frame, (rx, ry), (rx + rw, ry + rh), (255, 255, 0), 1)
            cv2.putText(frame, "Left Object", (rx, ry - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 0), 1)

    def handle_alerts(self, alerts):
        """Dispatch alerts."""
        for alert_type, frame in alerts:
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            message = f"{alert_type} detected at {timestamp}"

            # Keep a record
            self.alert_queue.append({
                'type': alert_type,
                'timestamp': timestamp,
                'frame': frame
            })

            # Popup alert
            self.popup_alert.show_alert(message, frame)

            # Email alert (requires configuration)
            # if hasattr(self, 'email_alert'):
            #     self.email_alert.send_alert(
            #         subject=f"[Surveillance Alert] {alert_type}",
            #         body=message,
            #         image=frame
            #     )

            # Log
            print(f"[{timestamp}] ALERT: {alert_type}")

    def run(self):
        """Main loop."""
        self.cap = cv2.VideoCapture(self.video_source)
        if not self.cap.isOpened():
            print(f"Cannot open video source: {self.video_source}")
            return

        self.running = True
        print("Monitoring system started...")

        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                print("End of video stream")
                break

            # Analyze the frame
            alerts = self.process_frame(frame)

            # Handle any alerts
            if alerts:
                self.handle_alerts(alerts)

            # Show the live view
            cv2.imshow('Video Monitor', frame)

            # Foreground mask (optional)
            # fgMask = self.backSub.apply(frame)
            # cv2.imshow('Foreground', fgMask)

            # Press 'q' to quit
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        self.stop()

    def stop(self):
        """Shut the system down."""
        self.running = False
        if self.cap:
            self.cap.release()
        cv2.destroyAllWindows()
        print("Monitoring system stopped")

# Usage example
if __name__ == "__main__":
    # Configuration
    config = {
        'bg_history': 500,
        'bg_threshold': 16,
        'fall_min_width': 50,
        'fall_min_height': 100,
        'fall_threshold': 0.5,
        'loitering_region': (100, 100, 400, 400),
        'loitering_time': 8,
        'left_object_region': (200, 200, 300, 300),
        'left_object_time': 8
    }

    # video_source may be 0 (camera), 'video.mp4' (file), or 'rtsp://...' (stream)
    system = VideoMonitorSystem(video_source=0, config=config)

    try:
        system.run()
    except KeyboardInterrupt:
        print("\nInterrupt received")
        system.stop()
```
8.2 Managing Configuration Files
For easier management, configuration parameters can live in a separate JSON file. A sample config.json:
```json
{
  "video_source": 0,
  "bg_history": 500,
  "bg_threshold": 16,
  "fall_min_width": 50,
  "fall_min_height": 100,
  "fall_threshold": 0.5,
  "loitering_region": [100, 100, 400, 400],
  "loitering_time": 8,
  "left_object_region": [200, 200, 300, 300],
  "left_object_time": 8,
  "alert_cooldown": 30,
  "email_config": {
    "smtp_server": "smtp.gmail.com",
    "smtp_port": 587,
    "sender_email": "your_email@gmail.com",
    "sender_password": "your_password"
  }
}
```
Loading the configuration:
```python
import json

def load_config(config_path):
    with open(config_path, 'r') as f:
        return json.load(f)

# In the main program:
# config = load_config('config.json')
# system = VideoMonitorSystem(video_source=config['video_source'], config=config)
```
9. Performance Optimization and Deployment
9.1 Performance Optimization Tips
1. **Reduce the resolution**: processing lower-resolution frames speeds things up significantly.
```python
# Resize each frame right after reading it
frame = cv2.resize(frame, (640, 480))
```
2. **Skip frames**: not every frame needs full analysis.
```python
frame_counter = 0
skip_frames = 2  # analyze 1 out of every 2 frames

while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_counter += 1
    if frame_counter % skip_frames != 0:
        continue
    # Analyze the frame
    process_frame(frame)
```
3. **Multithreading**: separate video capture and processing into different threads.
```python
import threading
from queue import Queue

class VideoCaptureThread(threading.Thread):
    def __init__(self, video_source, queue_size=5):
        super().__init__()
        self.cap = cv2.VideoCapture(video_source)
        self.queue = Queue(maxsize=queue_size)
        self.daemon = True

    def run(self):
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break
            if not self.queue.full():
                self.queue.put(frame)

    def get_frame(self):
        return self.queue.get()

# Usage:
# cap_thread = VideoCaptureThread(0)
# cap_thread.start()
# while True:
#     frame = cap_thread.get_frame()
#     process_frame(frame)
```
4. **Use the GPU**: when running deep learning models, make sure the GPU build is used.
```python
# Check whether a GPU is available
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)
```
9.2 Deployment Options
1. **Local deployment**: run the Python script directly on the monitoring host.
2. **Docker containerization**:
```dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "monitor.py"]
```
3. **Running as a system service** (Linux):
```ini
# /etc/systemd/system/video-monitor.service
[Unit]
Description=Video Monitor Service
After=network.target

[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/monitor
ExecStart=/usr/bin/python3 /home/pi/monitor/monitor.py
Restart=always

[Install]
WantedBy=multi-user.target
```
```bash
# Enable and start the service
sudo systemctl enable video-monitor
sudo systemctl start video-monitor
```
10. Summary and Outlook
This article has walked through a complete OpenCV-based video surveillance anomaly detection and real-time alerting solution: environment setup, video capture and preprocessing, moving-object detection, object tracking, the specific anomaly detectors (falls, loitering, fights, abandoned objects), and finally the integration of multiple alerting channels plus system optimization. Together these cover every key step in building a practical monitoring system.
10.1 Key Takeaways
- Modular design: the modular architecture keeps the system easy to maintain and extend.
- Multiple detection algorithms: both classical computer vision and deep learning approaches are provided.
- Real-time alerting: email, SMS, webhook, and on-screen popup channels are all supported.
- Performance optimization: frame skipping, multithreading, and GPU acceleration techniques were covered.
10.2 Limitations and Future Work
The current solution relies mainly on classical computer vision. While simple to implement, its robustness is limited in complex scenes (lighting changes, occlusion, multi-person interactions). Future improvements could include:
- Deep learning: adopt more advanced detectors such as YOLOv8 or SSD, and action-recognition models such as SlowFast or PoseC3D.
- Multi-camera fusion: support coordinated analysis across cameras, including cross-camera tracking.
- Edge computing: deploy the computation on edge devices (e.g. Jetson Nano) to reduce network transmission latency.
- Behavior understanding: move from simple anomaly detection toward richer behavior understanding, such as recognizing specific action types and predicting intent.
- Privacy protection: add privacy safeguards such as face blurring and data encryption to the pipeline.
With continued optimization and extension, OpenCV-based anomaly detection systems will play an ever larger role in intelligent security, elder care, and industrial safety. I hope this article serves as a useful reference for your own projects.