"""
@author: meili
@contact: meili02@corp.netease.com
@file: WGC_recorder.py
@date: 2023/08/16 17:18
@desc:
"""
import os
import subprocess
import time
import win32api
from autoease.record_and_screenshot.recorder_base import RecorderBase
from autoease.utils import file as file_utils
[docs]class WGCRecorder(RecorderBase):
def __init__(self, monitor_num=0):
super().__init__()
self._CAPTURE_P: subprocess.Popen = None
self.__initial_time = 0
self.__init_WGC_recorder(monitor_num)
def __init_WGC_recorder(self, monitor_num):
print("init WGC recorder")
hmon = self._get_hmon(monitor_num)
vcodec = self._get_vcodec()
cmd = " ".join(
[
".\\extras\\WGCRecorder.exe",
"-f .\\extras\\ffmpeg.exe",
f"-hmon {hmon}",
f"-vcodec {vcodec}"
]
)
print(cmd)
self._CAPTURE_P = subprocess.Popen(
cmd,
shell=True,
stdin=subprocess.PIPE,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL
)
self.__initial_time = time.time()
def _get_hmon(self, monitor_num=0):
monitors = []
for hmon, _, _ in win32api.EnumDisplayMonitors():
monitors.append(hmon.__int__())
try:
monitor_hmon = monitors[monitor_num]
return monitor_hmon
except IndexError:
return None
def _get_vcodec(self):
option_cv = "h264"
try:
import GPUtil
gpus = GPUtil.getGPUs()
for _ in gpus:
if "NVIDIA" in _.name or "GeForce" in _.name:
option_cv = "h264_nvenc"
break
except Exception:
pass
try:
from pyadl import ADLManager
gpus = ADLManager.getInstance().getDevices()
for _ in gpus:
if "AMD" in str(_.adapterName):
option_cv = "h264"
break
except Exception:
pass
return option_cv
def _send_record_command(self, cmd):
self._CAPTURE_P.stdin.write(
str.encode(cmd)
)
self._CAPTURE_P.stdin.flush()
[docs] def start_record_video(self, filename="whole_video.mp4"):
print(f"whole_video_name: {filename}")
self.full_video = filename
self._send_record_command(f"start_record {filename}\n")
duration_time = time.time() - self.__initial_time
if duration_time <= 2.5:
time.sleep(2.5 - duration_time)
return filename
[docs] def stop_record_video(self):
is_terminal = self._CAPTURE_P.poll()
if is_terminal is not None:
# subprocess is not alive
return
cmd = "stop_record\n"
self._send_record_command(cmd)
[docs] def close_recorder(self):
print("WGC recorder close")
is_terminal = self._CAPTURE_P.poll()
if is_terminal is not None:
return
cmd = f"quit\n"
self._send_record_command(cmd)
try:
self._CAPTURE_P.wait(timeout=5)
except subprocess.TimeoutExpired as error:
print(error)
try:
os.remove("WGCRecorder.log")
file_utils.remove_files(".", '*.ts')
file_utils.remove_files(".", '*.m3u8')
except Exception as e:
pass
# def screenshot_now(self):
# screenshot_name = self.file_manager.generate_filename(end_with=".png")
# cmd = f"capture {screenshot_name}\n"
# print(f"capture name is {screenshot_name}")
# self._send_record_command(cmd)
# self.append_screenshot_list(screenshot_name)
# return screenshot_name
[docs] def playback_video(self, filename="playback_video.mp4"):
video_name = filename
cmd = f"playback {video_name}\n"
self._send_record_command(cmd)
self.playback_video = video_name
return video_name
def __del__(self):
self.stop_record_video()
self.close_recorder()
pass
if __name__ == '__main__':
recorder = WGCRecorder(monitor_num=1)
recorder.start_record_video()
time.sleep(5)
recorder.stop_record_video()
pass