Opt: Add auto device detection and auto package detection, should be foolproof

- Add: Real-time server change
This commit is contained in:
LmeSzinc 2022-04-18 05:01:32 +08:00
parent 9da18a87ad
commit beabf938fc
13 changed files with 285 additions and 99 deletions

View File

@ -4,7 +4,6 @@ import traceback
import imageio
from PIL import ImageDraw
import module.config.server as server
from module.base.decorator import cached_property
from module.base.resource import Resource
from module.base.utils import *
@ -29,31 +28,53 @@ class Button(Resource):
button=(1562, 908, 1864, 1003)
)
"""
self.server = server.server
self.area = area[self.server] if isinstance(area, dict) else area
self.color = color[self.server] if isinstance(color, dict) else color
self._button = button[self.server] if isinstance(button, dict) else button
self.raw_area = area
self.raw_color = color
self.raw_button = button
self.raw_file = file
self.raw_name = name
self._button_offset = None
self._match_init = False
self.file = file[self.server] if isinstance(file, dict) else file
self.image = None
if self.file:
self.name = os.path.splitext(os.path.split(self.file)[1])[0]
elif name:
self.name = name
else:
(filename, line_number, function_name, text) = traceback.extract_stack()[-2]
self.name = text[:text.find('=')].strip()
if self.file:
self.is_gif = os.path.splitext(self.file)[1] == '.gif'
else:
self.is_gif = False
if self.file:
self.resource_add(key=self.file)
cached = ['area', 'color', '_button', 'file', 'name', 'is_gif']
@cached_property
def area(self):
return self.parse_property(self.raw_area)
@cached_property
def color(self):
return self.parse_property(self.raw_color)
@cached_property
def _button(self):
return self.parse_property(self.raw_button)
@cached_property
def file(self):
return self.parse_property(self.raw_file)
@cached_property
def name(self):
if self.raw_name:
return self.raw_name
elif self.file:
return os.path.splitext(os.path.split(self.file)[1])[0]
else:
return 'BUTTON'
@cached_property
def is_gif(self):
if self.file:
return os.path.splitext(self.file)[1] == '.gif'
else:
return False
def __str__(self):
return self.name
@ -101,9 +122,9 @@ class Button(Resource):
Returns:
tuple: Color (r, g, b).
"""
self.color = get_color(image, self.area)
self.__dict__['color'] = get_color(image, self.area)
self.image = crop(image, self.area)
self.is_gif = False
self.__dict__['is_gif'] = False
return self.color
def load_offset(self, button):
@ -136,6 +157,7 @@ class Button(Resource):
self._match_init = True
def resource_release(self):
super().resource_release()
self.image = None
self._match_init = False

View File

@ -1,6 +1,8 @@
import gc
import re
import gc
import module.config.server as server
from module.base.decorator import cached_property
from module.logger import logger
@ -52,13 +54,17 @@ _preserved_assets = PreservedAssets()
class Resource:
# Class property, record all button and templates
instances = {}
# Instance property, record cached properties of instance
cached = []
def resource_add(self, key):
Resource.instances[key] = self
def resource_release(self):
pass
for cache in self.cached:
del_cached_property(self, cache)
@classmethod
def is_loaded(cls, obj):
@ -76,6 +82,19 @@ class Resource:
continue
logger.info(f'{obj}: {key}')
def parse_property(self, data):
"""
Parse properties of Button or Template object input.
Such as `area`, `color` and `button`.
Args:
data: Dict or str
"""
if isinstance(data, dict):
return data[server.server]
else:
return data
def release_resources(next_task=''):
# Release all OCR models

View File

@ -2,7 +2,6 @@ import os
import imageio
import module.config.server as server
from module.base.button import Button
from module.base.decorator import cached_property
from module.base.resource import Resource
@ -16,14 +15,25 @@ class Template(Resource):
Args:
file (dict[str], str): Filepath of template file.
"""
self.server = server.server
self.file = file[self.server] if isinstance(file, dict) else file
self.name = os.path.splitext(os.path.basename(self.file))[0].upper()
self.is_gif = os.path.splitext(self.file)[1] == '.gif'
self.raw_file = file
self._image = None
self.resource_add(self.file)
cached = ['file', 'name', 'is_gif']
@cached_property
def file(self):
return self.parse_property(self.raw_file)
@cached_property
def name(self):
return os.path.splitext(os.path.basename(self.file))[0].upper()
@cached_property
def is_gif(self):
return os.path.splitext(self.file)[1] == '.gif'
@property
def image(self):
if self._image is None:
@ -51,6 +61,7 @@ class Template(Resource):
self._image = value
def resource_release(self):
super().resource_release()
self._image = None
def pre_process(self, image):

View File

@ -4,7 +4,9 @@ import module.config.server as server
class ManualConfig:
SERVER = server.server
@property
def SERVER(self):
return server.server
SCHEDULER_PRIORITY = """
Restart

View File

@ -3,3 +3,11 @@ This file stores server, such as 'cn', 'en'.
Use 'import module.config.server as server' to import, don't use 'from xxx import xxx'.
"""
server = 'cn' # Setting default to cn, will avoid errors when using dev_tools
def set_server(target):
global server
server = target
from module.base.resource import release_resources
release_resources()

View File

@ -29,20 +29,20 @@ class AppControl(Adb, WSA, Uiautomator2):
method = self.config.Emulator_ControlMethod
logger.info(f'App start: {package}')
if self.config.Emulator_Serial == 'wsa-0':
self.app_start_wsa(package, display=0)
self.app_start_wsa(display=0)
elif method == 'uiautomator2' or method == 'minitouch':
self.app_start_uiautomator2(package)
self.app_start_uiautomator2()
else:
self.app_start_adb(package)
self.app_start_adb()
def app_stop(self):
package = self.config.Emulator_PackageName
method = self.config.Emulator_ControlMethod
logger.info(f'App stop: {package}')
if method == 'uiautomator2' or method == 'minitouch':
self.app_stop_uiautomator2(package)
self.app_stop_uiautomator2()
else:
self.app_stop_adb(package)
self.app_stop_adb()
def dump_hierarchy(self) -> etree._Element:
"""

View File

@ -13,6 +13,7 @@ from deploy.utils import DEPLOY_CONFIG, poor_yaml_read
from module.base.decorator import cached_property
from module.base.utils import ensure_time
from module.config.config import AzurLaneConfig
from module.device.game_package import package_to_server
from module.device.method.utils import (del_cached_property, possible_reasons,
random_port, recv_all)
from module.exception import RequestHumanTakeover
@ -34,12 +35,23 @@ class Connection:
Args:
config (AzurLaneConfig, str): Name of the user config under ./config
"""
logger.hr('Device')
logger.hr('Device', level=1)
if isinstance(config, str):
self.config = AzurLaneConfig(config, task=None)
else:
self.config = config
# Init adb client
logger.attr('Adb_binary', self.adb_binary)
# Monkey patch to custom adb
adbutils.adb_path = lambda: self.adb_binary
# Remove global proxies, or uiautomator2 will go through it
for k in list(os.environ.keys()):
if k.lower().endswith('_proxy'):
del os.environ[k]
self.adb_client = AdbClient('127.0.0.1', 5037)
# Parse custom serial
self.serial = str(self.config.Emulator_Serial)
if "bluestacks4-hyperv" in self.serial:
self.serial = self.find_bluestacks4_hyperv(self.serial)
@ -56,20 +68,10 @@ class Connection:
with self.config.multi_set():
self.config.Emulator_ScreenshotMethod = 'uiautomator2'
self.config.Emulator_ControlMethod = 'uiautomator2'
self.detect_device()
logger.attr('Adb_binary', self.adb_binary)
# Monkey patch to custom adb
adbutils.adb_path = lambda: self.adb_binary
# Remove global proxies, or uiautomator2 will go through it
for k in list(os.environ.keys()):
if k.lower().endswith('_proxy'):
del os.environ[k]
self.adb_client = AdbClient('127.0.0.1', 5037)
# Connect
self.adb_connect(self.serial)
self.adb = AdbDevice(self.adb_client, self.serial)
logger.attr('Adb_device', self.adb)
@staticmethod
@ -170,6 +172,10 @@ class Connection:
file = 'adb.exe'
return file
@cached_property
def adb(self) -> AdbDevice:
return AdbDevice(self.adb_client, self.serial)
def adb_command(self, cmd, timeout=10):
"""
Execute ADB commands in a subprocess,
@ -385,7 +391,7 @@ class Connection:
possible_reasons('Serial incorrect, might be a typo')
raise RequestHumanTakeover
logger.warning(f'Failed to connect {serial} after 3 trial, assume connected')
self.show_devices()
self.detect_device()
return False
def adb_disconnect(self, serial):
@ -472,34 +478,118 @@ class Connection:
logger.attr('Device Orientation', f'{o} ({Connection._orientation_description.get(o, "Unknown")})')
return o
def show_devices(self):
def iter_device(self):
"""
Show all available devices on current computer.
Returns:
iter of AdbDevice
"""
logger.hr('Show devices')
class AdbDeviceWithStatus(AdbDevice):
def __init__(self, client: AdbClient, serial: str, status: str):
self.status = status
super().__init__(client, serial)
def __str__(self):
return f'AdbDevice({self.serial}, {self.status})'
__repr__ = __str__
with self.adb_client._connect() as c:
c.send_command("host:devices")
c.check_okay()
output = c.read_string_block()
for line in output.splitlines():
parts = line.strip().split("\t")
if len(parts) != 2:
continue
yield AdbDeviceWithStatus(self.adb_client, parts[0], parts[1])
def detect_device(self):
"""
Find available devices
If serial=='auto' and only 1 device detected, use it
"""
logger.hr('Detect device')
logger.info('Here are the available devices, '
'copy to Alas.Emulator.Serial to use it')
devices = list(self.adb_client.iter_device())
if devices:
for device in devices:
logger.info(device.serial)
else:
'copy to Alas.Emulator.Serial to use it or set Alas.Emulator.Serial="auto"')
devices = list(self.iter_device())
# Show available devices
available = [d for d in devices if d.status == 'device']
for device in available:
logger.info(device.serial)
if not len(available):
logger.info('No available devices')
def show_packages(self, keyword='azurlane'):
# Show unavailable devices if having any
unavailable = [d for d in devices if d.status != 'device']
if len(unavailable):
logger.info('Here are the devices detected but unavailable')
for device in unavailable:
logger.info(f'{device.serial} ({device.status})')
# Auto device detection
if self.config.Emulator_Serial == 'auto':
if len(devices) == 0:
logger.critical('No available device found, auto device detection cannot work, '
'please set an exact serial in Alas.Emulator.Serial instead of using "auto"')
raise RequestHumanTakeover
elif len(devices) == 1:
logger.info(f'Auto device detection found only one device, using it')
self.serial = devices[0].serial
del_cached_property(self, 'adb')
else:
logger.critical('Multiple devices found, auto device detection cannot decide which to choose, '
'please copy one of the available devices listed above to Alas.Emulator.Serial')
raise RequestHumanTakeover
def detect_package(self, keywords=('azurlane', 'blhx')):
"""
Show all possible packages with the given keyword on this device.
"""
logger.hr('Show packages')
logger.hr('Detect package')
logger.info('Fetching package list')
output = self.adb_shell(['pm', 'list', 'packages'])
packages = re.findall(r'package:([^\s]+)', output)
logger.info(f'Here are the available packages in device {self.serial}, '
f'copy to Alas.Emulator.PackageName to use it')
if not packages:
packages = []
packages = [p for p in packages if any([k in p for k in keywords])]
if packages:
# Show packages
logger.info(f'Here are the available packages in device "{self.serial}", '
f'copy to Alas.Emulator.PackageName to use it')
if len(packages):
for package in packages:
if keyword in package.lower():
logger.info(package)
logger.info(package)
else:
logger.info(f'No available package with keyword: {keyword}')
logger.info(f'No available packages on device "{self.serial}"')
# Auto package detection
if len(packages) == 0:
logger.critical(f'No {keywords[0]} package found, '
f'please confirm {keywords[0]} has been installed on device "{self.serial}"')
raise RequestHumanTakeover
if len(packages) == 1:
logger.info('Auto package detection found only one package, using it')
package = packages[0]
# Get server
server = package_to_server(package)
if server is not None:
logger.info(f'Package "{package}" is {keywords[0]} {server.upper()}, using it')
else:
logger.info(f'Package "{package}" might be {keywords[0]} CN channel, using it')
server = 'cn'
# Set config
with self.config.multi_set():
self.config.Emulator_PackageName = package
self.config.Emulator_Server = server
# Set server
logger.info('Server changed, release resources')
from module.config.server import set_server
set_server(server)
else:
logger.critical(
f'Multiple {keywords[0]} packages found, auto package detection cannot decide which to choose, '
'please copy one of the available devices listed above to Alas.Emulator.PackageName '
'and set Alas.Emulator.Server to the corresponding server')
raise RequestHumanTakeover

View File

@ -0,0 +1,10 @@
GAME_PACKAGE = {
'com.bilibili.azurlane': 'cn',
'com.YoStarEN.AzurLane': 'en',
'com.YoStarJP.AzurLane': 'jp',
'com.hkmanjuu.azurlane.gp': 'tw',
}
def package_to_server(package: str) -> str:
return GAME_PACKAGE.get(package, 'cn')

View File

@ -8,7 +8,7 @@ from lxml import etree
from module.device.connection import Connection
from module.device.method.utils import (RETRY_DELAY, RETRY_TRIES,
handle_adb_error, possible_reasons,
handle_adb_error, PackageNotInstalled,
recv_all)
from module.exception import RequestHumanTakeover, ScriptError
from module.logger import logger
@ -46,6 +46,12 @@ def retry(func):
self.adb_connect(self.serial)
else:
break
# Package not installed
except PackageNotInstalled as e:
logger.error(e)
def init():
self.detect_package()
# Unknown, probably a trucked image
except Exception as e:
logger.exception(e)
@ -179,7 +185,7 @@ class Adb(Connection):
raise OSError("Couldn't get focused app")
@retry
def app_start_adb(self, package_name, allow_failure=False):
def app_start_adb(self, package_name=None, allow_failure=False):
"""
Args:
package_name (str):
@ -188,6 +194,8 @@ class Adb(Connection):
Returns:
bool: If success to start
"""
if not package_name:
package_name = self.config.Emulator_PackageName
result = self.adb_shell([
'monkey', '-p', package_name, '-c',
'android.intent.category.LAUNCHER', '1'
@ -198,17 +206,17 @@ class Adb(Connection):
return False
else:
logger.error(result)
possible_reasons(f'"{package_name}" not found, please check setting Emulator.PackageName')
self.show_packages()
raise RequestHumanTakeover
raise PackageNotInstalled(package_name)
else:
# Events injected: 1
# ## Network stats: elapsed time=4ms (0ms mobile, 0ms wifi, 4ms not connected)
return True
@retry
def app_stop_adb(self, package_name):
def app_stop_adb(self, package_name=None):
""" Stop one application: am force-stop"""
if not package_name:
package_name = self.config.Emulator_PackageName
self.adb_shell(['am', 'force-stop', package_name])
@retry

View File

@ -139,7 +139,7 @@ class Hermit(Adb):
self.adb_shell(['input', 'keyevent', '3'])
# Switch back to AzurLane
self.app_start_adb(self.config.Emulator_PackageName)
self.app_start_adb()
def uninstall_hermit(self):
self.adb_command(['uninstall', self._hermit_package_name])

View File

@ -9,7 +9,7 @@ from module.base.decorator import cached_property
from module.base.utils import *
from module.device.connection import Connection
from module.device.method.utils import (RETRY_DELAY, RETRY_TRIES,
handle_adb_error, possible_reasons)
handle_adb_error, PackageNotInstalled, possible_reasons)
from module.exception import RequestHumanTakeover
from module.logger import logger
@ -74,6 +74,12 @@ def retry(func):
'please enable ADB in the settings of your emulator'
)
break
# Package not installed
except PackageNotInstalled as e:
logger.error(e)
def init():
self.detect_package()
# Unknown, probably a trucked image
except Exception as e:
logger.exception(e)
@ -194,18 +200,20 @@ class Uiautomator2(Connection):
return result['package']
@retry
def app_start_uiautomator2(self, package_name):
def app_start_uiautomator2(self, package_name=None):
if not package_name:
package_name = self.config.Emulator_PackageName
try:
self.u2.app_start(package_name)
except u2.exceptions.BaseError as e:
# BaseError: package "com.bilibili.azurlane" not found
logger.error(e)
possible_reasons(f'"{package_name}" not found, please check setting Emulator.PackageName')
self.show_packages()
raise RequestHumanTakeover
raise PackageNotInstalled(package_name)
@retry
def app_stop_uiautomator2(self, package_name):
def app_stop_uiautomator2(self, package_name=None):
if not package_name:
package_name = self.config.Emulator_PackageName
self.u2.app_stop(package_name)
@retry

View File

@ -76,6 +76,10 @@ def possible_reasons(*args):
logger.critical(f'Possible reason #{index}: {reason}')
class PackageNotInstalled(Exception):
pass
def handle_adb_error(e):
"""
Args:

View File

@ -5,7 +5,7 @@ from adbutils.errors import AdbError
from module.device.connection import Connection
from module.device.method.utils import (RETRY_DELAY, RETRY_TRIES,
handle_adb_error, possible_reasons)
handle_adb_error, PackageNotInstalled)
from module.exception import RequestHumanTakeover
from module.logger import logger
@ -42,6 +42,12 @@ def retry(func):
self.adb_connect(self.serial)
else:
break
# Package not installed
except PackageNotInstalled as e:
logger.error(e)
def init():
self.detect_package()
# Unknown, probably a trucked image
except Exception as e:
logger.exception(e)
@ -83,38 +89,37 @@ class WSA(Connection):
raise OSError("Couldn't get focused app")
@retry
def app_start_wsa(self, package_name, display, allow_failure=False):
def app_start_wsa(self, package_name=None, display=0):
"""
Args:
package_name (str):
display (int):
allow_failure (bool):
Returns:
bool: If success to start
"""
if not package_name:
package_name = self.config.Emulator_PackageName
self.adb_shell(['svc', 'power', 'stayon', 'true'])
activity_name = self.get_main_activity_name(package_name=package_name)
result = self.adb_shell(
['am', 'start', '--display', display, package_name +
'/' + activity_name])
if 'No activities found' in result:
# ** No activities found to run, monkey aborted.
if allow_failure:
return False
else:
logger.error(result)
possible_reasons(f'"{package_name}" not found, please check setting Emulator.PackageName')
self.show_packages()
raise RequestHumanTakeover
result = self.adb_shell(['am', 'start', '--display', display, f'{package_name}/{activity_name}'])
if 'Activity not started' in result or 'does not exist' in result:
# Starting: Intent { act=android.intent.action.MAIN cat=[android.intent.category.LAUNCHER] pkg=xxx }
# Error: Activity not started, unable to resolve Intent { act=android.intent.action.MAIN cat=[android.intent.category.LAUNCHER] flg=0x10000000 pkg=xxx }
# Starting: Intent { act=android.intent.action.MAIN cat=[android.intent.category.LAUNCHER] cmp=com.bilibili.azurlane/xxx }
# Error type 3
# Error: Activity class {com.bilibili.azurlane/com.manjuu.azurlane.MainAct} does not exist.
logger.error(result)
raise PackageNotInstalled(package_name)
else:
# Events injected: 1
# ## Network stats: elapsed time=4ms (0ms mobile, 0ms wifi, 4ms not connected)
# Starting: Intent { act=android.intent.action.MAIN cat=[android.intent.category.LAUNCHER] cmp=com.bilibili.azurlane/com.manjuu.azurlane.MainActivity }
return True
@retry
def get_main_activity_name(self, package_name):
def get_main_activity_name(self, package_name=None):
if not package_name:
package_name = self.config.Emulator_PackageName
try:
output = self.adb_shell(['dumpsys', 'package', package_name])
_activityRE = re.compile(
@ -124,8 +129,7 @@ class WSA(Connection):
ret = next(ms).group('activity')
return ret
except StopIteration:
self.show_packages()
raise RequestHumanTakeover("Couldn't get activity name, please check setting Emulator.PackageName")
raise PackageNotInstalled(package_name)
@retry
def get_display_id(self):