import os
import math
import logging
from concurrent.futures import ThreadPoolExecutor

import cv2
import numpy as np
import torch
from PIL import Image
from albumentations.augmentations.functional import image_compression
from facenet_pytorch.models.mtcnn import MTCNN
from torchvision.transforms import Normalize

log = logging.getLogger()

mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
normalize_transform = Normalize(mean, std)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class VideoReader:
    """Helper class for reading one or more frames from a video file."""

    def __init__(self, verbose=True, insets=(0, 0)):
        """Creates a new VideoReader.

        Arguments:
            verbose: whether to print warnings and error messages
            insets: amount to inset the image by, as a percentage of
                (width, height). This lets you "zoom in" to an image
                to remove unimportant content around the borders.
                Useful for face detection, which may not work if the
                faces are too small.
        """
        self.verbose = verbose
        self.insets = insets

    def read_frames(self, path, num_frames, jitter=0, seed=None):
        """Reads frames that are always evenly spaced throughout the video.

        Arguments:
            path: the video file
            num_frames: how many frames to read; must be positive
                (warning: large values will take up a lot of memory!)
            jitter: if not 0, adds small random offsets to the frame indices;
                this is useful so we don't always land on even or odd frames
            seed: random seed for jittering; if you set this to a fixed value,
                you probably want to set it only on the first video
        """
        assert num_frames > 0

        capture = cv2.VideoCapture(path)
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        if frame_count <= 0:
            return None

        frame_idxs = np.linspace(0, frame_count - 1, num_frames, endpoint=True, dtype=int)
        if jitter > 0:
            np.random.seed(seed)
            jitter_offsets = np.random.randint(-jitter, jitter, len(frame_idxs))
            frame_idxs = np.clip(frame_idxs + jitter_offsets, 0, frame_count - 1)

        result = self._read_frames_at_indices(path, capture, frame_idxs)
        capture.release()
        return result

    def read_random_frames(self, path, num_frames, seed=None):
        """Picks the frame indices at random.

        Arguments:
            path: the video file
            num_frames: how many frames to read; must be positive
                (warning: large values will take up a lot of memory!)
        """
        assert num_frames > 0
        np.random.seed(seed)

        capture = cv2.VideoCapture(path)
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        if frame_count <= 0:
            return None

        frame_idxs = sorted(np.random.choice(np.arange(0, frame_count), num_frames))
        result = self._read_frames_at_indices(path, capture, frame_idxs)

        capture.release()
        return result

    def read_frames_at_indices(self, path, frame_idxs):
        """Reads frames from a video and puts them into a NumPy array.

        Arguments:
            path: the video file
            frame_idxs: a list of frame indices. Important: should be
                sorted from low to high! If an index appears multiple
                times, the frame is still read only once.

        Returns:
            - a NumPy array of shape (num_frames, height, width, 3)
            - a list of the frame indices that were read

        Reading stops if loading a frame fails, in which case the first
        dimension returned may actually be less than num_frames.

        Returns None if an exception is thrown for any reason, or if no
        frames were read.
        """
        assert len(frame_idxs) > 0
        capture = cv2.VideoCapture(path)
        result = self._read_frames_at_indices(path, capture, frame_idxs)
        capture.release()
        return result

    def _read_frames_at_indices(self, path, capture, frame_idxs):
        try:
            frames = []
            idxs_read = []
            for frame_idx in range(frame_idxs[0], frame_idxs[-1] + 1):
                # Get the next frame, but don't decode if we're not using it.
                ret = capture.grab()
                if not ret:
                    if self.verbose:
                        log.error("Error grabbing frame %d from movie %s" % (frame_idx, path))
                    break

                # Need to look at this frame?
                current = len(idxs_read)
                if frame_idx == frame_idxs[current]:
                    ret, frame = capture.retrieve()
                    if not ret or frame is None:
                        if self.verbose:
                            log.error("Error retrieving frame %d from movie %s" % (frame_idx, path))
                        break

                    frame = self._postprocess_frame(frame)
                    frames.append(frame)
                    idxs_read.append(frame_idx)

            if len(frames) > 0:
                return np.stack(frames), idxs_read
            if self.verbose:
                log.error("No frames read from movie %s" % path)
            return None
        except Exception:
            if self.verbose:
                log.error("Exception while reading movie %s" % path)
            return None

    def read_middle_frame(self, path):
        """Reads the frame from the middle of the video."""
        capture = cv2.VideoCapture(path)
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        result = self._read_frame_at_index(path, capture, frame_count // 2)
        capture.release()
        return result

    def read_frame_at_index(self, path, frame_idx):
        """Reads a single frame from a video.

        If you just want to read a single frame from the video, this is more
        efficient than scanning through the video to find the frame. However,
        for reading multiple frames it's not efficient.

        My guess is that a "streaming" approach is more efficient than a
        "random access" approach because, unless you happen to grab a keyframe,
        the decoder still needs to read all the previous frames in order to
        reconstruct the one you're asking for.

        Returns a NumPy array of shape (1, H, W, 3) and the index of the frame,
        or None if reading failed.
        """
        capture = cv2.VideoCapture(path)
        result = self._read_frame_at_index(path, capture, frame_idx)
        capture.release()
        return result

    def _read_frame_at_index(self, path, capture, frame_idx):
        capture.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = capture.read()
        if not ret or frame is None:
            if self.verbose:
                log.error("Error retrieving frame %d from movie %s" % (frame_idx, path))
            return None
        else:
            frame = self._postprocess_frame(frame)
            return np.expand_dims(frame, axis=0), [frame_idx]

    def _postprocess_frame(self, frame):
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        if self.insets[0] > 0:
            W = frame.shape[1]
            p = int(W * self.insets[0])
            frame = frame[:, p:-p, :]

        if self.insets[1] > 0:
            H = frame.shape[0]  # height is the first axis
            q = int(H * self.insets[1])
            frame = frame[q:-q, :, :]

        return frame
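

# A minimal usage sketch for VideoReader; the path and inset values below are
# illustrative placeholders, not anything defined by this module:
#
#   reader = VideoReader(insets=(0.01, 0.05))
#   result = reader.read_frames("some_video.mp4", num_frames=32, jitter=3)
#   if result is not None:
#       frames, idxs_read = result  # frames: (n, H, W, 3) uint8, RGB order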


class FaceExtractor:
    def __init__(self, video_read_fn):
        self.video_read_fn = video_read_fn
        self.detector = MTCNN(margin=0, thresholds=[0.7, 0.8, 0.8], device=device)

    def process_videos(self, input_dir, filenames, video_idxs):
        videos_read = []
        frames_read = []
        frames = []
        results = []
        for video_idx in video_idxs:
            # Read the full-size frames from this video.
            filename = filenames[video_idx]
            video_path = os.path.join(input_dir, filename)
            result = self.video_read_fn(video_path)
            # Error? Then skip this video.
            if result is None:
                continue

            videos_read.append(video_idx)

            # Keep track of the original frames (need them later).
            my_frames, my_idxs = result

            frames.append(my_frames)
            frames_read.append(my_idxs)
            for i, frame in enumerate(my_frames):
                h, w = frame.shape[:2]
                # Detect on a half-size copy to speed up MTCNN, then scale
                # the boxes back up by a factor of 2.
                img = Image.fromarray(frame.astype(np.uint8))
                img = img.resize(size=[s // 2 for s in img.size])

                batch_boxes, probs = self.detector.detect(img, landmarks=False)

                faces = []
                scores = []
                if batch_boxes is None:
                    continue
                for bbox, score in zip(batch_boxes, probs):
                    if bbox is not None:
                        xmin, ymin, xmax, ymax = [int(b * 2) for b in bbox]
                        # Box size in full-frame coordinates; use separate
                        # names so the frame's own w/h stay intact for the
                        # frame_dict below.
                        box_w = xmax - xmin
                        box_h = ymax - ymin
                        # Pad the crop by a third of the box size on each side.
                        p_h = box_h // 3
                        p_w = box_w // 3
                        crop = frame[max(ymin - p_h, 0):ymax + p_h, max(xmin - p_w, 0):xmax + p_w]
                        faces.append(crop)
                        scores.append(score)

                frame_dict = {"video_idx": video_idx,
                              "frame_idx": my_idxs[i],
                              "frame_w": w,
                              "frame_h": h,
                              "faces": faces,
                              "scores": scores}
                results.append(frame_dict)

        return results

    def process_video(self, video_path):
        """Convenience method for doing face extraction on a single video."""
        input_dir = os.path.dirname(video_path)
        filenames = [os.path.basename(video_path)]
        return self.process_videos(input_dir, filenames, [0])
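

# Sketch of how VideoReader and FaceExtractor compose; frames_per_video and
# the video path are assumed values, not defined by this module:
#
#   frames_per_video = 32
#   video_reader = VideoReader()
#   video_read_fn = lambda p: video_reader.read_frames(p, num_frames=frames_per_video)
#   face_extractor = FaceExtractor(video_read_fn)
#   faces = face_extractor.process_video("some_video.mp4")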


def confident_strategy(pred, t=0.8):
    pred = np.array(pred)
    sz = len(pred)
    fakes = np.count_nonzero(pred > t)
    # Return the mean of only the confident fake scores when more than ~40%
    # of the frames (and more than 11 of them) score above the threshold.
    if fakes > sz // 2.5 and fakes > 11:
        return np.mean(pred[pred > t])
    # If more than 90% of the frames look clearly real, average only those.
    elif np.count_nonzero(pred < 0.2) > 0.9 * sz:
        return np.mean(pred[pred < 0.2])
    else:
        return np.mean(pred)


strategy = confident_strategy
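

# Worked example (made-up numbers): with 32 predictions of which 14 exceed
# 0.8, fakes = 14 > 32 // 2.5 = 12 and 14 > 11, so the mean of those 14 high
# scores is returned rather than being diluted by the uncertain frames.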


def put_to_center(img, input_size):
    img = img[:input_size, :input_size]
    image = np.zeros((input_size, input_size, 3), dtype=np.uint8)
    start_w = (input_size - img.shape[1]) // 2
    start_h = (input_size - img.shape[0]) // 2
    image[start_h:start_h + img.shape[0], start_w:start_w + img.shape[1], :] = img
    return image
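

# For instance (illustrative sizes), a 213x380 crop on a 380x380 canvas gets
# start_h = 83 and start_w = 0: centered vertically, flush horizontally, with
# black padding filling the remainder.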


def isotropically_resize_image(img, size, interpolation_down=cv2.INTER_AREA, interpolation_up=cv2.INTER_CUBIC):
    h, w = img.shape[:2]
    if max(w, h) == size:
        return img
    if w > h:
        scale = size / w
        h = h * scale
        w = size
    else:
        scale = size / h
        w = w * scale
        h = size
    interpolation = interpolation_up if scale > 1 else interpolation_down
    resized = cv2.resize(img, (int(w), int(h)), interpolation=interpolation)
    return resized
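

# Example (illustrative): with size=380, a 1080x1920 frame is scaled by
# 380/1920 down to 213x380, using INTER_AREA when shrinking and INTER_CUBIC
# when enlarging. put_to_center then pads the result to a square 380x380.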


def dist(p1, p2):
    return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)


# Module-level detector; only referenced by the commented-out eye-landmark
# code inside predict_on_video below.
detector = MTCNN(margin=0, thresholds=(0.7, 0.8, 0.8), device=device)


def predict_on_video(distill, face_extractor, video_path, batch_size, input_size, models, strategy=np.mean,
                     apply_compression=False):
    # The face buffer holds up to 4 crops per requested frame.
    batch_size *= 4
    try:
        faces = face_extractor.process_video(video_path)
        if len(faces) > 0:
            x = np.zeros((batch_size, input_size, input_size, 3), dtype=np.uint8)
            # e = np.zeros((batch_size, 32, 32, 3), dtype=np.uint8)  # eye
            n = 0
            for frame_data in faces:
                for face in frame_data["faces"]:
                    # _, _, landmark = detector.detect(face, landmarks=True)
                    '''# eye 0524
                    try:
                        landmark = np.around(landmark[0]).astype(np.int16)
                        (x1, y1), (x2, y2) = landmark[:2]
                        w = dist((x1, y1), (x2, y2))
                        dilation = int(w // 4)
                        eye_image = face[y2 - dilation:y1 + dilation, x1 - dilation:x2 + dilation]
                        eye_image = cv2.resize(eye_image, dsize=(32, 32), interpolation=cv2.INTER_CUBIC)
                    except Exception as ex:
                        eye_image = cv2.resize(face, dsize=(32, 32), interpolation=cv2.INTER_CUBIC)
                    '''
                    resized_face = isotropically_resize_image(face, input_size)
                    resized_face = put_to_center(resized_face, input_size)

                    if apply_compression:
                        resized_face = image_compression(resized_face, quality=90, image_type=".jpg")
                        # eye_image = image_compression(eye_image, quality=90, image_type=".jpg")  # eye

                    if n < batch_size:
                        x[n] = resized_face
                        # e[n] = eye_image  # eye
                        n += 1
                    # Faces beyond the batch capacity are silently dropped.

            if n > 0:
                x = torch.tensor(x, device=device).float()
                # e = torch.tensor(e, device="cuda").float()  # eye

                # Preprocess the images.
                x = x.permute((0, 3, 1, 2))
                # e = e.permute((0, 3, 1, 2))  # eye
                for i in range(len(x)):
                    x[i] = normalize_transform(x[i] / 255.)
                    # e[i] = normalize_transform(e[i] / 255.)  # eye

                # Make a prediction, then take the average.
                with torch.no_grad():
                    preds = []
                    for model in models:
                        if distill:
                            _, y_pred, _ = model(x[:n])  # eye: , e[:n].half()
                        else:
                            y_pred = model(x[:n])
                        y_pred = torch.sigmoid(y_pred.squeeze())
                        bpred = y_pred[:n].cpu().numpy()
                        preds.append(strategy(bpred))
                    return np.mean(preds)
    except Exception as e:
        log.error("Prediction error on video %s: %s" % (video_path, str(e)))

    return 0.5
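

# Single-video sketch; ``models`` stands for a list of loaded PyTorch
# classifiers trained elsewhere (an assumption; nothing in this file
# builds them):
#
#   score = predict_on_video(distill=False, face_extractor=face_extractor,
#                            video_path="some_video.mp4", batch_size=32,
#                            input_size=380, models=models,
#                            strategy=confident_strategy)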


def predict_on_video_set(distill, face_extractor, videos, input_size, num_workers, test_dir, frames_per_video, models,
                         strategy=np.mean, apply_compression=False):
    def process_file(i):
        filename = videos[i]
        y_pred = predict_on_video(distill, face_extractor=face_extractor,
                                  video_path=os.path.join(test_dir, filename),
                                  input_size=input_size,
                                  batch_size=frames_per_video,
                                  models=models, strategy=strategy, apply_compression=apply_compression)
        return y_pred

    with ThreadPoolExecutor(max_workers=num_workers) as ex:
        predictions = ex.map(process_file, range(len(videos)))
        # Sequential fallback, useful for debugging:
        # predictions = []
        # for i in range(len(videos)):
        #     predictions.append(process_file(i))
        return list(predictions)
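

# End-to-end sketch; the directory name and model loading are assumptions,
# not part of this module:
#
#   test_dir = "test_videos"
#   videos = sorted(os.listdir(test_dir))
#   scores = predict_on_video_set(distill=False, face_extractor=face_extractor,
#                                 videos=videos, input_size=380, num_workers=4,
#                                 test_dir=test_dir, frames_per_video=32,
#                                 models=models, strategy=confident_strategy)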