OpenCV вместе с Raspberry Pi могут стать мощным инструментом для работы в различных приложениях, требующих производить обработку видео в реальном времени. В предыдущей статье на нашем сайте мы рассмотрели как транслировать видеопоток из системы видеонаблюдения в Raspberry Pi с помощью OpenCV и протокола RTSP – рекомендуем прочитать эту статью прежде чем переходить к данной статье. В данном проекте мы рассмотрим применение библиотеки OpenCV для обнаружения движения (Motion Detection) в видеопотоке, транслируемом в плату Raspberry Pi из системы видеонаблюдения (CCTV). Если у вас нет под рукой системы видеонаблюдения, на которой вы могли бы протестировать данный проект, то вместо нее можно использовать проект камеры видеонаблюдения на основе платы Raspberry Pi.
Мы напишем программу на python, которая будет одновременно мониторить 4 камеры системы видеонаблюдения (CCTV cameras) и обнаруживать на них движения. Если на изображении с какой либо из этих камер будет обнаруживаться движение, то Raspberry Pi будет автоматически переключаться на экран этой камеры и подсвечивать (выделять) место на изображении, на котором было зафиксировано движение. Это будет происходить в режиме реального времени, с задержкой 1,5 секунды. Также мы добавили звуковой сигнал тревоги, который будет подаваться с помощью зуммера при обнаружении движения. Но вы можете усовершенствовать данный проект, например, при обнаружении движения будет передаваться SMS на ваш телефон или отправляться E-mail на вашу электронную почту (похожий проект описан в этой статье).
Установка OpenCV на Raspberry Pi
Автор данного проекта (ссылка на оригинал приведена в конце статьи) использовал плату Raspberry Pi 3 B+ с установленной на нее Buster OS, но можно использовать и другие типы плат Raspberry Pi с другими операционными системами. Также автор проекта использовал OpenCV версии 4.1 – можно использовать и другие версии данной библиотеки, но не ниже 4.1.
Вы можете скомпилировать OpenCV на плату Raspberry Pi с помощью CMake (это займет несколько часов, но это более надежно для "тяжелых" проектов) или вы можете непосредственно установить ее с помощью установщика pip, используя следующие команды:
1 |
$ pip install opencv-contrib-python==4.1.0.25 |
Если вы устанавливаете OpenCV с помощью pip, то вам необходимо будет установить еще ряд дополнений с помощью следующих команд:
1 2 3 4 5 |
$ sudo apt-get install libavcodec-dev libavformat-dev libswscale-dev libv4l-dev $ sudo apt-get install libxvidcore-dev libx264-dev $sudo apt-get install libatlas-base-dev gfortran $ sudo apt-get install libhdf5-dev libhdf5-serial-dev libhdf5-103 $ sudo apt-get install libqtgui4 libqtwebkit4 libqt4-test python3-pyqt5 |
Также вы можете посмотреть все проекты на нашем сайте, в которых использовалась OpenCV.
Установка зуммера на 5-дюймовый дисплей
После подключения 5-дюймового сенсорного дисплея к плате Raspberry Pi, мы можем непосредственно подключить зуммер к задней стороне дисплея, к его неиспользуемым для подключения к плате Raspberry Pi контактам. Мы закрепили зуммер на тыльной стороне 5-дюймового дисплея следующим образом:
В таблице ниже представлено назначение контактов (распиновка) 5-дюймового дисплея. Как вы можете видеть из этой таблицы, большинство контактов используется самим же дисплеем для реализации в нем функций сенсорного экрана (реагирующего на прикосновения), но, несмотря на это, свободными остаются контакты 3, 5, 7, 8, 10, 11, 12, 13, 15, 16 и 24, которые мы можем использовать для подключения зуммера (buzzer). В нашем случае мы подключили зуммер к контакту GPIO 3.
Объяснение программы для Raspberry Pi для обнаружения движения на видео
Полный код программы приведен в конце статьи, здесь же мы кратко рассмотрим его основные фрагменты.
Мониторинг нескольких камер на Raspberry Pi с помощью протокола RTSP
Основной задачей при написании этого фрагмента программы является уменьшение нагрузки на Raspberry Pi чтобы предотвратить задержки при трансляции видеопотока (streaming). Изначально автор проекта пытался переключаться между всеми 4 камерами и отслеживать на них движение, но этот процесс происходил с большими задержками (около 10 секунд). Поэтому он объединил видеопотоки со всех 4 камер в одно изображение и производил обнаружение движения на этом объединенном изображении. Для этого он запрограммировал две функции, с названиями create_camera и read_camera.
Функция create_camera используется для открытия камеры с соответствующим номером канала. Заметьте, что в этой функции RTSP URL заканчивается на “02” – это означает что мы использовали трансляцию видео с под-потоков (sub-stream). Видео в этих под-потоках имеет меньшее разрешение чем в основном канале и поэтому быстрее обрабатывается. Также существенное влияние на скорость обработки видео оказывает тип используемого видеокодека. Автор проекта экспериментировал с различными видеокодеками и обнаружил, что наилучшие показатели производительности обеспечивает кодек FFMPEG.
1 2 3 4 5 6 7 |
def create_camera (channel): rtsp = "rtsp://" + rtsp_username + ":" + rtsp_password + "@" + rtsp_IP + ":554/Streaming/channels/" + channel + "02" #change the IP to suit yours cap = cv2.VideoCapture(rtsp, cv2.CAP_FFMPEG) cap.set(3, cam_width) # ID number for width is 3 cap.set(4, cam_height) # ID number for height is 480 cap.set(10, 100) # ID number for brightness is 10 return cap |
В функции read_camera мы будем считывать видео со всех 4 камер с именами cam1, cam2, cam3 и cam4 и затем объединять их в единое изображение с названием Main_screen. Как только это изображение будет готово, мы можем "запускать" на него OpenCV.
1 2 3 4 5 6 7 8 9 10 |
def read_camera (): success, current_screen = cam1.read() Main_screen [:cam_height, :cam_width, :3] = current_screen success, current_screen = cam2.read() Main_screen[cam_height:cam_height*2, :cam_width, :3] = current_screen success, current_screen = cam3.read() Main_screen[:cam_height, cam_width:cam_width*2, :3] = current_screen success, current_screen = cam4.read() Main_screen[cam_height:cam_height*2, cam_width:cam_width*2, :3] = current_screen return (Main_screen) |
Внешний вид получившегося у нас подобного объединенного изображения показан на следующем рисунке.
Обнаружение движения с помощью OpenCV и Raspberry Pi
Когда наше объединенное изображение будет готово, мы можем начинать на нем обнаружение движения. Внутри цикла while мы начнем считывание двух различных кадров с именами frame1 и frame2, после чего будем конвертировать их в черно-белое изображение с оттенками серого (grayscale).
1 2 3 4 |
frame1 = read_camera() #Read the first frame grayImage_F1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) # Convert to gray frame2 = read_camera() #Read the 2nd frame grayImage_F2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) |
Затем мы будем анализировать различия (с определенной границей (threshold) между этими двумя изображениями чтобы определить что изменилось. Затем мы будем помечать "пятнами" все места, в которых зафиксированы изменения и растягивать изображение чтобы предотвратить появление острых краев.
1 2 3 4 |
diffImage = cv2.absdiff(grayImage_F1,grayImage_F2) #get the differance --this is cool blurImage = cv2.GaussianBlur(diffImage, (5,5), 0) _, thresholdImage = cv2.threshold(blurImage, 20,255,cv2.THRESH_BINARY) dilatedImage = cv2.dilate(thresholdImage,kernal,iterations=5) |
Далее мы находим контуры всех этих областей, в которых мы обнаружили движение. Зная контур области мы можем определить насколько велика область, в которой зафиксировалось движение. Если величина (площадь) этой области больше заранее определенного значения (motion_threshold), то мы полагаем что в этой области произошло движение и выделяем цветным прямоугольником эту область для пользователя.
1 2 3 4 5 6 |
contours, _ = cv2.findContours (dilatedImage, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) #find contour is a magic function for contour in contours: #for every change that is detected (x,y,w,h) = cv2.boundingRect(contour) #get the location where change was found if cv2.contourArea(contour) > motion_threshold: cv2.rectangle(frame1, (x, y), (x + w, y + h), (255, 255, 0), 1) display_screen = find_screen() |
Функция find_screen() используется для определения того, на какой из 4-х камер зафиксировано движение. Мы можем определить это поскольку знаем координаты x и y, на которых зафиксировано движение. Мы сравниваем эти значения x и y с местоположением экрана каждой из камер (на объединенном изображении) и, таким образом, определяем на каком экране зафиксировано движение. Затем мы вырезаем с объединенного изображения картинку этого экрана и выводим ее на весь экран Raspberry Pi.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def find_screen(): if (x < cam_width): if (y < cam_height): screen = frame1[0:cam_height, 0:cam_width] print("Activity in cam screen 1") else: screen = frame1[cam_height:cam_height*2, :cam_width] print("Activity in cam screen 2") else: if (y < cam_height): screen = frame1[:cam_height, cam_width:cam_width*2] print("Activity in cam screen 3") else: screen = frame1[cam_height:cam_height*2, cam_width:cam_width*2] print("Activity in cam screen 4") return (screen) |
Установка срабатывания сигнала тревоги при обнаружении движения
Поскольку теперь мы знаем на каком экране обнаружено движение, мы можем добавить любой тип сигнала тревоги (alarm) какой нам необходим. В этом проекте мы для этой цели использовали зуммер, подключенный к контакту GPIO 3. Мы проверяем с помощью условия if обнаружено ли движение на 3-м экране – если это так, то мы инкрементируем значение переменной trig_alarm. Вы можете изменить срабатывание этого условия на любой экран или даже на несколько экранов.
1 2 3 4 |
if ((x>cam_width) and (y<cam_height)): #screen 3 trig_alarm+=1 else: trig_alarm =0 |
Если значение переменной trig_alarm станет больше или равно 3, то зуммер сработает один раз (кратковременно). Введение этого условия необходимо для предотвращения ложных срабатываний системы. Если его не вводить, то, к примеру, ложные срабатывания системы могут происходить из-за теней пролетающих птиц. То есть мы подаем сигнал тревоги только тогда, когда движение будет зафиксировано не менее чем в 3-х кадрах.
1 2 3 4 5 6 |
if (trig_alarm>=3):#wait for conts 3 motions #Beep the Buzzer GPIO.output(BUZZER,1) time.sleep(0.02) GPIO.output (BUZZER,0) trig_alarm =0 |
Мониторинг температуры системы
Поскольку предполагается что наша система обнаружения движения в видеопотоке с камер должна работать в режиме 24x7, то вследствие этого плата Raspberry Pi может достаточно сильно нагреваться. Вследствие этого мы решили выводить на экран дисплея температуру платы и интенсивность использования процессора (CPU) платы. Мы будем получать эту информацию с помощью библиотеки gpiozero.
1 2 3 4 5 6 7 8 |
cpu = CPUTemperature() load = LoadAverage() cpu_temperature = str((cpu.temperature)//1) load_average = str(load.load_average) #print (cpu.temperature) #print(load.load_average) cv2.putText(display_screen, cpu_temperature, (250,250), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,255), 1) cv2.putText(display_screen, load_average, (300,250), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,255,0), 2) |
Тестирование работы проекта
Автор проекта тестировал его работу на протяжении нескольких дней и нашел его работу весьма удовлетворительной. Во время его тестирования он повредил одну из камер, но все равно система продолжила достаточно устойчивую работу. Все эти процессы можно более подробно посмотреть на видео, приведенном в конце статьи.
Если вы планируете эксплуатировать подобный проект достаточно долгое время, то его автор строго рекомендует использовать правильный корпус для платы и вентилятор охлаждения для ее процессора. Также, если вы собираетесь эксплуатировать данный проект в месте, где случаются перебои с электричеством, то его целесообразно дополнить источником бесперебойного питания (UPS) и автозапуском OpenCV при перезагрузке платы.
Исходный код программы на Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
#!/usr/bin/env python3 import cv2 import numpy as np import time import RPi.GPIO as GPIO from gpiozero import CPUTemperature, LoadAverage #Enter credentials for CCTV rtsp_username = "admin" rtsp_password = "aswinth347653" rtsp_IP = "192.168.29.100" cam_width = 352 #set to resolution of incoming video from DVR (установка разрешения видео – ширина) cam_height = 288 #set to resolution of incoming video from DVR (установка разрешения видео – высота) motion_threshold = 1000 #decrease this value to increase sensitivity (уменьшите это значение чтобы увеличить чувствительность) cam_no = 1 trig_alarm =0 GPIO.setmode(GPIO.BCM) GPIO.setwarnings (False) BUZZER = 3 GPIO.setup(BUZZER,GPIO.OUT) def create_camera (channel): rtsp = "rtsp://" + rtsp_username + ":" + rtsp_password + "@" + rtsp_IP + ":554/Streaming/channels/" + channel + "02" #change the IP to suit yours (измените IP на свой) cap = cv2.VideoCapture(rtsp, cv2.CAP_FFMPEG) cap.set(3, cam_width) # ID number for width is 3 (установка ширины) cap.set(4, cam_height) # ID number for height is 4 (установка высоты) cap.set(10, 100) # ID number for brightness is 10 (установка яркости) return cap def read_camera (): success, current_screen = cam1.read() Main_screen [:cam_height, :cam_width, :3] = current_screen success, current_screen = cam2.read() Main_screen[cam_height:cam_height*2, :cam_width, :3] = current_screen success, current_screen = cam3.read() Main_screen[:cam_height, cam_width:cam_width*2, :3] = current_screen success, current_screen = cam4.read() Main_screen[cam_height:cam_height*2, cam_width:cam_width*2, :3] = current_screen return (Main_screen) def find_screen(): if (x < cam_width): if (y < cam_height): screen = frame1[0:cam_height, 0:cam_width] print("Activity in cam screen 1") else: screen = frame1[cam_height:cam_height*2, :cam_width] print("Activity in cam screen 2") else: if (y < cam_height): screen = frame1[:cam_height, cam_width:cam_width*2] print("Activity in cam screen 3") else: screen = frame1[cam_height:cam_height*2, cam_width:cam_width*2] print("Activity in cam screen 4") return (screen) #Open all four camera Framers (открываем все 4 камеры) cam1 = create_camera(str(1)) cam2 = create_camera(str(2)) cam3 = create_camera(str(3)) cam4 = create_camera(str(4)) print ("Reading camera successfull") Main_screen = np.zeros(( (cam_height*2), (cam_width*2), 3) , np.uint8) # создаем экран, на котором будут объединены изображения со всех 4-х камер display_screen = np.zeros(( (cam_height*2), (cam_width*2), 3) , np.uint8) # изменяем размеры экрана чтобы отображать его на 5-дюймовом TFT дисплее kernal = np.ones((5,5),np.uint8) #form a 5x5 matrix with all ones range is 8-bit while True: frame1 = read_camera() # считываем первый кадр grayImage_F1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) # конвертируем его в серое изображение frame2 = read_camera() # считываем второй кадр grayImage_F2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) diffImage = cv2.absdiff(grayImage_F1,grayImage_F2) #определяем разницу между двумя кадрами blurImage = cv2.GaussianBlur(diffImage, (5,5), 0) _, thresholdImage = cv2.threshold(blurImage, 20,255,cv2.THRESH_BINARY) dilatedImage = cv2.dilate(thresholdImage,kernal,iterations=5) contours, _ = cv2.findContours (dilatedImage, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) #find contour is a magic function for contour in contours: #for every change that is detected (x,y,w,h) = cv2.boundingRect(contour) # находим местоположение где зафиксировано изменение if cv2.contourArea(contour) > motion_threshold: cv2.rectangle(frame1, (x, y), (x + w, y + h), (255, 0, 0), 1) display_screen = find_screen() if ((x>cam_width) and (y<cam_height)): #screen 3 trig_alarm+=1 else: trig_alarm =0 if (trig_alarm>=3):#wait for conts 3 motions (если движение фиксируется в 3-х или более кадрах) #Beep the Buzzer (включаем зуммер) GPIO.output(BUZZER,1) time.sleep(0.02) GPIO.output (BUZZER,0) trig_alarm =0 cpu = CPUTemperature() load = LoadAverage() cpu_temperature = str((cpu.temperature)//1) load_average = str(load.load_average) #print (cpu.temperature) #print(load.load_average) cv2.putText(display_screen, cpu_temperature, (250,250), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,255), 1) cv2.putText(display_screen, load_average, (300,250), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,255,0), 2) print(trig_alarm) dim = (800, 480) Full_frame = cv2.resize (display_screen,dim,interpolation=cv2.INTER_AREA) cv2.namedWindow("AISHA", cv2.WINDOW_NORMAL) cv2.setWindowProperty('AISHA', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) cv2.imshow("AISHA",Full_frame) if cv2.waitKey(1) & 0xFF == ord('p'): cam1.release() cam2.release() cam3.release() cam4.release() cv2.destroyAllWindows() break |