Video-Summary/Application/VideoReader.py

190 lines
6.4 KiB
Python

"""Video reading utility with buffering support."""
import multiprocessing
import os
import queue
import threading
from typing import Optional, Set, Tuple
import cv2
class VideoReader:
"""
Asynchronous video reader with frame buffering.
This class provides efficient video reading by using a separate thread/process
to continuously load frames into a buffer, improving I/O performance for video
processing pipelines.
Attributes:
list_of_frames: Optional list of specific frame numbers to read
w: Video width in pixels
h: Video height in pixels
"""
list_of_frames: Optional[list] = None
w: Optional[int] = None
h: Optional[int] = None
def __init__(self, config, set_of_frames: Optional[Set[int]] = None, multiprocess: bool = False):
"""
Initialize VideoReader.
Args:
config: Configuration dictionary containing:
- inputPath: Path to video file
- videoBufferLength: Size of frame buffer
set_of_frames: Optional set of specific frame numbers to read.
If None, reads all frames.
multiprocess: If True, uses multiprocessing for buffer.
If False, uses threading (default).
Raises:
Exception: If video_path is not provided in config.
"""
video_path = config["inputPath"]
if video_path is None:
raise Exception("ERROR: Video reader needs a video_path!")
self.video_path = video_path
self.last_frame = 0
# buffer data struct:
# buffer = Queue([(frameNumber, frame), ])
self.multiprocess = multiprocess
if multiprocess:
self.buffer = multiprocessing.Manager().Queue(config["videoBufferLength"])
else:
self.buffer = queue.Queue(config["videoBufferLength"])
self.stopped = False
self.get_wh()
self.calc_fps()
self.calc_length()
self.calc_start_time()
if set_of_frames is not None:
self.list_of_frames = sorted(set_of_frames)
def __enter__(self):
"""Context manager entry - starts buffer filling."""
self.fill_buffer()
return self
def __exit__(self, type, value, traceback):
"""Context manager exit - stops video reading."""
self.stop()
def stop(self):
"""Stop the video reading thread and wait for it to complete."""
self.thread.join()
def pop(self) -> Tuple[int, Optional[any]]:
"""
Pop next frame from buffer.
Returns:
Tuple of (frame_number, frame). Frame is None when video ends.
"""
frame_number, frame = self.buffer.get(block=True)
if frame is None:
self.stopped = True
return frame_number, frame
def fill_buffer(self, list_of_frames: Optional[list] = None):
"""
Start asynchronous buffer filling.
Args:
list_of_frames: Optional list of specific frame numbers to read.
If None, reads all frames sequentially.
"""
self.end_frame = int(cv2.VideoCapture(self.video_path).get(cv2.CAP_PROP_FRAME_COUNT))
if list_of_frames is not None:
self.list_of_frames = list_of_frames
if self.multiprocess:
if self.list_of_frames is not None:
self.thread = multiprocessing.Process(target=self.read_frames_by_list, args=())
else:
self.thread = multiprocessing.Process(target=self.read_frames, args=())
else:
if self.list_of_frames is not None:
self.thread = threading.Thread(target=self.read_frames_by_list, args=())
else:
self.thread = threading.Thread(target=self.read_frames, args=())
self.thread.start()
def read_frames(self):
"""Read video frames sequentially from start to finish."""
self.vc = cv2.VideoCapture(self.video_path)
while self.last_frame < self.end_frame:
res, frame = self.vc.read()
if res:
self.buffer.put((self.last_frame, frame))
self.last_frame += 1
self.buffer.put((self.last_frame, None))
def read_frames_by_list(self):
"""Reads all frames from a list of frame numbers"""
self.vc = cv2.VideoCapture(self.video_path)
self.vc.set(1, self.list_of_frames[0])
self.last_frame = self.list_of_frames[0]
self.end_frame = self.list_of_frames[-1]
while self.last_frame < self.end_frame:
if self.last_frame in self.list_of_frames:
res, frame = self.vc.read()
if res:
self.buffer.put((self.last_frame, frame))
else:
print("Couldn't read Frame")
# since the list is sorted the first element is always the lowest relevant framenumber
# [0,1,2,3,32,33,34,35,67,68,69]
self.list_of_frames.pop(0)
self.last_frame += 1
else:
# if current Frame number is not in list of Frames, we can skip a few frames
self.vc.set(1, self.list_of_frames[0])
self.last_frame = self.list_of_frames[0]
self.buffer.put((self.last_frame, None))
def video_ended(self):
if self.stopped and self.buffer.empty():
return True
else:
return False
def calc_fps(self):
self.fps = cv2.VideoCapture(self.video_path).get(cv2.CAP_PROP_FPS)
def get_fps(self):
if self.fps is None:
self.calc_fps()
return self.fps
def calc_length(self):
fc = int(cv2.VideoCapture(self.video_path).get(cv2.CAP_PROP_FRAME_COUNT))
self.length = fc / self.get_fps()
def get_length(self):
if self.length is None:
self.calc_length()
return self.length
def calc_start_time(self):
starttime = os.stat(self.video_path).st_mtime
length = self.get_length()
starttime = starttime - length
self.starttime = starttime
def get_start_time(self):
return self.starttime
def get_wh(self):
"""get width and height"""
vc = cv2.VideoCapture(self.video_path)
if self.w is None or self.h is None:
res, image = vc.read()
self.w = image.shape[1]
self.h = image.shape[0]
return (self.w, self.h)