Add: [ALAS] Auto emulator start

This commit is contained in:
LmeSzinc 2024-03-28 12:45:36 +08:00
parent 6987d60c96
commit a144908e14
8 changed files with 557 additions and 491 deletions

166
deploy/Windows/utils.py Normal file
View File

@ -0,0 +1,166 @@
import os
import re
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, TypeVar
T = TypeVar("T")
DEPLOY_CONFIG = './config/deploy.yaml'
DEPLOY_TEMPLATE = './deploy/Windows/template.yaml'
class cached_property(Generic[T]):
"""
cached-property from https://github.com/pydanny/cached-property
Add typing support
A property that is only computed once per instance and then replaces itself
with an ordinary attribute. Deleting the attribute resets the property.
Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
"""
def __init__(self, func: Callable[..., T]):
self.func = func
def __get__(self, obj, cls) -> T:
if obj is None:
return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
def iter_folder(folder, is_dir=False, ext=None):
"""
Args:
folder (str):
is_dir (bool): True to iter directories only
ext (str): File extension, such as `.yaml`
Yields:
str: Absolute path of files
"""
for file in os.listdir(folder):
sub = os.path.join(folder, file)
if is_dir:
if os.path.isdir(sub):
yield sub.replace('\\\\', '/').replace('\\', '/')
elif ext is not None:
if not os.path.isdir(sub):
_, extension = os.path.splitext(file)
if extension == ext:
yield os.path.join(folder, file).replace('\\\\', '/').replace('\\', '/')
else:
yield os.path.join(folder, file).replace('\\\\', '/').replace('\\', '/')
def poor_yaml_read(file):
"""
Poor implementation to load yaml without pyyaml dependency, but with re
Args:
file (str):
Returns:
dict:
"""
if not os.path.exists(file):
return {}
data = {}
regex = re.compile(r'^(.*?):(.*?)$')
with open(file, 'r', encoding='utf-8') as f:
for line in f.readlines():
line = line.strip('\n\r\t ').replace('\\', '/')
if line.startswith('#'):
continue
result = re.match(regex, line)
if result:
k, v = result.group(1), result.group(2).strip('\n\r\t\' ')
if v:
if v.lower() == 'null':
v = None
elif v.lower() == 'false':
v = False
elif v.lower() == 'true':
v = True
elif v.isdigit():
v = int(v)
data[k] = v
return data
def poor_yaml_write(data, file, template_file=DEPLOY_TEMPLATE):
"""
Args:
data (dict):
file (str):
template_file (str):
"""
with open(template_file, 'r', encoding='utf-8') as f:
text = f.read().replace('\\', '/')
for key, value in data.items():
if value is None:
value = 'null'
elif value is True:
value = "true"
elif value is False:
value = "false"
text = re.sub(f'{key}:.*?\n', f'{key}: {value}\n', text)
with open(file, 'w', encoding='utf-8', newline='') as f:
f.write(text)
@dataclass
class DataProcessInfo:
proc: object # psutil.Process or psutil._pswindows.Process
pid: int
@cached_property
def name(self):
name = self.proc.name()
return name
@cached_property
def cmdline(self):
try:
cmdline = self.proc.cmdline()
except:
# psutil.AccessDenied
cmdline = []
cmdline = ' '.join(cmdline).replace(r'\\', '/').replace('\\', '/')
return cmdline
def __str__(self):
# Don't print `proc`, it will take some time to get process properties
return f'DataProcessInfo(name="{self.name}", pid={self.pid}, cmdline="{self.cmdline}")'
__repr__ = __str__
def iter_process() -> Iterable[DataProcessInfo]:
try:
import psutil
except ModuleNotFoundError:
return
if psutil.WINDOWS:
# Since this is a one-time-usage, we access psutil._psplatform.Process directly
# to bypass the call of psutil.Process.is_running().
# This only costs about 0.017s.
for pid in psutil.pids():
proc = psutil._psplatform.Process(pid)
yield DataProcessInfo(
proc=proc,
pid=proc.pid,
)
else:
# This will cost about 0.45s, even `attr` is given.
for proc in psutil.process_iter():
yield DataProcessInfo(
proc=proc,
pid=proc.pid,
)

View File

@ -7,16 +7,15 @@ from module.config.utils import get_server_next_update
from module.device.app_control import AppControl
from module.device.control import Control
from module.device.screenshot import Screenshot
from module.exception import (GameNotRunningError, GameStuckError,
GameTooManyClickError, RequestHumanTakeover)
from module.exception import (EmulatorNotRunningError, GameNotRunningError, GameStuckError, GameTooManyClickError,
RequestHumanTakeover)
from module.handler.assets import GET_MISSION
from module.logger import logger
if sys.platform == 'win32':
from module.device.emulator import EmulatorManager
from module.device.platform.platform_windows import PlatformWindows as Platform
else:
class EmulatorManager:
pass
from module.device.platform.platform_base import PlatformBase as Platform
def show_function_call():
@ -59,7 +58,7 @@ def show_function_call():
logger.info('Function calls:' + ''.join(func_list))
class Device(Screenshot, Control, AppControl, EmulatorManager):
class Device(Screenshot, Control, AppControl, Platform):
_screen_size_checked = False
detect_record = set()
click_record = collections.deque(maxlen=15)
@ -68,13 +67,27 @@ class Device(Screenshot, Control, AppControl, EmulatorManager):
stuck_long_wait_list = ['BATTLE_STATUS_S', 'PAUSE', 'LOGIN_CHECK']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
for _ in range(2):
try:
super().__init__(*args, **kwargs)
break
except EmulatorNotRunningError:
# Try to start emulator
if self.emulator_instance is not None:
self.emulator_start()
else:
logger.critical(
f'No emulator with serial "{self.config.Emulator_Serial}" found, '
f'please set a correct serial'
)
raise
# Auto-fill emulator info
if self.config.EmulatorInfo_Emulator == 'auto':
_ = self.emulator_instance
self.screenshot_interval_set()
# Temp fix for MuMu 12 before DroidCast updated
if self.is_mumu_family:
logger.info('Patching screenshot method for mumu')
self.config.override(Emulator_ScreenshotMethod='ADB_nc')
# Auto-select the fastest screenshot method
if not self.config.is_template_config and self.config.Emulator_ScreenshotMethod == 'auto':
self.run_simple_screenshot_benchmark()

View File

@ -1,325 +0,0 @@
import os
import re
import winreg
import subprocess
from adbutils.errors import AdbError
from deploy.emulator import VirtualBoxEmulator
from module.base.decorator import cached_property
from module.device.connection import Connection
from module.device.method.utils import get_serial_pair
from module.exception import RequestHumanTakeover, EmulatorNotRunningError
from module.logger import logger
class EmulatorInstance(VirtualBoxEmulator):
def __init__(self, name, root_path, emu_path,
vbox_path=None, vbox_name=None, kill_para=None, multi_para=None):
"""
Args:
name (str): Emulator name in windows uninstall list.
root_path (str): Relative path from uninstall.exe to emulator installation folder.
emu_path (str): Relative path to executable simulator file.
vbox_path (str): Relative path to virtual box folder.
vbox_name (str): Regular Expression to match the name of .vbox file.
kill_para (str): Parameters required by kill emulator.
multi_para (str): Parameters required by start multi open emulator,
#id will be replaced with the real ID.
"""
super().__init__(
name=name,
root_path=root_path,
adb_path=None,
vbox_path=vbox_path,
vbox_name=vbox_name,
)
self.emu_path = emu_path
self.kill_para = kill_para
self.multi_para = multi_para
@cached_property
def id_and_serial(self):
"""
Returns:
list[str, str]: List of multi_id and serial.
"""
vbox = []
for path, folders, files in os.walk(os.path.join(self.root, self.vbox_path)):
for file in files:
if re.match(self.vbox_name, file):
file = os.path.join(path, file)
vbox.append(file)
serial = []
for file in vbox:
with open(file, 'r', encoding='utf-8', errors='ignore') as f:
for line in f.readlines():
# <Forwarding name="port2" proto="1" hostip="127.0.0.1" hostport="62026" guestport="5555"/>
res = re.search('<*?hostport="(.*?)".*?guestport="5555"/>', line)
if res:
serial.append([os.path.basename(file).split(".")[0], f'127.0.0.1:{res.group(1)}'])
return serial
class Bluestacks5Instance(EmulatorInstance):
@cached_property
def root(self):
try:
return super().root
except FileNotFoundError:
self.name = 'BlueStacks_nxt_cn'
return super().root
@cached_property
def id_and_serial(self):
try:
reg = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\BlueStacks_nxt")
except FileNotFoundError:
reg = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\BlueStacks_nxt_cn")
directory = winreg.QueryValueEx(reg, 'UserDefinedDir')[0]
with open(os.path.join(directory, 'bluestacks.conf'), encoding='utf-8') as f:
content = f.read()
emulators = re.findall(r'bst.instance.(\w+).status.adb_port="(\d+)"', content)
serial = []
for emulator in emulators:
serial.append([emulator[0], f'127.0.0.1:{emulator[1]}'])
return serial
class EmulatorManager(Connection):
pid = None
SUPPORTED_EMULATORS = {
'nox_player': EmulatorInstance(
name="Nox",
root_path=".",
emu_path="./Nox.exe",
vbox_path="./BignoxVMS",
vbox_name='.*.vbox$',
kill_para='-quit',
multi_para='-clone:#id',
),
'mumu_player': EmulatorInstance(
name="Nemu",
root_path=".",
emu_path="./EmulatorShell/NemuPlayer.exe",
vbox_path="./vms",
vbox_name='.*.nemu$',
),
'bluestacks_5': Bluestacks5Instance(
name='BlueStacks_nxt',
root_path='.',
emu_path='./HD-Player.exe',
multi_para='--instance #id',
),
}
def detect_emulator(self, serial, emulator=None):
"""
Args:
serial (str):
emulator (EmulatorInstance):
Returns:
list[EmulatorInstance, str]:Emulator and multi_id
"""
if emulator is None:
logger.info('Detect emulator from all emulators installed')
emulators = []
for emulator in self.SUPPORTED_EMULATORS.values():
try:
serials = emulator.id_and_serial
for cur_serial in serials:
if cur_serial[1] == serial:
emulators.append([emulator, cur_serial[0]])
except FileNotFoundError:
pass
logger.info('Detected emulators:')
for emulator in emulators:
logger.info(f'Name: {emulator[0].name}, Multi_id: {emulator[1]}')
if len(emulators) == 1 or \
(len(emulators) > 0 and emulators[0][0] == self.SUPPORTED_EMULATORS['mumu_player']):
logger.info('Find the only emulator, using it')
return emulators[0][0], emulators[0][1]
elif len(emulators) == 0:
logger.warning('The emulator corresponding to serial is not found, '
'please check the setting or use custom command')
else:
logger.warning('Multiple emulators with the same serial have been found, '
'please select one manually or use custom command')
raise RequestHumanTakeover
else:
try:
logger.info(f'Detect emulator from {emulator.name}')
serials = emulator.id_and_serial
for cur_serial in serials:
if cur_serial[1] == serial:
logger.info('Find the only emulator, using it')
return emulator, cur_serial[0]
except FileNotFoundError:
pass
logger.warning('The emulator corresponding to serial is not found, '
'please check the setting or use custom command')
raise RequestHumanTakeover
@staticmethod
def execute(command):
"""
Args:
command (str):
Returns:
subprocess.Popen:
"""
command = command.replace(r"\\", "/").replace("\\", "/").replace('"', '"')
logger.info(f'Execute: {command}')
return subprocess.Popen(command, close_fds=True) # only work on Windows
@staticmethod
def task_kill(pid=None, name=None):
"""
Args:
pid (list, int):
name (list, str):
Returns:
subprocess.Popen:
"""
command = 'taskkill '
if pid is not None:
if isinstance(pid, list):
for p in pid:
command += f'/pid {p} '
else:
command += f'/pid {pid} '
elif name is not None:
if isinstance(name, list):
for n in name:
command += f'/im {n} '
else:
command += f'/im {name} '
else:
raise RequestHumanTakeover
command += '/t /f'
return EmulatorManager.execute(command)
def adb_connect(self, serial):
try:
return super(EmulatorManager, self).adb_connect(serial)
except EmulatorNotRunningError:
raise RequestHumanTakeover
def detect_emulator_status(self, serial):
devices = self.list_device()
for device in devices:
if device.serial == serial:
return device.status
return 'offline'
def emulator_start(self, serial, emulator=None, multi_id=None, command=None):
"""
Args:
serial (str): Expected serial after simulator starts successfully.
emulator (EmulatorInstance): Emulator to start.
multi_id (str): Emulator ID used by multi open emulator.
command (str): Customized path and parameters of the simulator to start.
Return:
bool: If start successful.
"""
if command is None:
command = '\"' + os.path.abspath(os.path.join(emulator.root, emulator.emu_path)) + '\"'
if emulator.multi_para is not None and multi_id is not None:
command += " " + emulator.multi_para.replace("#id", multi_id)
logger.info('Start emulator')
pipe = self.execute(command)
self.pid = pipe.pid
self.sleep(10)
for _ in range(20):
if pipe.poll() is not None:
break
try:
if super().adb_connect(serial):
# Wait until emulator start completely
self.sleep(10)
return True
except EmulatorNotRunningError:
pass
self.sleep(5)
return False
def emulator_kill(self, serial, emulator=None, multi_id=None, command=None):
"""
Args:
serial (str): Expected serial after simulator starts successfully.
emulator (EmulatorInstance): Emulator to start.
multi_id (str): Emulator ID used by multi open emulator.
command (str): Customized path and parameters of the simulator to start.
Return:
bool: If kill successful.
"""
if command is None and emulator.kill_para is not None:
command = '\"' + os.path.abspath(os.path.join(emulator.root, emulator.emu_path)) + '\"'
if emulator.multi_para is not None and multi_id is not None:
command += " " + emulator.multi_para.replace("#id", multi_id)
command += " " + emulator.kill_para
logger.info('Kill emulator')
if emulator == self.SUPPORTED_EMULATORS['bluestacks_5']:
try:
self.adb_command(['reboot', '-p'], timeout=20)
if self.detect_emulator_status(serial) == 'offline':
self.pid = None
return True
except AdbError:
return False
if emulator == self.SUPPORTED_EMULATORS['mumu_player']:
self.task_kill(pid=None, name=['NemuHeadless.exe', 'NemuPlayer.exe', 'NemuSvc.exe'])
elif command is not None:
self.execute(command)
else:
self.task_kill(pid=self.pid, name=os.path.basename(emulator.emu_path))
self.sleep(5)
for _ in range(10):
if self.detect_emulator_status(serial) == 'offline':
self.pid = None
return True
self.sleep(2)
return False
def emulator_restart(self):
serial, _ = get_serial_pair(self.serial)
if serial is None:
serial = self.serial
if os.name != 'nt':
logger.warning('Restart simulator only works under Windows platform')
return False
logger.hr('Emulator restart')
if self.config.RestartEmulator_EmulatorType == 'auto':
emulator, multi_id = self.detect_emulator(serial)
else:
emulator = self.SUPPORTED_EMULATORS[self.config.RestartEmulator_EmulatorType]
emulator, multi_id = self.detect_emulator(serial, emulator=emulator)
for _ in range(3):
if not self.emulator_kill(serial, emulator, multi_id):
continue
if self.emulator_start(serial, emulator, multi_id):
return True
logger.warning('Restart emulator failed for 3 times, please check your settings')
raise RequestHumanTakeover

View File

@ -1,14 +1,41 @@
import os
import re
import typing as t
from dataclasses import dataclass
from deploy.utils import cached_property, iter_folder
from module.device.platform.utils import cached_property, iter_folder
def abspath(path):
return os.path.abspath(path).replace('\\', '/')
def get_serial_pair(serial):
"""
Args:
serial (str):
Returns:
str, str: `127.0.0.1:5555+{X}` and `emulator-5554+{X}`, 0 <= X <= 32
"""
if serial.startswith('127.0.0.1:'):
try:
port = int(serial[10:])
if 5555 <= port <= 5555 + 32:
return f'127.0.0.1:{port}', f'emulator-{port - 1}'
except (ValueError, IndexError):
pass
if serial.startswith('emulator-'):
try:
port = int(serial[9:])
if 5554 <= port <= 5554 + 32:
return f'127.0.0.1:{port + 1}', f'emulator-{port}'
except (ValueError, IndexError):
pass
return None, None
@dataclass
class EmulatorInstanceBase:
# Serial for adb connection
@ -52,22 +79,46 @@ class EmulatorInstanceBase:
def __bool__(self):
return True
@cached_property
def MuMuPlayer12_id(self):
"""
Convert MuMu 12 instance name to instance id.
Example names:
MuMuPlayer-12.0-3
YXArkNights-12.0-1
Returns:
int: Instance ID, or None if this is not a MuMu 12 instance
"""
res = re.search(r'MuMuPlayer-12.0-(\d+)', self.name)
if res:
return int(res.group(1))
res = re.search(r'YXArkNights-12.0-(\d+)', self.name)
if res:
return int(res.group(1))
return None
class EmulatorBase:
# Values here must match those in argument.yaml EmulatorInfo.Emulator.option
NoxPlayer = 'NoxPlayer'
NoxPlayer64 = 'NoxPlayer64'
NoxPlayerFamily = [NoxPlayer, NoxPlayer64]
BlueStacks4 = 'BlueStacks4'
BlueStacks5 = 'BlueStacks5'
BlueStacks4HyperV = 'BlueStacks4HyperV'
BlueStacks5HyperV = 'BlueStacks5HyperV'
BlueStacksFamily = [BlueStacks4, BlueStacks5]
LDPlayer3 = 'LDPlayer3'
LDPlayer4 = 'LDPlayer4'
LDPlayer9 = 'LDPlayer9'
LDPlayerFamily = [LDPlayer3, LDPlayer4, LDPlayer9]
MumuPlayer = 'MumuPlayer'
MumuPlayer9 = 'MumuPlayer9'
MumuPlayerFamily = [MumuPlayer, MumuPlayer9]
MemuPlayer = 'MemuPlayer'
MuMuPlayer = 'MuMuPlayer'
MuMuPlayerX = 'MuMuPlayerX'
MuMuPlayer12 = 'MuMuPlayer12'
MuMuPlayerFamily = [MuMuPlayer, MuMuPlayerX, MuMuPlayer12]
MEmuPlayer = 'MEmuPlayer'
@classmethod
def path_to_type(cls, path: str) -> str:
@ -81,12 +132,19 @@ class EmulatorBase:
"""
return ''
def iter_instances(self):
def iter_instances(self) -> t.Iterable[EmulatorInstanceBase]:
"""
Yields:
EmulatorInstance: Emulator instances found in this emulator
"""
return
pass
def iter_adb_binaries(self) -> t.Iterable[str]:
"""
Yields:
str: Filepath to adb binaries found in this emulator
"""
pass
def __init__(self, path):
# Path to .exe file
@ -143,10 +201,7 @@ class EmulatorBase:
list[str]:
"""
folder = self.abspath(folder)
try:
return list(iter_folder(folder, is_dir=is_dir, ext=ext))
except FileNotFoundError:
return []
return list(iter_folder(folder, is_dir=is_dir, ext=ext))
class EmulatorManagerBase:
@ -163,3 +218,30 @@ class EmulatorManagerBase:
Get all emulator instances installed on current computer.
"""
return []
@cached_property
def all_emulator_serials(self) -> t.List[str]:
"""
Returns:
list[str]: All possible serials on current computer.
"""
out = []
for emulator in self.all_emulator_instances:
out.append(emulator.serial)
# Also add serial like `emulator-5554`
port_serial, emu_serial = get_serial_pair(emulator.serial)
if emu_serial:
out.append(emu_serial)
return out
@cached_property
def all_adb_binaries(self) -> t.List[str]:
"""
Returns:
list[str]: All adb binaries of emulators on current computer.
"""
out = []
for emulator in self.all_emulators:
for exe in emulator.iter_adb_binaries():
out.append(exe)
return out

View File

@ -5,11 +5,11 @@ import typing as t
import winreg
from dataclasses import dataclass
import psutil
from module.base.decorator import cached_property
from module.config.utils import iter_folder
# module/device/platform/emulator_base.py
# module/device/platform/emulator_windows.py
# Will be used in Alas Easy Install, they shouldn't import any Alas modules.
from module.device.platform.emulator_base import EmulatorBase, EmulatorInstanceBase, EmulatorManagerBase
from module.device.platform.utils import cached_property, iter_folder
@dataclass
@ -118,13 +118,15 @@ class Emulator(EmulatorBase):
return cls.LDPlayer3
if exe == 'NemuPlayer.exe':
if dir2 == 'nemu':
return cls.MumuPlayer
return cls.MuMuPlayer
elif dir2 == 'nemu9':
return cls.MumuPlayer9
return cls.MuMuPlayerX
else:
return cls.MumuPlayer
return cls.MuMuPlayer
if exe == 'MuMuPlayer.exe':
return cls.MuMuPlayer12
if exe == 'MEmu.exe':
return cls.MemuPlayer
return cls.MEmuPlayer
return ''
@ -148,6 +150,8 @@ class Emulator(EmulatorBase):
yield exe.replace('dnmultiplayer.exe', 'dnplayer.exe')
elif 'NemuMultiPlayer.exe' in exe:
yield exe.replace('NemuMultiPlayer.exe', 'NemuPlayer.exe')
elif 'MuMuMultiPlayer.exe' in exe:
yield exe.replace('MuMuMultiPlayer.exe', 'MuMuManager.exe')
elif 'MEmuConsole.exe' in exe:
yield exe.replace('MEmuConsole.exe', 'MEmu.exe')
else:
@ -250,14 +254,14 @@ class Emulator(EmulatorBase):
name=folder,
path=self.path
)
elif self == Emulator.MumuPlayer:
elif self == Emulator.MuMuPlayer:
# MuMu has no multi instances, on 7555 only
yield EmulatorInstance(
serial='127.0.0.1:7555',
name='',
path=self.path,
)
elif self == Emulator.MumuPlayer9:
elif self == Emulator.MuMuPlayerX:
# vms/nemu-12.0-x64-default
for folder in self.list_folder('../vms', is_dir=True):
for file in iter_folder(folder, ext='.nemu'):
@ -268,7 +272,18 @@ class Emulator(EmulatorBase):
name=os.path.basename(folder),
path=self.path,
)
elif self == Emulator.MemuPlayer:
elif self == Emulator.MuMuPlayer12:
# vms/MuMuPlayer-12.0-0
for folder in self.list_folder('../vms', is_dir=True):
for file in iter_folder(folder, ext='.nemu'):
serial = Emulator.vbox_file_to_serial(file)
if serial:
yield EmulatorInstance(
serial=serial,
name=os.path.basename(folder),
path=self.path,
)
elif self == Emulator.MEmuPlayer:
# ./MemuHyperv VMs/{name}/{name}.memu
for folder in self.list_folder('./MemuHyperv VMs', is_dir=True):
for file in iter_folder(folder, ext='.memu'):
@ -280,6 +295,27 @@ class Emulator(EmulatorBase):
path=self.path,
)
def iter_adb_binaries(self) -> t.Iterable[str]:
"""
Yields:
str: Filepath to adb binaries found in this emulator
"""
if self == Emulator.NoxPlayerFamily:
exe = self.abspath('./nox_adb.exe')
if os.path.exists(exe):
yield exe
if self == Emulator.MuMuPlayerFamily:
# From MuMu9\emulator\nemu9\EmulatorShell
# to MuMu9\emulator\nemu9\vmonitor\bin\adb_server.exe
exe = self.abspath('../vmonitor/bin/adb_server.exe')
if os.path.exists(exe):
yield exe
# All emulators have adb.exe
exe = self.abspath('./adb.exe')
if os.path.exists(exe):
yield exe
class EmulatorManager(EmulatorManagerBase):
@staticmethod
@ -294,17 +330,26 @@ class EmulatorManager(EmulatorManagerBase):
path = r'Software\Microsoft\Windows\CurrentVersion\Explorer\UserAssist'
# {XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX}\xxx.exe
regex_hash = re.compile(r'{.*}')
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, path) as reg:
folders = list_key(reg)
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, path) as reg:
folders = list_key(reg)
except FileNotFoundError:
return
for folder in folders:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, f'{path}\\{folder}\\Count') as reg:
for key in list_reg(reg):
key = codecs.decode(key.name, 'rot-13')
# Skip those with hash
if regex_hash.search(key):
continue
for file in Emulator.multi_to_single(key):
yield file
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, f'{path}\\{folder}\\Count') as reg:
for key in list_reg(reg):
key = codecs.decode(key.name, 'rot-13')
# Skip those with hash
if regex_hash.search(key):
continue
for file in Emulator.multi_to_single(key):
yield file
except FileNotFoundError:
# FileNotFoundError: [WinError 2] 系统找不到指定的文件。
# Might be a random directory without "Count" subdirectory
continue
@staticmethod
def iter_mui_cache():
@ -317,8 +362,11 @@ class EmulatorManager(EmulatorManagerBase):
str: Path to emulator executable, may contains duplicate values
"""
path = r'Software\Classes\Local Settings\Software\Microsoft\Windows\Shell\MuiCache'
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, path) as reg:
rows = list_reg(reg)
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, path) as reg:
rows = list_reg(reg)
except FileNotFoundError:
return
regex = re.compile(r'(^.*\.exe)\.')
for row in rows:
@ -380,36 +428,32 @@ class EmulatorManager(EmulatorManagerBase):
'leidian9',
'Nemu',
'Nemu9',
'MuMuPlayer-12.0'
'MEmu',
]
for path in known_uninstall_registry_path:
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path) as reg:
for software in list_key(reg):
if software not in known_emulator_registry_name:
continue
try:
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path) as reg:
software_list = list_key(reg)
except FileNotFoundError:
continue
for software in software_list:
if software not in known_emulator_registry_name:
continue
try:
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, f'{path}\\{software}') as software_reg:
try:
uninstall = winreg.QueryValueEx(software_reg, 'UninstallString')[0]
except FileNotFoundError:
continue
if not uninstall:
continue
# UninstallString is like:
# C:\Program Files\BlueStacks_nxt\BlueStacksUninstaller.exe -tmp
# "E:\ProgramFiles\Microvirt\MEmu\uninstall\uninstall.exe" -u
# Extract path in ""
res = re.search('"(.*?)"', uninstall)
uninstall = res.group(1) if res else uninstall
yield uninstall
@staticmethod
def iter_running_emulator() -> t.Iterable[psutil.Process]:
"""
This may cost some time.
"""
for proc in psutil.process_iter():
if Emulator.is_emulator(str(proc.name())):
yield proc
uninstall = winreg.QueryValueEx(software_reg, 'UninstallString')[0]
except FileNotFoundError:
continue
if not uninstall:
continue
# UninstallString is like:
# C:\Program Files\BlueStacks_nxt\BlueStacksUninstaller.exe -tmp
# "E:\ProgramFiles\Microvirt\MEmu\uninstall\uninstall.exe" -u
# Extract path in ""
res = re.search('"(.*?)"', uninstall)
uninstall = res.group(1) if res else uninstall
yield uninstall
@cached_property
def all_emulators(self) -> t.List[Emulator]:
@ -448,11 +492,9 @@ class EmulatorManager(EmulatorManagerBase):
if Emulator.is_emulator(file) and os.path.exists(file):
exe.add(file)
# MuMu specific directory
folder = abspath(os.path.join(os.path.dirname(uninstall), 'EmulatorShell'))
if os.path.exists(folder):
for file in iter_folder(folder, ext='.exe'):
if Emulator.is_emulator(file) and os.path.exists(file):
exe.add(file)
for file in iter_folder(abspath(os.path.join(os.path.dirname(uninstall), 'EmulatorShell')), ext='.exe'):
if Emulator.is_emulator(file) and os.path.exists(file):
exe.add(file)
exe = [Emulator(path).path for path in exe if Emulator.is_emulator(path)]
exe = sorted(set(exe))

View File

@ -1,23 +1,23 @@
import sys
import typing as t
import yaml
from pydantic import BaseModel, SecretStr
from pydantic import BaseModel
from module.base.decorator import cached_property, del_cached_property
from module.device.connection import Connection
from module.device.platform.emulator_base import EmulatorInstanceBase, EmulatorManagerBase
from module.logger import logger
from module.map.map_grids import SelectedGrids
from module.base.decorator import cached_property, del_cached_property
class EmulatorData(BaseModel):
class EmulatorInfo(BaseModel):
emulator: str = ''
name: str = ''
path: str = ''
# For APIs of chinac.com, a phone cloud platform.
access_key: SecretStr = ''
secret: SecretStr = ''
# access_key: SecretStr = ''
# secret: SecretStr = ''
class PlatformBase(Connection, EmulatorManagerBase):
@ -36,23 +36,25 @@ class PlatformBase(Connection, EmulatorManagerBase):
- Retry is required.
- Using bored sleep to wait startup is forbidden.
"""
pass
logger.info(f'Current platform {sys.platform} does not support emulator_start, skip')
def emulator_stop(self):
"""
Stop a emulator.
"""
pass
logger.info(f'Current platform {sys.platform} does not support emulator_stop, skip')
@cached_property
def emulator_data(self) -> EmulatorData:
try:
data = yaml.safe_load(self.config.RestartEmulator_EmulatorData)
return EmulatorData(**data)
except Exception as e:
logger.error(e)
logger.error("Failed to load EmulatorData, no emulator_instance")
return EmulatorData()
def emulator_info(self) -> EmulatorInfo:
emulator = self.config.EmulatorInfo_Emulator
name = str(self.config.EmulatorInfo_name).strip().replace('\n', '')
path = str(self.config.EmulatorInfo_path).strip().replace('\n', '')
return EmulatorInfo(
emulator=emulator,
name=name,
path=path,
)
@cached_property
def emulator_instance(self) -> t.Optional[EmulatorInstanceBase]:
@ -60,7 +62,7 @@ class PlatformBase(Connection, EmulatorManagerBase):
Returns:
EmulatorInstanceBase: Emulator instance or None
"""
data = self.emulator_data
data = self.emulator_info
old_info = dict(
emulator=data.emulator,
path=data.path,
@ -74,14 +76,18 @@ class PlatformBase(Connection, EmulatorManagerBase):
)
# Write complete emulator data
new_info = dict(
emulator=instance.type,
path=instance.path,
name=instance.name,
)
if new_info != old_info:
self.config.RestartEmulator_EmulatorData = yaml.safe_dump(new_info).strip()
del_cached_property(self, 'emulator_data')
if instance is not None:
new_info = dict(
emulator=instance.type,
path=instance.path,
name=instance.name,
)
if new_info != old_info:
with self.config.multi_set():
self.config.EmulatorInfo_Emulator = instance.type
self.config.EmulatorInfo_name = instance.name
self.config.EmulatorInfo_path = instance.path
del_cached_property(self, 'emulator_info')
return instance
@ -102,7 +108,7 @@ class PlatformBase(Connection, EmulatorManagerBase):
Returns:
EmulatorInstance: Emulator instance or None if no instances not found.
"""
logger.hr('Find emulator instance')
logger.hr('Find emulator instance', level=2)
instances = SelectedGrids(self.all_emulator_instances)
for instance in instances:
logger.info(instance)
@ -115,6 +121,7 @@ class PlatformBase(Connection, EmulatorManagerBase):
return None
if select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
return instance
@ -127,6 +134,7 @@ class PlatformBase(Connection, EmulatorManagerBase):
return None
if select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
return instance
@ -139,6 +147,7 @@ class PlatformBase(Connection, EmulatorManagerBase):
return None
if select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
return instance
@ -151,15 +160,10 @@ class PlatformBase(Connection, EmulatorManagerBase):
return None
if select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
return instance
# Still too many instances
logger.warning(f'Found multiple emulator instances with {search_args}')
return None
if __name__ == '__main__':
self = PlatformBase('alas')
d = self.emulator_instance
print(d)

View File

@ -1,14 +1,15 @@
import ctypes
import re
import subprocess
import typing as t
import psutil
from deploy.Windows.utils import DataProcessInfo
from module.base.decorator import run_once
from module.base.timer import Timer
from module.device.connection import AdbDeviceWithStatus
from module.device.platform.platform_base import PlatformBase
from module.device.platform.windows_emulator import Emulator, EmulatorInstance, EmulatorManager
from module.device.platform.emulator_windows import Emulator, EmulatorInstance, EmulatorManager
from module.logger import logger
@ -30,11 +31,11 @@ def minimize_window(hwnd):
def get_window_title(hwnd):
"""Returns the window title as a string."""
textLenInCharacters = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
stringBuffer = ctypes.create_unicode_buffer(
textLenInCharacters + 1) # +1 for the \0 at the end of the null-terminated string.
ctypes.windll.user32.GetWindowTextW(hwnd, stringBuffer, textLenInCharacters + 1)
return stringBuffer.value
text_len_in_characters = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
string_buffer = ctypes.create_unicode_buffer(
text_len_in_characters + 1) # +1 for the \0 at the end of the null-terminated string.
ctypes.windll.user32.GetWindowTextW(hwnd, string_buffer, text_len_in_characters + 1)
return string_buffer.value
def flash_window(hwnd, flash=True):
@ -42,8 +43,8 @@ def flash_window(hwnd, flash=True):
class PlatformWindows(PlatformBase, EmulatorManager):
@staticmethod
def execute(command):
@classmethod
def execute(cls, command):
"""
Args:
command (str):
@ -55,67 +56,53 @@ class PlatformWindows(PlatformBase, EmulatorManager):
logger.info(f'Execute: {command}')
return subprocess.Popen(command, close_fds=True) # only work on Windows
@staticmethod
def taskkill(process):
@classmethod
def kill_process_by_regex(cls, regex: str) -> int:
"""
Args:
process (str, list[str]): Process name or a list of them
Returns:
subprocess.Popen:
"""
if not isinstance(process, list):
process = [process]
return self.execute(f'taskkill /t /f /im ' + ''.join(process))
@staticmethod
def find_running_emulator(instance: EmulatorInstance) -> t.Optional[psutil.Process]:
for proc in EmulatorManager.iter_running_emulator():
cmdline = [arg.replace('\\', '/').replace(r'\\', '/') for arg in proc.cmdline()]
cmdline = ' '.join(cmdline)
if instance.path in cmdline and instance.name in cmdline:
return proc
logger.warning(f'Cannot find a running emulator process with path={instance.path}, name={instance.name}')
return None
def emulator_kill_by_process(self, instance: EmulatorInstance) -> bool:
"""
Kill a emulator by finding its process.
Kill processes with cmdline match the given regex.
Args:
instance:
regex:
Returns:
bool: If success
int: Number of processes killed
"""
proc = self.find_running_emulator(instance)
if proc is not None:
proc.kill()
return True
else:
return False
count = 0
for proc in psutil.process_iter():
cmdline = DataProcessInfo(proc=proc, pid=proc.pid).cmdline
if re.search(regex, cmdline):
logger.info(f'Kill emulator: {cmdline}')
proc.kill()
count += 1
return count
def _emulator_start(self, instance: EmulatorInstance):
"""
Start a emulator without error handling
"""
exe = instance.emulator.path
if instance == Emulator.MumuPlayer:
if instance == Emulator.MuMuPlayer:
# NemuPlayer.exe
self.execute(exe)
if instance == Emulator.MumuPlayer9:
elif instance == Emulator.MuMuPlayerX:
# NemuPlayer.exe -m nemu-12.0-x64-default
self.execute(f'{exe} -m {instance.name}')
self.execute(f'"{exe}" -m {instance.name}')
elif instance == Emulator.MuMuPlayer12:
# MuMuPlayer.exe -v 0
if instance.MuMuPlayer12_id is None:
logger.warning(f'Cannot get MuMu instance index from name {instance.name}')
self.execute(f'"{exe}" -v {instance.MuMuPlayer12_id}')
elif instance == Emulator.NoxPlayerFamily:
# Nox.exe -clone:Nox_1
self.execute(f'{exe} -clone:{instance.name}')
self.execute(f'"{exe}" -clone:{instance.name}')
elif instance == Emulator.BlueStacks5:
# HD-Player.exe -instance Pie64
self.execute(f'{exe} -instance {instance.name}')
self.execute(f'"{exe}" -instance {instance.name}')
elif instance == Emulator.BlueStacks4:
# BlueStacks\Client\Bluestacks.exe -vmname Android_1
self.execute(f'{exe} -vmname {instance.name}')
self.execute(f'"{exe}" -vmname {instance.name}')
else:
raise EmulatorUnknown(f'Cannot start an unknown emulator instance: {instance}')
@ -123,16 +110,53 @@ class PlatformWindows(PlatformBase, EmulatorManager):
"""
Stop a emulator without error handling
"""
logger.hr('Emulator stop', level=2)
exe = instance.emulator.path
if instance == Emulator.MumuPlayer:
# taskkill /t /f /im NemuHeadless.exe NemuPlayer.exe NemuSvc.exe
self.taskkill(['NemuHeadless.exe', 'NemuPlayer.exe', 'NemuSvc.exe'])
elif instance == Emulator.MumuPlayer9:
# Kill by process
self.emulator_kill_by_process(instance)
if instance == Emulator.MuMuPlayer:
# MuMu6 does not have multi instance, kill one means kill all
# Has 4 processes
# "C:\Program Files\NemuVbox\Hypervisor\NemuHeadless.exe" --comment nemu-6.0-x64-default --startvm
# "E:\ProgramFiles\MuMu\emulator\nemu\EmulatorShell\NemuPlayer.exe"
# E:\ProgramFiles\MuMu\emulator\nemu\EmulatorShell\NemuService.exe
# "C:\Program Files\NemuVbox\Hypervisor\NemuSVC.exe" -Embedding
self.kill_process_by_regex(
rf'('
rf'NemuHeadless.exe'
rf'|NemuPlayer.exe\"'
rf'|NemuPlayer.exe$'
rf'|NemuService.exe'
rf'|NemuSVC.exe'
rf')'
)
elif instance == Emulator.MuMuPlayerX:
# MuMu X has 3 processes
# "E:\ProgramFiles\MuMu9\emulator\nemu9\EmulatorShell\NemuPlayer.exe" -m nemu-12.0-x64-default -s 0 -l
# "C:\Program Files\Muvm6Vbox\Hypervisor\Muvm6Headless.exe" --comment nemu-12.0-x64-default --startvm xxx
# "C:\Program Files\Muvm6Vbox\Hypervisor\Muvm6SVC.exe" --Embedding
self.kill_process_by_regex(
rf'('
rf'NemuPlayer.exe.*-m {instance.name}'
rf'|Muvm6Headless.exe'
rf'|Muvm6SVC.exe'
rf')'
)
elif instance == Emulator.MuMuPlayer12:
# MuMu 12 has 2 processes:
# E:\ProgramFiles\Netease\MuMuPlayer-12.0\shell\MuMuPlayer.exe -v 0
# "C:\Program Files\MuMuVMMVbox\Hypervisor\MuMuVMMHeadless.exe" --comment MuMuPlayer-12.0-0 --startvm xxx
if instance.MuMuPlayer12_id is None:
logger.warning(f'Cannot get MuMu instance index from name {instance.name}')
self.kill_process_by_regex(
rf'('
rf'MuMuVMMHeadless.exe.*--comment {instance.name}'
rf'|MuMuPlayer.exe.*-v {instance.MuMuPlayer12_id}'
rf')'
)
# There is also a shared service, no need to kill it
# "C:\Program Files\MuMuVMMVbox\Hypervisor\MuMuVMMSVC.exe" --Embedding
elif instance == Emulator.NoxPlayerFamily:
# Nox.exe -clone:Nox_1 -quit
self.execute(f'{exe} -clone:{instance.name} -quit')
self.execute(f'"{exe}" -clone:{instance.name} -quit')
else:
raise EmulatorUnknown(f'Cannot stop an unknown emulator instance: {instance}')
@ -152,8 +176,10 @@ class PlatformWindows(PlatformBase, EmulatorManager):
# OSError: [WinError 740] 请求的操作需要提升。
if 'WinError 740' in msg:
logger.error('To start/stop MumuAppPlayer, ALAS needs to be run as administrator')
except Exception as e:
except EmulatorUnknown as e:
logger.error(e)
except Exception as e:
logger.exception(e)
logger.error(f'Emulator function {func.__name__}() failed')
return False
@ -164,6 +190,7 @@ class PlatformWindows(PlatformBase, EmulatorManager):
bool: True if startup completed
False if timeout
"""
logger.hr('Emulator start', level=2)
current_window = get_focused_window()
serial = self.emulator_instance.serial
logger.info(f'Current window: {current_window}')
@ -263,6 +290,7 @@ class PlatformWindows(PlatformBase, EmulatorManager):
return True
def emulator_start(self):
logger.hr('Emulator start', level=1)
for _ in range(3):
# Stop
if not self._emulator_function_wrapper(self._emulator_stop):
@ -283,9 +311,11 @@ class PlatformWindows(PlatformBase, EmulatorManager):
return False
def emulator_stop(self):
logger.hr('Emulator stop', level=1)
return self._emulator_function_wrapper(self._emulator_stop)
if __name__ == '__main__':
self = PlatformWindows('alas')
self.emulator_start()
d = self.emulator_instance
print(d)

View File

@ -0,0 +1,54 @@
import os
from typing import Callable, Generic, TypeVar
T = TypeVar("T")
class cached_property(Generic[T]):
"""
cached-property from https://github.com/pydanny/cached-property
Add typing support
A property that is only computed once per instance and then replaces itself
with an ordinary attribute. Deleting the attribute resets the property.
Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
"""
def __init__(self, func: Callable[..., T]):
self.func = func
def __get__(self, obj, cls) -> T:
if obj is None:
return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
def iter_folder(folder, is_dir=False, ext=None):
"""
Args:
folder (str):
is_dir (bool): True to iter directories only
ext (str): File extension, such as `.yaml`
Yields:
str: Absolute path of files
"""
try:
files = os.listdir(folder)
except FileNotFoundError:
return
for file in files:
sub = os.path.join(folder, file)
if is_dir:
if os.path.isdir(sub):
yield sub.replace('\\\\', '/').replace('\\', '/')
elif ext is not None:
if not os.path.isdir(sub):
_, extension = os.path.splitext(file)
if extension == ext:
yield os.path.join(folder, file).replace('\\\\', '/').replace('\\', '/')
else:
yield os.path.join(folder, file).replace('\\\\', '/').replace('\\', '/')