Текущая пандемия коронавируса COVID-19 значительно осложнила жизнь людей по всему миру. Одним из обязательных условий для борьбы с распространением коронавируса стало обязательное ношение масок во многих общественных местах. Но ставить в каждом таком месте человека для соблюдения масочного режима не всегда оправдано с точки зрения значительных экономических трат на людей, которые осуществляют проверку масочного режима. Поэтому в данной статье мы рассмотрим автоматическое обнаружение масок на лицах людей с помощью платы Raspberry Pi и библиотеки OpenCV.
Наш проект будет состоять из 3-х основных частей:
- сбор данных;
- обучение (тренировка) модели;
- обнаружение масок на лицах людей.
На первом этапе мы соберем набор изображений, на которых изображены лица людей с масками и без них. Этот набор будет состоять из 500 изображений, относящихся к двум классам. На втором этапе мы будем тренировать (обучать) наш "распознаватель" (Recognizer) на обнаружение людей с масками и без них. А на заключительном этапе работы проекта мы будем использовать эти тренировочные данные чтобы определить находится ли на лице человека маска или нет.
Также на нашем сайте вы можете посмотреть следующие проекты, в которых использовалась обработка изображений с помощью платы Raspberry Pi и библиотеки OpenCV:
- определение социальной дистанции с помощью Raspberry Pi и OpenCV;
- обнаружение движения на видео с помощью Raspberry Pi и OpenCV;
- сканер QR кодов на основе Raspberry Pi и OpenCV;
- распознавание лиц с помощью Raspberry Pi и библиотеки OpenCV.
Необходимые компоненты
- Плата Raspberry Pi (купить на AliExpress).
- Камера для Raspberry Pi (купить на AliExpress).
Установка OpenCV и других библиотек для работы проекта
Перед установкой всех необходимых нам пакетов обновите программное обеспечение платы Raspberry Pi до последней версии с помощью команды:
1 |
sudo apt-get update |
Далее установим пакеты, которые нам потребуются для дальнейшей установки OpenCV:
1 2 3 4 5 6 |
sudo apt-get install libhdf5-dev -y sudo apt-get install libhdf5-serial-dev –y sudo apt-get install libatlas-base-dev –y sudo apt-get install libjasper-dev -y sudo apt-get install libqtgui4 –y sudo apt-get install libqt4-test –y |
После этого установим OpenCV на Raspberry Pi с помощью установщика pip.
1 |
pip3 install opencv-contrib-python==4.1.0.25 |
Далее установим пакет imutils, который позволяет выполнять базовые функции обработки изображений: перевод, ротация, изменение размера, скелетирование и другие. Для его установки используйте следующую команду:
1 |
pip3 install imutils |
Затем установим Tensorflow с помощью команды:
1 |
sudo pip3 install https://github.com/lhelontra/tensorflow-on-arm/releases/download/v2.1.0/tensorflow-2.1.0-cp37-none-linux_armv7l.whl |
И, наконец, установим пакет sklearn.
1 |
pip3 install sklearn |
Объяснение программы для Raspberry Pi для обнаружения масок на лицах людей
Полный код программы приведен в конце статьи, здесь же мы кратко рассмотрим его основные фрагменты.
Как было указано в начале статьи, наш проект будет состоять из 3-х этапов: сбор данных, обучение (тренировка) модели и обнаружение масок на лицах людей. Каталог проекта будет включать:
- набор данных (Dataset): в этом каталоге будут находиться изображения лиц людей с масками и без них;
- gather_images.py – простой скрипт на python для сбора изображений с лицами;
- training.py – считывает набор данных, производит точную настройку MobileNetV2 чтобы создать нашу модель обнаружения лиц (MaskDetector.h5);
- detect_mask.py – код программы на python, который использует тренировочные (обучающие) данные чтобы классифицировать каждое лицо – с маской оно или без нее.
Полный каталог со всем необходимым для данного проекта можно скачать по следующей ссылке с Github.
1. Сбор данных
На первом этапе работы нашего проекта мы создадим набор данных, содержащий изображения лиц людей с масками и без них. Скрипт gather_image.py будет использовать библиотеку OpenCV для сбора изображений лиц.
В первых двух строках этого скрипта мы определим системные аргументы. В первой строке определяется label name (имя метки), а во второй – количество изображений, которые мы хотим собрать. К примеру, если вы хотите собрать 250 изображений с масками, то используйте команду: python3 gather_images.py with_mask 250. А для сбора изображений без масок используйте команду: python3 gather_images.py without_mask 250. В этих командах with_mask/without_mask – это метка изображения, а 250 – количество изображений.
1 2 |
label_name = sys.argv[1] num_samples = int(sys.argv[2]) |
Далее укажем пути к каталогу с изображениями и классами изображений: то есть лица с масками и без них.
1 2 |
IMG_SAVE_PATH = 'image_data' IMG_CLASS_PATH = os.path.join(IMG_SAVE_PATH, label_name) |
Следующие строки создают каталог с изображениями и каталог с классами изображений с именами, соответствующими их меткам.
1 2 3 4 5 6 |
try: os.mkdir(IMG_SAVE_PATH) except FileExistsError: pass try: os.mkdir(IMG_CLASS_PATH) |
Затем мы начнем считывать кадры (frames) с видеопотока и если номер кадра будет равен числу отсчетов, мы будем останавливать (завершать) цикл.
1 2 3 4 5 |
ret, frame = cap.read() if not ret: continue if count == num_samples: break |
Далее мы будем сохранять все изображения по определенному пути. Все изображения будут нумероваться соответственно их порядковому номеру.
1 2 3 |
save_path = os.path.join(IMG_CLASS_PATH, '{}.jpg'.format(count + 1)) cv2.imwrite(save_path, frame) count += 1 |
2. Обучение (тренировка) модели
Теперь, когда у нас есть образцы лиц, мы можем передать ив нейронную сеть и начать процесс ее обучения чтобы в дальнейшем можно было производить автоматическое распознавание того, есть ли маска на лице человека или нет. Для этого откройте файл training.py в каталоге с обнаружителем масок (mask detector directory) и вставьте в него приведенный ниже фрагмент кода. Затем начните процесс обучения нейронной сети с помощью команды:
1 |
python3 training.py |
Вначале в скрипте training.py мы импортируем все необходимые библиотеки. Далее инициализируем необходимые нам переменные: начальный уровень обучения (initial learning rate), число этапов/эпох (epochs) для обучения и размер партии/пачки (batch size).
1 2 3 |
INIT_LR = 1e-4 EPOCHS = 20 BS = 32 |
В следующих строках мы вводим путь к каталогу, в котором хранятся наши изображения, и инициализируем списки данных и меток.
1 2 3 |
imagePaths = "dataset" data = [] labels = [] |
После этого в цикле мы считываем все изображения из каталога по указанному пути, после этого мы можем начать обучение модели. Шаги предварительной обработки (Pre-processing steps) изменение размера изображений до разрешения 224×224 пикселов, конвертирования их в формат массива и использования функции preprocess_input для масштабирования интенсивностей пикселов на изображении в диапазон [-1, 1].
1 2 3 4 5 6 7 8 9 10 11 12 |
for directory in os.listdir(imagePaths): label = os.path.join(imagePaths, directory) if not os.path.isdir(label): continue for item in os.listdir(label): if item.startswith("."): continue image = load_img(os.path.join(label, item), target_size=(224, 224)) image = img_to_array(image) image = preprocess_input(image) data.append(image) labels.append(label) |
В следующих строках мы будем выполнять прямое кодирование меток, которое используется для представления изменяющихся переменных в виде двоичных векторов.
1 2 3 |
lb = LabelBinarizer() labels = lb.fit_transform(labels) labels = to_categorical(labels) |
Затем мы будем разделять данные на тренировочный и тестовый наборы. 80% данных будут использоваться для тренировки (обучения), а оставшиеся 20% – для тестирования.
1 2 |
(trainX, testX, trainY, testY) = train_test_split(data, labels, test_size=0.20, stratify=labels, random_state=42) |
Далее мы загрузим модель MobileNet с заранее натренированными весами ImageNet, отбрасывая головную часть сети.
1 2 |
baseModel = MobileNetV2(weights="imagenet", include_top=False, input_tensor=Input(shape=(224, 224, 3))) |
В следующих строках мы сконструируем головную часть модели, которая будет помещена на верх базовой модели. Функция AveragePooling2D будет рассчитывать среднее выходное значение характеристической карты (изображения) на предыдущем слое. Чтобы предотвратить переполнение мы используем 50% процент отсева.
1 2 3 4 5 6 |
headModel = baseModel.output headModel = AveragePooling2D(pool_size=(7, 7))(headModel) headModel = Flatten(name="flatten")(headModel) headModel = Dense(128, activation="relu")(headModel) headModel = Dropout(0.5)(headModel) headModel = Dense(2, activation="softmax")(headModel) |
После конструирования головной части модели мы будем компилировать модель с помощью оптимизатора Adam.
1 2 3 |
opt = Adam(lr=INIT_LR, decay=INIT_LR / EPOCHS) model.compile(loss="binary_crossentropy", optimizer=opt, metrics=["accuracy"]) |
Затем мы начнем тренировку (обучение) модели и после того как оно будет закончено, мы сохраним модель под именем “MaskDetector.h5”.
1 2 3 4 |
print(classification_report(testY.argmax(axis=1), predIdxs, target_names=lb.classes_)) print("[INFO] saving mask detector model...") model.save("MaskDetector.h5") |
3. Обнаружение масок на лицах людей
Теперь, на финальном шаге нашего проекта, мы будем использовать тренировочные данные для классификации каждого лица – с маской оно или без. Лица мы будем считывать из видео потока в реальном времени. Для этого откройте файл detect_mask.py в каталоге с обнаружителем масок (mask detector directory) и вставьте в него приведенный ниже фрагмент кода.
Также, как и в скрипте training.py, эту программу мы начнем с импорта необходимых библиотек. В следующих строках мы определим размеры кадра и сконструируем из него blob (кляксу, шар).
1 2 3 |
(h, w) = frame.shape[:2] blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104.0, 177.0, 123.0)) |
Затем мы пропустим эту кляксу через нейронную сеть и произведем обнаружение лица с помощью модели faceNet.
1 2 |
faceNet.setInput(blob) detections = faceNet.forward() |
Затем мы сделаем цикл по всем обнаруженным лицам и извлечем уровень доверия, ассоциированный с каждым обнаружением.
1 2 |
for i in range(0, detections.shape[2]): confidence = detections[0, 0, i, 2] |
После этого отфильтруем обнаружения, чей уровень доверия меньше 0.5. Далее вычислим координаты (x, y) прямоугольника, окружающего обнаруженные лица.
1 2 3 4 5 |
if confidence > 0.5: box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) (startX, startY, endX, endY) = box.astype("int") (startX, startY) = (max(0, startX), max(0, startY)) (endX, endY) = (min(w - 1, endX), min(h - 1, endY)) |
Затем рассчитаем координаты для ограничивающего контура лица, конвертируем изображение из BGR в RGB, изменим его разрешение до 224x224 пикселов и произведем его предварительную обработку.
1 2 3 4 5 6 7 8 9 10 |
face = frame[startY:endY, startX:endX] face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB) face = cv2.resize(face, (224, 224)) face = img_to_array(face) face = preprocess_input(face) faces.append(face) locs.append((startX, startY, endX, endY)) In next line start the video stream vs = VideoStream(src=0).start() time.sleep(2.0) |
Далее организуем цикл по всем кадрам видео потока и произведем изменение их разрешения (resize). Затем обнаружим лица в каждом кадре и определим есть ли на них маски или нет.
1 2 3 |
frame = vs.read() frame = imutils.resize(frame, width=500) (locs, preds) = detect_and_predict_mask(frame, faceNet, maskNet) |
Далее организуем цикл по всем прогнозам (которые мы сформировали в результате определения масок на лицах), сравним эти прогнозы с метками и отобразим метку и ограничивающий контур вокруг каждого обнаруженного лица в выходном кадре.
1 2 3 4 5 6 7 8 9 10 11 |
for (box, pred) in zip(locs, preds): (startX, startY, endX, endY) = box (mask, withoutMask) = pred if mask > withoutMask: label = "Mask Detected." color = (0, 255, 0) else: label = "No Mask Detected" color = (0, 0, 255) cv2.putText(frame, label, (startX-50, startY - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) cv2.rectangle(frame, (startX, startY), (endX, endY), color, 2) |
Тестирование работы обнаружителя масок
Чтобы проверить наш проект обнаружителя масок на лицах людей подключите модуль камеры к плате Raspberry Pi.
Запустите скрипт detect_mask.py. Спустя несколько секунд вы увидите изображение с камеры во всплывающем окне. Если маска на лице будет обнаружена, то вокруг лица человека будет нарисован прямоугольник зеленого цвета и высветится надпись ‘Mask Detected’ (маска обнаружена). Если же маски на обнаруженном лице нет, то вокруг него будет нарисован прямоугольник красного цвета и высветится надпись ‘No Mask Detected’ (маски не обнаружено).
Более подробно работу проекта вы можете посмотреть на видео, приведенном в конце статьи.
Исходный код программы на Python
Обнаружение масок
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input from tensorflow.keras.preprocessing.image import img_to_array from tensorflow.keras.models import load_model from imutils.video import VideoStream import numpy as np import imutils import time import cv2 import os def detect_and_predict_mask(frame, faceNet, maskNet): # grab the dimensions of the frame and then construct a blob # from it (h, w) = frame.shape[:2] blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104.0, 177.0, 123.0)) # pass the blob through the network and obtain the face detections faceNet.setInput(blob) detections = faceNet.forward() # initialize our list of faces, their corresponding locations, # and the list of predictions from our face mask network faces = [] locs = [] preds = [] # loop over the detections for i in range(0, detections.shape[2]): # extract the confidence (i.e., probability) associated with # the detection confidence = detections[0, 0, i, 2] # filter out weak detections by ensuring the confidence is # greater than the minimum confidence if confidence > 0.5: # compute the (x, y)-coordinates of the bounding box for # the object box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) (startX, startY, endX, endY) = box.astype("int") # ensure the bounding boxes fall within the dimensions of # the frame (startX, startY) = (max(0, startX), max(0, startY)) (endX, endY) = (min(w - 1, endX), min(h - 1, endY)) # extract the face ROI, convert it from BGR to RGB channel # ordering, resize it to 224x224, and preprocess it face = frame[startY:endY, startX:endX] face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB) face = cv2.resize(face, (224, 224)) face = img_to_array(face) face = preprocess_input(face) # add the face and bounding boxes to their respective # lists faces.append(face) locs.append((startX, startY, endX, endY)) # only make a predictions if at least one face was detected if len(faces) > 0: # for faster inference we'll make batch predictions on *all* # faces at the same time rather than one-by-one predictions # in the above `for` loop faces = np.array(faces, dtype="float32") preds = maskNet.predict(faces, batch_size=32) # return a 2-tuple of the face locations and their corresponding # locations return (locs, preds) prototxtPath = "face_detector/deploy.prototxt" weightsPath = "face_detector/res10_300x300_ssd_iter_140000.caffemodel" faceNet = cv2.dnn.readNet(prototxtPath, weightsPath) # load the face mask detector model from disk print("[INFO] loading face mask detector model...") maskNet = load_model("MaskDetector.h5") # initialize the video stream and allow the camera sensor to warm up print("[INFO] starting video stream...") vs = VideoStream(src=0).start() time.sleep(2.0) # loop over the frames from the video stream while True: # grab the frame from the threaded video stream and resize it # to have a maximum width of 400 pixels frame = vs.read() frame = imutils.resize(frame, width=500) # detect faces in the frame and determine if they are wearing a # face mask or not (locs, preds) = detect_and_predict_mask(frame, faceNet, maskNet) # loop over the detected face locations and their corresponding # locations for (box, pred) in zip(locs, preds): # unpack the bounding box and predictions (startX, startY, endX, endY) = box (mask, withoutMask) = pred # determine the class label and color we'll use to draw # the bounding box and text if mask > withoutMask: label = "Mask Detected." color = (0, 255, 0) else: label = "No Mask Detected" color = (0, 0, 255) # display the label and bounding box rectangle on the output # frame cv2.putText(frame, label, (startX-50, startY - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) cv2.rectangle(frame, (startX, startY), (endX, endY), color, 2) # show the output frame cv2.imshow("Mask Detector", frame) key = cv2.waitKey(1) & 0xFF # if the `q` key was pressed, break from the loop if key == ord("q"): break # do a bit of cleanup cv2.destroyAllWindows() vs.stop() |
Сбор изображений
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
import cv2 import os import sys import time try: #used to enter label name i.e. with mask/without mask label_name = sys.argv[1] #used to enter the number of image you want to collect num_samples = int(sys.argv[2]) except: print("Arguments missing.") exit(-1) # path to directory where the image will be saved IMG_SAVE_PATH = 'dataset' IMG_CLASS_PATH = os.path.join(IMG_SAVE_PATH, label_name) try: os.mkdir(IMG_SAVE_PATH) except FileExistsError: pass try: os.mkdir(IMG_CLASS_PATH) except FileExistsError: print("{} directory already exists.".format(IMG_CLASS_PATH)) print("All images gathered will be saved along with existing items in this folder") cap = cv2.VideoCapture(0) start = False count = 0 while True: ret, frame = cap.read() if not ret: continue if count == num_samples: break # cv2.rectangle(frame, (10, 30), (310, 330), (0, 255, 0), 2) if start: # roi = frame[100:500, 100:500] #time.sleep(1) save_path = os.path.join(IMG_CLASS_PATH, '{}.jpg'.format(count + 1)) cv2.imwrite(save_path, frame) count += 1 font = cv2.FONT_HERSHEY_SIMPLEX cv2.putText(frame, "Collecting {}".format(count), (5, 10), font, 0.7, (0, 255, 255), 2, cv2.LINE_AA) cv2.imshow("Collecting images", frame) k = cv2.waitKey(10) if k == ord('a'): start = not start if k == ord('q'): break print("\n{} image(s) saved to {}".format(count, IMG_CLASS_PATH)) cap.release() cv2.destroyAllWindows() |
Tensor Flow
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
from tensorflow.keras.preprocessing.image import ImageDataGenerator from tensorflow.keras.applications import MobileNetV2 from tensorflow.keras.layers import AveragePooling2D from tensorflow.keras.layers import Dropout from tensorflow.keras.layers import Flatten from tensorflow.keras.layers import Dense from tensorflow.keras.layers import Input from tensorflow.keras.models import Model from tensorflow.keras.optimizers import Adam from tensorflow.keras.applications.mobilenet_v2 import preprocess_input from tensorflow.keras.preprocessing.image import img_to_array from tensorflow.keras.preprocessing.image import load_img from tensorflow.keras.utils import to_categorical from sklearn.preprocessing import LabelBinarizer from sklearn.model_selection import train_test_split from sklearn.metrics import classification_report from imutils import paths import numpy as np import os # initialize the initial learning rate, number of epochs to train for, # and batch size INIT_LR = 1e-4 EPOCHS = 20 BS = 32 # grab the list of images in our dataset directory, then initialize # the list of data (i.e., images) and class images print("[INFO] loading images...") #imagePaths = list(paths.list_images(args["dataset"])) imagePaths = "dataset" data = [] labels = [] # loop over the image paths for directory in os.listdir(imagePaths): label = os.path.join(imagePaths, directory) if not os.path.isdir(label): continue for item in os.listdir(label): # to make sure no hidden files get in our way if item.startswith("."): continue # load the input image (224x224) and preprocess it image = load_img(os.path.join(label, item), target_size=(224, 224)) image = img_to_array(image) image = preprocess_input(image) # update the data and labels lists, respectively data.append(image) labels.append(label) # convert the data and labels to NumPy arrays data = np.array(data, dtype="float32") labels = np.array(labels) # perform one-hot encoding on the labels lb = LabelBinarizer() labels = lb.fit_transform(labels) labels = to_categorical(labels) # partition the data into training and testing splits using 75% of # the data for training and the remaining 25% for testing (trainX, testX, trainY, testY) = train_test_split(data, labels, test_size=0.20, stratify=labels, random_state=42) # construct the training image generator for data augmentation aug = ImageDataGenerator( rotation_range=20, zoom_range=0.15, width_shift_range=0.2, height_shift_range=0.2, shear_range=0.15, horizontal_flip=True, fill_mode="nearest") # load the MobileNetV2 network, ensuring the head FC layer sets are # left off baseModel = MobileNetV2(weights="imagenet", include_top=False, input_tensor=Input(shape=(224, 224, 3))) # construct the head of the model that will be placed on top of the # the base model headModel = baseModel.output headModel = AveragePooling2D(pool_size=(7, 7))(headModel) headModel = Flatten(name="flatten")(headModel) headModel = Dense(128, activation="relu")(headModel) headModel = Dropout(0.5)(headModel) headModel = Dense(2, activation="softmax")(headModel) # place the head FC model on top of the base model (this will become # the actual model we will train) model = Model(inputs=baseModel.input, outputs=headModel) # loop over all layers in the base model and freeze them so they will # *not* be updated during the first training process for layer in baseModel.layers: layer.trainable = False # compile our model print("[INFO] compiling model...") opt = Adam(lr=INIT_LR, decay=INIT_LR / EPOCHS) model.compile(loss="binary_crossentropy", optimizer=opt, metrics=["accuracy"]) # train the head of the network print("[INFO] training head...") H = model.fit( aug.flow(trainX, trainY, batch_size=BS), steps_per_epoch=len(trainX) // BS, validation_data=(testX, testY), validation_steps=len(testX) // BS, epochs=EPOCHS) # make predictions on the testing set print("[INFO] evaluating network...") predIdxs = model.predict(testX, batch_size=BS) # for each image in the testing set we need to find the index of the # label with corresponding largest predicted probability predIdxs = np.argmax(predIdxs, axis=1) # show a nicely formatted classification report print(classification_report(testY.argmax(axis=1), predIdxs, target_names=lb.classes_)) # serialize the model to disk print("[INFO] saving mask detector model...") model.save("MaskDetector.h5") |