Source code for autoease.device.xbox.xbox

"""
@author: meili
@contact: meili02@corp.netease.com
@file: xbox.py
@date: 2023/2/20 17:59
@desc: Xbox console device client: keyboard, gamepad, screenshot, image-matching and OCR helpers.
"""
import ctypes
import os
import time

import requests
import websocket
import ssl
from airtest.core.api import Template
from autoease.utils import log as log_util
from autoease.utils import windows as win_util
from autoease.utils import file as file_util
from autoease.device.device import Device
from autoease.core.error import KeyboardError
from autoease.utils.file import FileManager
from autoease.record_and_screenshot.xbox_recorder import XboxRecorder
from autoease.cv import airtest_cv
from autoease.ocr import airtest_ocr
from autoease.core import error as errors
from autoease.utils.logger import get_logger

# Module-level logger named after this module. The code visible in this file
# reports progress with print() and does not reference LOGGING directly.
LOGGING = get_logger(__name__)

# Maps a lower-case key name to the keyboard scancode that Xbox._get_keycode()
# looks up and Xbox.key_down()/key_up() send over the remote-input websocket.
# The codes are consecutive, starting at 2, so the table is generated from the
# key names listed in scancode order (grouped by keyboard row for readability).
# NOTE(review): the values look like PC/AT scan code set 1 -- confirm against
# the console's remote-input protocol before adding new keys.
XBOX_KEY_DICT = {
    key_name: scancode
    for scancode, key_name in enumerate(
        (
            # number row: 2..15
            "1", "2", "3", "4", "5", "6", "7", "8", "9", "0", "-", "=",
            "backspace",
            "tab",
            # top letter row: 16..29
            "q", "w", "e", "r", "t", "y", "u", "i", "o", "p", "[", "]",
            "enter",
            "l_ctrl",
            # home row: 30..43
            "a", "s", "d", "f", "g", "h", "j", "k", "l", ";", "'", "`",
            "l_shift",
            "\\",
            # bottom row: 44..54
            "z", "x", "c", "v", "b", "n", "m", ",", ".", "/",
            "r_shift",
            # remaining keys: 55..58
            "*",
            "l_alt",
            "space",
            "caps_lock",
        ),
        start=2,
    )
}


class XBOX_GamePad_TO_CODE(object):
    """Gamepad button codes, one distinct bit per button.

    The values are passed as the ``button`` argument of the ``Xbox``
    ``gamepad_button_*`` methods in this file, which forward them unchanged
    to the native DLL.
    """

    # Thumbstick clicks and shoulder buttons (bits 15..12).
    RIGHT_THUMBSTICK = 1 << 15
    LEFT_THUMBSTICK = 1 << 14
    RIGHT_SHOULDER = 1 << 13
    LEFT_SHOULDER = 1 << 12

    # Directional pad (bits 11..8).
    DPAD_RIGHT = 1 << 11
    DPAD_LEFT = 1 << 10
    DPAD_DOWN = 1 << 9
    DPAD_UP = 1 << 8

    # Face buttons (bits 7..4).
    Y = 1 << 7
    X = 1 << 6
    B = 1 << 5
    A = 1 << 4

    # System buttons (bits 3..0).
    VIEW = 1 << 3
    MENU = 1 << 2
    NEXUS = 1 << 1
    ENROLL = 1 << 0

    # No button.
    NONE = 0
class Xbox(Device):
    """Xbox client.

    Drives a console through three channels that are set up in ``__init__``:
    a websocket to ``/ext/remoteinput`` for keyboard scancodes, the HTTPS
    ``/ext/screenshot`` endpoint for screen captures, and a native
    ``XboxDll.dll`` (loaded through ``ctypes``) for gamepad input.
    """

    # Seconds to wait for the screenshot endpoint; without a timeout
    # ``requests`` would block forever on an unresponsive console.
    SCREENSHOT_TIMEOUT = 30

    def __init__(self, ip, http_port=11443, game_project_name=None, is_record=True,
                 file_manager: FileManager = None,
                 xbox_gdk_bin="C:\\Program Files (x86)\\Microsoft GDK\\bin", **kwargs):
        """Connect to the console and initialise recording, DLL, websocket, CV and OCR.

        Args:
            ip: Console address; used for the websocket/screenshot URLs, the
                log share path, the recorder and the DLL client.
            http_port: Port used for both the websocket and screenshot URLs.
            game_project_name: When given, it becomes the game name, the log
                path is derived from it and ``xbapp list`` is queried for the
                package full name and AUMID.
            is_record: A recorder is created only when this is exactly ``True``.
            file_manager: Optional ``FileManager`` used to generate output
                file names; a module-level fallback is used when ``None``.
            xbox_gdk_bin: Passed as second argument to
                ``win_util.run_command_sync`` for ``xbapp`` commands
                (presumably the directory containing the GDK tools).
            **kwargs: Accepted for signature compatibility and ignored.
        """
        super(Xbox, self).__init__()
        # Defined up-front so __del__ stays safe if a later step raises.
        self.websocket_instance = None
        print("Initial Xbox")
        self.ip_address = ip
        self.http_port = http_port
        self.xbox_gdk_bin = xbox_gdk_bin
        self.input_websocket_url = f"wss://{ip}:{self.http_port}/ext/remoteinput"
        self.file_manager: FileManager = file_manager

        self.game_name = None
        self.game_project_name = None
        self.game_log_path = None
        self.game_package_full_name = None
        self.game_AUMID = None
        self.is_record = is_record
        self.recorder: XboxRecorder = None
        self.whole_video = ""
        self.playback_video = ""
        self.vide_list = []
        self.screenshot_list = []

        if game_project_name is not None:
            self.game_name = game_project_name
            self.game_project_name = game_project_name
            self.game_log_path = f"\\\\{ip}\\SystemScratch\\Logs\\{game_project_name}.log"
            self.__init_packagefullname_and_AUMID()

        if self.is_record is True:
            self.whole_video = self.__generate_filename(end_with=".mp4")
            print(f"whole video: {self.whole_video}")
            self.recorder = XboxRecorder(ip=ip, filename=self.whole_video)
            self.vide_list.append(self.whole_video)

        # init dll
        # TODO: the paths below are currently hard-coded (relative to the
        # current working directory) and need to be made configurable later.
        xbox_dll_path = os.path.join(os.getcwd(), "extras/Dll/Xbox")
        win_util.add_path_to_env(xbox_dll_path)
        dll_file = os.path.join(os.getcwd(), "extras/Dll/Xbox/XboxDll.dll")
        self.cpp_dll = ctypes.windll.LoadLibrary(dll_file)
        self.__init_xbox_client()

        # init websocket for keyboard; certificate verification is disabled.
        self.websocket_instance = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
        self.websocket_instance.connect(self.input_websocket_url)
        print(f"Connect console {self.ip_address} successfully!")

        # init ocr and cv
        airtest_cv.init_airtest_cv()
        airtest_ocr.init_airtest_ocr()

    def __init_xbox_client(self):
        """Hand the console address (UTF-8 encoded) to the DLL's ``Init_Xbox_Client``."""
        self.cpp_dll.Init_Xbox_Client.argtypes = [ctypes.c_char_p]
        # NOTE(review): the DLL's return value is not inspected -- confirm
        # whether it signals failure.
        self.cpp_dll.Init_Xbox_Client(self.ip_address.encode("utf-8"))

    def __init_packagefullname_and_AUMID(self):
        """Fill ``game_package_full_name`` and ``game_AUMID`` from ``xbapp list``.

        Lines of the command output containing ``game_project_name`` are
        stripped of spaces, tabs and newlines. The first such line becomes
        the package full name; every later one overwrites the AUMID.
        """
        out_str = win_util.run_command_sync("xbapp list", self.xbox_gdk_bin)
        package_found = False
        for line in out_str.splitlines():
            if self.game_project_name not in line:
                continue
            line = line.replace('\n', '').replace('\t', '').replace(' ', '').strip()
            if not package_found:
                self.game_package_full_name = line
                package_found = True
            else:
                self.game_AUMID = line

    def __generate_filename(self, end_with=".txt"):
        """Return a new output file name ending with ``end_with``.

        Uses ``file_manager`` (with ``curr_dir="videos"``) when one was
        supplied, otherwise the module-level ``file_util`` helper.
        """
        if self.file_manager is None:
            return file_util.generate_filename(end_with)
        return self.file_manager.generate_filename(curr_dir="videos", end_with=end_with)

    def __save_data_to_file(self, data, end_with=".txt"):
        """Write ``data`` (bytes) to a freshly generated file and return its path."""
        file_save_path = self.__generate_filename(end_with=end_with)
        with open(file_save_path, 'wb') as f:
            f.write(data)
        return file_save_path

    def _screenshot_as_cv_image(self):
        """Capture the screen and convert the bytes with ``airtest_cv.bytes_2_cv_img``."""
        return airtest_cv.bytes_2_cv_img(self.screenshot_return_bytes())

    @staticmethod
    def _axis_to_short(value: float) -> int:
        """Scale a joystick axis in [-1.0, 1.0] to [-32767, 32767].

        Out-of-range input is clamped: ctypes does no overflow checking, so
        an unclamped value would silently wrap inside ``c_short``.
        """
        return round(max(-1.0, min(1.0, value)) * 32767)

    @staticmethod
    def _trigger_to_int(value: float) -> int:
        """Scale a trigger position in [0.0, 1.0] to [0, 65535], clamping out-of-range input."""
        return int(max(0.0, min(1.0, value)) * 65535)

    @property
    def get_game_name(self):
        """Game name (the ``game_project_name`` given at construction, or ``None``)."""
        return self.game_name

    @property
    def get_game_path(self):
        """Always ``None`` for this device."""
        return None

    @property
    def get_game_log_path(self):
        """Game log path on the console's ``SystemScratch`` share (printed on access)."""
        print(f"get_game_log_path {self.game_log_path}")
        return self.game_log_path

    @property
    def get_screenshot_file_list(self):
        """Paths of screenshots saved through ``screenshot()``."""
        return self.screenshot_list

    @property
    def get_video_file_list(self):
        """Paths of recorded video files."""
        return self.vide_list

    # ------------------------------------------------------------------
    # process api
    # ------------------------------------------------------------------

    def set_process_fronted(self):
        """No-op: not needed on the Xbox platform."""
        pass

    def check_process_alive(self) -> bool:
        """Return whether ``xbapp query`` reports the game package as running.

        A message is printed when the output says the package is not
        registered; the result is ``True`` only if the output contains
        ``"running"``.
        """
        out_str = win_util.run_command_sync(f"xbapp query {self.game_package_full_name}", self.xbox_gdk_bin)
        if "not registered" in out_str:
            print(f"{self.game_package_full_name} not registered!")
        return "running" in out_str

    # ------------------------------------------------------------------
    # video api
    # ------------------------------------------------------------------

    def playback_record(self):
        """Not implemented."""
        # TODO: no supporting API was found; this may need a custom implementation.
        pass

    def stop_record(self):
        """Stop the recorder, if recording was enabled and a recorder exists."""
        if self.is_record is False or self.recorder is None:
            return
        self.recorder.stop_record_video()

    # ------------------------------------------------------------------
    # keyboard api
    # ------------------------------------------------------------------

    def _get_keycode(self, key):
        """Look up the scancode for ``key`` (case-insensitive) in ``XBOX_KEY_DICT``.

        Raises:
            KeyboardError: if the key name is not in the table.
        """
        key_code = XBOX_KEY_DICT.get(key.lower(), None)
        if key_code is None:
            raise KeyboardError(f"The key {key} cannot found the key scancode!")
        return key_code

    def key_press(self, key, duration=0.05, **kwargs):
        """Press ``key``, hold it for ``duration`` seconds, then release it.

        The sleep is skipped when ``duration`` is ``None``. The key is
        released even if the wait is interrupted, so it is never left held
        on the console.
        """
        print(f"Key press: {key} Duration: {duration}")
        self.key_down(key)
        try:
            if duration is not None:
                time.sleep(duration)
        finally:
            self.key_up(key)

    def key_up(self, key, **kwargs):
        """Send the bytes ``[0x02, scancode, 0x00]`` for ``key`` over the input websocket."""
        key_code = self._get_keycode(key)
        self.websocket_instance.send_binary(bytearray([0x02, key_code, 0x00]))

    def key_down(self, key, **kwargs):
        """Send the bytes ``[0x02, scancode, 0x01]`` for ``key`` over the input websocket."""
        key_code = self._get_keycode(key)
        self.websocket_instance.send_binary(bytearray([0x02, key_code, 0x01]))

    def input_reset(self):
        """Send the single byte ``0x04`` over the input websocket."""
        self.websocket_instance.send_binary(bytearray([0x04]))

    def key_write(self, text, duration=None, **kwargs):
        """Type ``text`` one character at a time with ``key_press``.

        A space is sent as the ``"space"`` key. ``duration`` is accepted but
        not used; each press uses the default hold time.
        """
        # TODO: keys are simulated directly from the text; special characters
        # still need to be translated into key presses here.
        for c in text:
            self.key_press("space" if c == " " else c)

    def input_console_command(self, command: str, platform: str = "unreal"):
        """Type ``command`` and confirm it with Enter.

        For ``platform == "unreal"`` a backtick is pressed first
        (presumably to open the in-game console).
        """
        if platform == "unreal":
            self.key_press("`")
        self.key_write(command)
        time.sleep(0.5)
        self.key_press("Enter")

    # ------------------------------------------------------------------
    # image identification api
    # ------------------------------------------------------------------

    def img_exist(self, image: airtest_cv.Image, **kwargs):
        """Match ``image`` against a fresh screenshot.

        Returns:
            The result of ``image.match_in(...)``, or ``False`` when that
            result is ``None``.
        """
        match_pos = image.match_in(self._screenshot_as_cv_image())
        if match_pos is None:
            return False
        return match_pos

    def img_assert_exists(self, image: airtest_cv.Image, **kwargs):
        """Return the match position of ``image``.

        Raises:
            errors.AssertionError: if the image is not found; an optional
                ``msg`` keyword is included in the error text.
        """
        pos = self.img_exist(image, **kwargs)
        if pos is False:
            msg = kwargs.get("msg", "")
            raise errors.AssertionError("%s does not exist in screen, message: %s" % (image, msg))
        return pos

    def img_assert_not_exists(self, image: airtest_cv.Image, **kwargs):
        """Return ``False`` when ``image`` is absent from the screen.

        Raises:
            errors.AssertionError: if the image is found; an optional
                ``msg`` keyword is included in the error text.
        """
        pos = self.img_exist(image, **kwargs)
        if pos is not False:
            msg = kwargs.get("msg", "")
            raise errors.AssertionError("%s exists unexpectedly at pos: %s, message: %s" % (image, pos, msg))
        return pos

    # ------------------------------------------------------------------
    # ocr api
    # ------------------------------------------------------------------

    def ocr_exist_sentence(self, word_list: list, **kwargs):
        """Run ``airtest_ocr.ocr_exist_from_cv`` for ``word_list`` on a fresh screenshot."""
        return airtest_ocr.ocr_exist_from_cv(word_list, self._screenshot_as_cv_image())

    def ocr_assert_exist_sentence(self, word_list: list, **kwargs):
        """Return the OCR result for ``word_list``.

        Raises:
            errors.TargetNotFoundError: if the OCR lookup returns ``False``.
        """
        pos = self.ocr_exist_sentence(word_list, **kwargs)
        if pos is False:
            sentence = ''.join(word_list)
            raise errors.TargetNotFoundError(f'OCR can not find the sentence {sentence} in screen')
        return pos

    def ocr_exist_sentence_on_screen_rectangle(self, x1: int, y1: int, x2: int, y2: int, word_list: list, **kwargs):
        """Run ``airtest_ocr.ocr_exist_from_cv_on_rectangle`` for the given corner coordinates."""
        return airtest_ocr.ocr_exist_from_cv_on_rectangle(
            x1, y1, x2, y2, word_list, self._screenshot_as_cv_image())

    # ------------------------------------------------------------------
    # gamepad api
    # ------------------------------------------------------------------

    def gamepad_button_down(self, button):
        """Forward ``button`` to the DLL's ``Button_Down``."""
        self.cpp_dll.Button_Down(button)

    def gamepad_button_up(self, button):
        """Forward ``button`` to the DLL's ``Button_Up``."""
        self.cpp_dll.Button_Up(button)

    def gamepad_button_press(self, button, duration=0.05):
        """Hold ``button`` for ``duration`` seconds, releasing it even if the wait is interrupted."""
        self.gamepad_button_down(button)
        try:
            if duration:
                time.sleep(duration)
        finally:
            self.gamepad_button_up(button)

    def gamepad_left_joystick_set(self, x_value: float, y_value: float, duration=0.05):
        """Set the left stick to (``x_value``, ``y_value``) in [-1, 1], then wait ``duration`` seconds."""
        self.cpp_dll.Left_Joystick.argtypes = [ctypes.c_short, ctypes.c_short]
        self.cpp_dll.Left_Joystick(ctypes.c_short(self._axis_to_short(x_value)),
                                   ctypes.c_short(self._axis_to_short(y_value)))
        if duration:
            time.sleep(duration)

    def gamepad_left_joystick_set_and_reset(self, x_value: float, y_value: float, duration=0.05):
        """Set the left stick, wait, then return it to (0, 0) -- also when interrupted."""
        try:
            self.gamepad_left_joystick_set(x_value, y_value, duration)
        finally:
            self.cpp_dll.Left_Joystick(ctypes.c_short(0), ctypes.c_short(0))

    def gamepad_right_joystick_set(self, x_value: float, y_value: float, duration=0.05):
        """Set the right stick to (``x_value``, ``y_value``) in [-1, 1], then wait ``duration`` seconds."""
        self.cpp_dll.Right_Joystick.argtypes = [ctypes.c_short, ctypes.c_short]
        self.cpp_dll.Right_Joystick(ctypes.c_short(self._axis_to_short(x_value)),
                                    ctypes.c_short(self._axis_to_short(y_value)))
        if duration:
            time.sleep(duration)

    def gamepad_right_joystick_set_and_reset(self, x_value: float, y_value: float, duration=0.05):
        """Set the right stick, wait, then return it to (0, 0) -- also when interrupted."""
        try:
            self.gamepad_right_joystick_set(x_value, y_value, duration)
        finally:
            self.cpp_dll.Right_Joystick(ctypes.c_short(0), ctypes.c_short(0))

    def gamepad_left_trigger_set(self, value: float, duration=0.05):
        """Set the left trigger to ``value`` in [0, 1], then wait ``duration`` seconds."""
        self.cpp_dll.Left_Trigger(self._trigger_to_int(value))
        if duration:
            time.sleep(duration)

    def gamepad_left_trigger_set_and_reset(self, value: float, duration=0.05):
        """Set the left trigger, wait, then release it to 0 -- also when interrupted."""
        try:
            self.gamepad_left_trigger_set(value, duration)
        finally:
            self.cpp_dll.Left_Trigger(0)

    def gamepad_right_trigger_set(self, value: float, duration=0.05):
        """Set the right trigger to ``value`` in [0, 1], then wait ``duration`` seconds.

        ``duration`` defaults to 0.05, matching the left-trigger method.
        """
        self.cpp_dll.Right_Trigger(self._trigger_to_int(value))
        if duration:
            time.sleep(duration)

    def gamepad_right_trigger_set_and_reset(self, value: float, duration=0.05):
        """Set the right trigger, wait, then release it to 0 -- also when interrupted."""
        try:
            self.gamepad_right_trigger_set(value, duration)
        finally:
            self.cpp_dll.Right_Trigger(0)

    # ------------------------------------------------------------------
    # screenshot / log api
    # ------------------------------------------------------------------

    def screenshot(self) -> str:
        """Capture the screen, save it to a new file and return the file path.

        The path is also appended to ``screenshot_list``.
        """
        screenshot_bytes = self.screenshot_return_bytes()
        # NOTE(review): the file is saved without an extension, and with a
        # file manager it lands in the "videos" directory -- confirm intent.
        screenshot_filepath = self.__save_data_to_file(screenshot_bytes, end_with="")
        self.screenshot_list.append(screenshot_filepath)
        return screenshot_filepath

    def screenshot_return_bytes(self) -> bytes:
        """Fetch ``/ext/screenshot`` from the console and return the response body.

        Raises:
            requests.HTTPError: if the console answers with an error status,
                so an error page is never treated as image data.
            requests.Timeout: if the console does not answer within
                ``SCREENSHOT_TIMEOUT`` seconds.
        """
        # TLS verification is disabled, matching the websocket connection.
        response = requests.get(
            f'https://{self.ip_address}:{self.http_port}/ext/screenshot',
            verify=False,
            timeout=self.SCREENSHOT_TIMEOUT,
        )
        response.raise_for_status()
        return response.content

    def wait_for_keyword(self, keyword, log_file_path=None, timeout=20, is_rematch=False, opfunc=None, **kwargs):
        """Delegate to ``log_util.wait_for_keyword`` for ``keyword`` in a log file.

        ``log_file_path`` falls back to the game log path when ``None``
        (now also the default). The remaining arguments are passed through
        unchanged.
        """
        if log_file_path is None:
            log_file_path = self.get_game_log_path
        return log_util.wait_for_keyword(log_file_path, keyword, timeout=timeout, is_rematch=is_rematch,
                                         opfunc=opfunc)

    def __del__(self):
        """Close the input websocket, if one was ever opened."""
        # getattr: __init__ may have failed before the attribute existed.
        ws = getattr(self, "websocket_instance", None)
        if ws is None:
            return
        print(f"Close cosole {self.ip_address} connection.")
        try:
            ws.close()
        except Exception:
            # Best effort: nothing useful can be done with a close error
            # while the object is being finalised.
            pass
if __name__ == '__main__':
    # Manual smoke test against a development console. It needs network
    # access to the console and the XboxDll.dll under ./extras/Dll/Xbox.
    xbox_ins = Xbox("10.227.72.44", game_project_name="ThirdPersonGameTemplate")

    # Disabled steps, kept for reference:
    # xbox_ins.input_console_command("bugit")
    # time.sleep(3)
    # xbox_ins.key_press("W", duration=3)
    #
    # time.sleep(3)
    # xbox_ins.input_console_command("bugit")
    # time.sleep(2)
    xbox_ins.key_press("A", duration=3)
    #
    # time.sleep(3)
    # xbox_ins.input_console_command("bugit")
    # time.sleep(2)
    # xbox_ins.key_press("S", duration=3)
    #
    # time.sleep(3)
    # xbox_ins.input_console_command("bugit")
    # time.sleep(2)
    # xbox_ins.key_press("D", duration=3)
    #
    # time.sleep(5)
    # xbox_ins.gamepad_button_press(XBOX_GamePad_TO_CODE.A, duration=0.05)
    # time.sleep(2)
    # res = xbox_ins.screenshot()
    # with open('screenshot.png', 'wb') as f:
    #     f.write(res)
    # print(res)
    # print(type(res))

    # Image matching and OCR on the current screen.
    print(xbox_ins.img_exist(Template("light.png")))
    print(xbox_ins.ocr_exist_sentence(["Dev Home"]))

    # Stop recording, wait, then report whether the game is still running.
    xbox_ins.stop_record()
    time.sleep(10)
    print(xbox_ins.check_process_alive())