Source code for autoease.record_and_screenshot.WGC_recorder

"""
@author: meili
@contact: meili02@corp.netease.com
@file: WGC_recorder.py
@date: 2023/08/16 17:18
@desc:
"""
import os
import subprocess
import time
import win32api

from autoease.record_and_screenshot.recorder_base import RecorderBase
from autoease.utils import file as file_utils


class WGCRecorder(RecorderBase):
    """Screen recorder that drives the external ``WGCRecorder.exe`` helper.

    The helper process is launched once, at construction time, and is then
    controlled through newline-terminated text commands written to its
    standard input (``start_record``, ``stop_record``, ``playback``, ``quit``).
    """

    def __init__(self, monitor_num=0):
        """Create the recorder and launch the helper process.

        Args:
            monitor_num: Index into the monitor list returned by
                ``win32api.EnumDisplayMonitors()``.
        """
        super().__init__()
        # Handle of the helper process; stays None until it has been launched.
        self._CAPTURE_P: subprocess.Popen = None
        # Wall-clock time (``time.time()``) at which the helper was launched.
        self.__initial_time = 0
        self.__init_WGC_recorder(monitor_num)

    def __init_WGC_recorder(self, monitor_num):
        """Build the helper command line and start the helper process."""
        print("init WGC recorder")
        hmon = self._get_hmon(monitor_num)
        vcodec = self._get_vcodec()
        # NOTE(review): when the monitor index is out of range ``hmon`` is None
        # and the command line contains the literal text "-hmon None".
        # The executable paths are relative, so they are resolved against the
        # current working directory of this process.
        cmd = " ".join(
            [
                ".\\extras\\WGCRecorder.exe",
                "-f .\\extras\\ffmpeg.exe",
                f"-hmon {hmon}",
                f"-vcodec {vcodec}",
            ]
        )
        print(cmd)
        self._CAPTURE_P = subprocess.Popen(
            cmd,
            shell=True,
            stdin=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
        )
        self.__initial_time = time.time()

    def _get_hmon(self, monitor_num=0):
        """Return the integer monitor handle at index *monitor_num*.

        Returns:
            The handle as an ``int``, or ``None`` when *monitor_num* is past
            the end of the monitor list.
        """
        monitors = [int(hmon) for hmon, _, _ in win32api.EnumDisplayMonitors()]
        try:
            return monitors[monitor_num]
        except IndexError:
            return None

    def _get_vcodec(self):
        """Pick the video codec name passed to the helper via ``-vcodec``.

        Returns:
            ``"h264_nvenc"`` when GPUtil reports a GPU whose name contains
            "NVIDIA" or "GeForce"; ``"h264"`` otherwise. If pyadl reports an
            adapter whose name contains "AMD", the result is reset to
            ``"h264"`` even when an NVIDIA GPU was found.
        """
        option_cv = "h264"

        # Both probes rely on optional third-party packages; any failure
        # (package missing, driver unavailable, ...) is deliberately ignored
        # and the current codec choice is kept.
        try:
            import GPUtil

            for gpu in GPUtil.getGPUs():
                if "NVIDIA" in gpu.name or "GeForce" in gpu.name:
                    option_cv = "h264_nvenc"
                    break
        except Exception:
            pass

        try:
            from pyadl import ADLManager

            for device in ADLManager.getInstance().getDevices():
                if "AMD" in str(device.adapterName):
                    option_cv = "h264"
                    break
        except Exception:
            pass

        return option_cv

    def _send_record_command(self, cmd):
        """Write the text command *cmd* to the helper's stdin and flush it."""
        self._CAPTURE_P.stdin.write(cmd.encode())
        self._CAPTURE_P.stdin.flush()

    def start_record_video(self, filename="whole_video.mp4"):
        """Ask the helper to start recording into *filename*.

        Blocks until at least 2.5 seconds have elapsed since the helper was
        launched (presumably to give it time to initialise -- TODO confirm).

        Returns:
            *filename*, unchanged.
        """
        print(f"whole_video_name: {filename}")
        self.full_video = filename
        self._send_record_command(f"start_record {filename}\n")
        duration_time = time.time() - self.__initial_time
        if duration_time <= 2.5:
            time.sleep(2.5 - duration_time)
        return filename

    def stop_record_video(self):
        """Ask the helper to stop recording; no-op if it has already exited."""
        if self._CAPTURE_P.poll() is not None:
            # The subprocess is no longer alive, nothing to stop.
            return
        self._send_record_command("stop_record\n")

    def close_recorder(self):
        """Tell the helper to quit, wait up to 5 seconds, then clean up.

        Does nothing beyond printing a message when the helper has already
        exited. Removal of the helper's log and temporary ``*.ts`` /
        ``*.m3u8`` files in the current directory is best-effort.
        """
        print("WGC recorder close")
        if self._CAPTURE_P.poll() is not None:
            return
        self._send_record_command("quit\n")
        try:
            self._CAPTURE_P.wait(timeout=5)
        except subprocess.TimeoutExpired as error:
            print(error)

        # Each cleanup step is attempted independently so that, for example,
        # a missing log file does not prevent the segment files from being
        # removed.
        try:
            os.remove("WGCRecorder.log")
        except OSError:
            pass
        for pattern in ("*.ts", "*.m3u8"):
            try:
                file_utils.remove_files(".", pattern)
            except Exception:
                pass

    # def screenshot_now(self):
    #     screenshot_name = self.file_manager.generate_filename(end_with=".png")
    #     cmd = f"capture {screenshot_name}\n"
    #     print(f"capture name is {screenshot_name}")
    #     self._send_record_command(cmd)
    #     self.append_screenshot_list(screenshot_name)
    #     return screenshot_name

    def playback_video(self, filename="playback_video.mp4"):
        """Send the ``playback`` command for *filename* to the helper.

        Returns:
            *filename*, unchanged.
        """
        video_name = filename
        self._send_record_command(f"playback {video_name}\n")
        # NOTE(review): this assignment shadows the ``playback_video`` method
        # on the instance, so a second call on the same object fails with
        # "'str' object is not callable". Kept as-is because other code may
        # read this attribute -- confirm and rename.
        self.playback_video = video_name
        return video_name

    def __del__(self):
        """Best-effort shutdown of the helper when the object is collected."""
        # __init__ may have failed before the helper process was created.
        if getattr(self, "_CAPTURE_P", None) is None:
            return
        # A failure while stopping (e.g. the pipe is already broken) must not
        # prevent the attempt to close the helper.
        for shutdown_step in (self.stop_record_video, self.close_recorder):
            try:
                shutdown_step()
            except (OSError, ValueError) as error:
                print(error)
if __name__ == '__main__':
    # Manual smoke test: record the monitor at index 1 for five seconds.
    demo_recorder = WGCRecorder(monitor_num=1)
    demo_recorder.start_record_video()
    time.sleep(5)
    demo_recorder.stop_record_video()