"""
@author: meili
@contact: meili02@corp.netease.com
@file: win.py
@date: 2023/2/20 17:59
@desc:
"""
import ctypes
import os
import time
import requests
import websocket
import ssl
from airtest.core.api import Template
from autoease.utils import log as log_util
from autoease.utils import windows as win_util
from autoease.utils import file as file_util
from autoease.device.device import Device
from autoease.core.error import KeyboardError
from autoease.utils.file import FileManager
from autoease.record_and_screenshot.xbox_recorder import XboxRecorder
from autoease.cv import airtest_cv
from autoease.ocr import airtest_ocr
from autoease.core import error as errors
from autoease.utils.logger import get_logger
LOGGING = get_logger(__name__)
XBOX_KEY_DICT = {
"1": 2,
"2": 3,
"3": 4,
"4": 5,
"5": 6,
"6": 7,
"7": 8,
"8": 9,
"9": 10,
"0": 11,
"-": 12,
"=": 13,
"backspace": 14,
"tab": 15,
"q": 16,
"w": 17,
"e": 18,
"r": 19,
"t": 20,
"y": 21,
"u": 22,
"i": 23,
"o": 24,
"p": 25,
"[": 26,
"]": 27,
"enter": 28,
"l_ctrl": 29,
"a": 30,
"s": 31,
"d": 32,
"f": 33,
"g": 34,
"h": 35,
"j": 36,
"k": 37,
"l": 38,
";": 39,
"'": 40,
"`": 41,
"l_shift": 42,
"\\": 43,
"z": 44,
"x": 45,
"c": 46,
"v": 47,
"b": 48,
"n": 49,
"m": 50,
",": 51,
".": 52,
"/": 53,
"r_shift": 54,
"*": 55,
"l_alt": 56,
"space": 57,
"caps_lock": 58,
}
[docs]class XBOX_GamePad_TO_CODE(object):
RIGHT_THUMBSTICK = 0x8000
LEFT_THUMBSTICK = 0x4000
RIGHT_SHOULDER = 0x2000
LEFT_SHOULDER = 0x1000
DPAD_RIGHT = 0x800
DPAD_LEFT = 0x400
DPAD_DOWN = 0x200
DPAD_UP = 0x100
Y = 0x80
X = 0x40
B = 0x20
A = 0x10
VIEW = 0x8
MENU = 0x4
NEXUS = 0x2
ENROLL = 0x1
NONE = 0
[docs]class Xbox(Device):
"""Xbox client."""
def __init__(self, ip, http_port=11443,
game_project_name=None, is_record=True,
file_manager: FileManager = None,
xbox_gdk_bin="C:\\Program Files (x86)\\Microsoft GDK\\bin",
**kwargs):
super(Xbox, self).__init__()
print("Initial Xbox")
self.ip_address = ip
self.http_port = http_port
self.xbox_gdk_bin = xbox_gdk_bin
self.input_websocket_url = f"wss://{ip}:{self.http_port}/ext/remoteinput"
self.file_manager: FileManager = file_manager
self.game_name = None
self.game_project_name = None
self.game_log_path = None
self.game_package_full_name = None
self.game_AUMID = None
self.is_record = is_record
self.recorder: XboxRecorder = None
self.whole_video = ""
self.playback_video = ""
self.vide_list = []
self.screenshot_list = []
if game_project_name is not None:
self.game_name = game_project_name
self.game_project_name = game_project_name
self.game_log_path = f"\\\\{ip}\\SystemScratch\\Logs\\{game_project_name}.log"
self.__init_packagefullname_and_AUMID()
if self.is_record is True:
self.whole_video = self.__generate_filename(end_with=".mp4")
print(f"whole video: {self.whole_video}")
self.recorder: XboxRecorder = XboxRecorder(ip=ip, filename=self.whole_video)
self.vide_list.append(self.whole_video)
# init dll
# TODO 下面目前是写死的,后面需要改
xbox_dll_path = os.path.join(os.getcwd(), "extras/Dll/Xbox")
win_util.add_path_to_env(xbox_dll_path)
# dll_file = os.path.join(os.path.join("D:/professionalSoftwareDatas/C++Projects/XboxDll/x64/Debug", "XboxDll.dll"))
dll_file = os.path.join(os.getcwd(), "extras/Dll/Xbox/XboxDll.dll")
self.cpp_dll = ctypes.windll.LoadLibrary(dll_file)
self.__init_xbox_client()
# init websocket for keyboard
self.websocket_instance = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
self.websocket_instance.connect(self.input_websocket_url)
print(f"Connect console {self.ip_address} successfully!")
# init ocr and cv
airtest_cv.init_airtest_cv()
airtest_ocr.init_airtest_ocr()
def __init_xbox_client(self):
self.cpp_dll.Init_Xbox_Client.argtypes = [ctypes.c_char_p]
xbox_ip_address = self.ip_address.encode("utf-8")
res = self.cpp_dll.Init_Xbox_Client(xbox_ip_address)
def __init_packagefullname_and_AUMID(self):
out_str = win_util.run_command_sync("xbapp list", self.xbox_gdk_bin)
_t = 0
for line in out_str.splitlines():
if self.game_project_name in line:
line = line.replace('\n', '').replace('\t', '').replace(' ', '').strip()
if _t == 0:
self.game_package_full_name = line
_t = 1
else:
self.game_AUMID = line
def __generate_filename(self, end_with=".txt"):
if self.file_manager is None:
return file_util.generate_filename(end_with)
return self.file_manager.generate_filename(curr_dir="videos", end_with=end_with)
def __save_data_to_file(self, data, end_with=".txt"):
file_save_path = self.__generate_filename(end_with=end_with)
with open(file_save_path, 'wb') as f:
f.write(data)
return file_save_path
@property
def get_game_name(self):
return self.game_name
@property
def get_game_path(self):
return None
@property
def get_game_log_path(self):
print(f"get_game_log_path {self.game_log_path}")
return self.game_log_path
@property
def get_screenshot_file_list(self):
return self.screenshot_list
@property
def get_video_file_list(self):
return self.vide_list
"""
process api
"""
[docs] def set_process_fronted(self):
# xbox platform not need it
pass
[docs] def check_process_alive(self) -> bool:
out_str = win_util.run_command_sync(f"xbapp query {self.game_package_full_name}", self.xbox_gdk_bin)
if "not registered" in out_str:
print(f"{self.game_package_full_name} not registered!")
return "running" in out_str
"""
video api
"""
[docs] def playback_record(self):
# TODO,没找到相关能够支持的api,可能需要自己搞
pass
[docs] def stop_record(self):
if self.is_record is False or self.recorder is None:
return
self.recorder.stop_record_video()
"""
keyboard api
"""
def _get_keycode(self, key):
key_code = XBOX_KEY_DICT.get(key.lower(), None)
if key_code is None:
raise KeyboardError(f"The key {key} cannot found the key scancode!")
return key_code
[docs] def key_press(self, key, duration=0.05, **kwargs):
print(f"Key press: {key} Duration: {duration}")
self.key_down(key)
if duration is not None:
time.sleep(duration)
self.key_up(key)
[docs] def key_up(self, key, **kwargs):
key_code = self._get_keycode(key)
self.websocket_instance.send_binary(bytearray([0x02, key_code, 0x00]))
[docs] def key_down(self, key, **kwargs):
key_code = self._get_keycode(key)
self.websocket_instance.send_binary(bytearray([0x02, key_code, 0x01]))
[docs] def key_write(self, text, duration=None, **kwargs):
# TODO 这里是根据text来模拟按键输入,遇到特殊的字符需要在这里转换为按键
for c in text:
if c == " ":
self.key_press("space")
else:
self.key_press(c)
"""
image identification api
"""
[docs] def img_exist(self, image: airtest_cv.Image, **kwargs):
screenshot_bytes = self.screenshot_return_bytes()
open_cv_image = airtest_cv.bytes_2_cv_img(screenshot_bytes)
match_pos = image.match_in(open_cv_image)
if match_pos is None:
return False
return match_pos
[docs] def img_assert_exists(self, image: airtest_cv.Image, **kwargs):
pos = self.img_exist(image, **kwargs)
if pos is False:
msg = kwargs.get("msg", "")
raise errors.AssertionError("%s does not exist in screen, message: %s" % (image, msg))
return pos
[docs] def img_assert_not_exists(self, image: airtest_cv.Image, **kwargs):
pos = self.img_exist(image, **kwargs)
if pos is not False:
msg = kwargs.get("msg", "")
raise errors.AssertionError("%s exists unexpectedly at pos: %s, message: %s" % (image, pos, msg))
return pos
"""
ocr api
"""
[docs] def ocr_exist_sentence(self, word_list: list, **kwargs):
screenshot_bytes = self.screenshot_return_bytes()
open_cv_image = airtest_cv.bytes_2_cv_img(screenshot_bytes)
return airtest_ocr.ocr_exist_from_cv(word_list, open_cv_image)
[docs] def ocr_assert_exist_sentence(self, word_list: list, **kwargs):
pos = self.ocr_exist_sentence(word_list, **kwargs)
if pos is False:
sentence = ''.join(word_list)
raise errors.TargetNotFoundError(f'OCR can not find the sentence {sentence} in screen')
return pos
[docs] def ocr_exist_sentence_on_screen_rectangle(self,
x1: int,
y1: int,
x2: int,
y2: int,
word_list: list,
**kwargs):
screenshot_bytes = self.screenshot_return_bytes()
open_cv_image = airtest_cv.bytes_2_cv_img(screenshot_bytes)
return airtest_ocr.ocr_exist_from_cv_on_rectangle(x1, y1, x2, y2, word_list, open_cv_image)
"""
gamepad api
"""
[docs] def gamepad_left_joystick_set(self, x_value: float, y_value: float, duration=0.05):
self.cpp_dll.Left_Joystick.argtypes = [ctypes.c_short, ctypes.c_short]
x_value_int, y_value_int = round(x_value * 32767), round(y_value * 32767)
self.cpp_dll.Left_Joystick(ctypes.c_short(x_value_int), ctypes.c_short(y_value_int))
if duration:
time.sleep(duration)
[docs] def gamepad_left_joystick_set_and_reset(self, x_value: float, y_value: float, duration=0.05):
self.gamepad_left_joystick_set(x_value, y_value, duration)
self.cpp_dll.Left_Joystick(ctypes.c_short(0), ctypes.c_short(0))
[docs] def gamepad_right_joystick_set(self, x_value: float, y_value: float, duration=0.05):
self.cpp_dll.Right_Joystick.argtypes = [ctypes.c_short, ctypes.c_short]
x_value_int, y_value_int = round(x_value * 32767), round(y_value * 32767)
self.cpp_dll.Right_Joystick(ctypes.c_short(x_value_int), ctypes.c_short(y_value_int))
if duration:
time.sleep(duration)
[docs] def gamepad_right_joystick_set_and_reset(self, x_value: float, y_value: float, duration=0.05):
self.gamepad_right_joystick_set(x_value, y_value, duration)
self.cpp_dll.Right_Joystick(ctypes.c_short(0), ctypes.c_short(0))
[docs] def gamepad_left_trigger_set(self, value: float, duration=0.05):
value_int = int(value * 65535)
self.cpp_dll.Left_Trigger(value_int)
if duration:
time.sleep(duration)
[docs] def gamepad_left_trigger_set_and_reset(self, value: float, duration=0.05):
self.gamepad_left_trigger_set(value, duration)
self.cpp_dll.Left_Trigger(0)
[docs] def gamepad_right_trigger_set(self, value, duration):
value_int = int(value * 65535)
self.cpp_dll.Right_Trigger(value_int)
if duration:
time.sleep(duration)
[docs] def gamepad_right_trigger_set_and_reset(self, value, duration):
self.gamepad_right_trigger_set(value, duration)
self.cpp_dll.Right_Trigger(0)
[docs] def screenshot(self) -> str:
screenshot_bytes = self.screenshot_return_bytes()
screenshot_filepath = self.__save_data_to_file(screenshot_bytes, end_with="")
self.screenshot_list.append(screenshot_filepath)
return screenshot_filepath
[docs] def screenshot_return_bytes(self) -> bytes:
response = requests.get(f'https://{self.ip_address}:{self.http_port}/ext/screenshot', verify=False)
return response.content
[docs] def wait_for_keyword(self, keyword, log_file_path, timeout=20, is_rematch=False, opfunc=None, **kwargs):
if log_file_path is None:
log_file_path = self.get_game_log_path
return log_util.wait_for_keyword(log_file_path, keyword, timeout=timeout, is_rematch=is_rematch, opfunc=opfunc)
def __del__(self):
print(f"Close cosole {self.ip_address} connection.")
self.websocket_instance.close()
if __name__ == '__main__':
xbox_ins = Xbox("10.227.72.44", game_project_name="ThirdPersonGameTemplate")
# xbox_ins.input_console_command("bugit")
# time.sleep(3)
# xbox_ins.key_press("W", duration=3)
#
# time.sleep(3)
# xbox_ins.input_console_command("bugit")
# time.sleep(2)
xbox_ins.key_press("A", duration=3)
#
# time.sleep(3)
# xbox_ins.input_console_command("bugit")
# time.sleep(2)
# xbox_ins.key_press("S", duration=3)
#
# time.sleep(3)
# xbox_ins.input_console_command("bugit")
# time.sleep(2)
# xbox_ins.key_press("D", duration=3)
#
# time.sleep(5)
# xbox_ins.gamepad_button_press(XBOX_GamePad_TO_CODE.A, duration=0.05)
# time.sleep(2)
# res = xbox_ins.screenshot()
# with open('screenshot.png', 'wb') as f:
# f.write(res)
# print(res)
# print(type(res))
img_res = xbox_ins.img_exist(Template("light.png"))
print(img_res)
ocr_res = xbox_ins.ocr_exist_sentence(["Dev Home"])
print(ocr_res)
xbox_ins.stop_record()
time.sleep(10)
res = xbox_ins.check_process_alive()
print(res)