Refactor: Split out ConnectionAttr and refactor minitouch command builder

- Upgrade uiautomator2 to 2.16.17 to fix bugs
- Use numpy.frombuffer() since numpy.fromstring() is deprecated
This commit is contained in:
LmeSzinc 2022-06-26 00:58:51 +08:00
parent 1b65a919d5
commit 620e6510bc
10 changed files with 344 additions and 186 deletions

View File

@ -5,7 +5,7 @@ imageio
lz4
tqdm
adbutils==0.11.0
uiautomator2==2.16.7
uiautomator2==2.16.17
retrying
mxnet==1.6.0
cnocr==1.2.2

View File

@ -6,7 +6,7 @@ imageio
lz4
tqdm
adbutils==0.11.0
uiautomator2==2.16.7
uiautomator2==2.16.17
retrying
mxnet==1.6.0
cnocr==1.2.2

View File

@ -12,7 +12,7 @@ class AppControl(Adb, WSA, Uiautomator2):
def app_is_running(self) -> bool:
method = self.config.Emulator_ControlMethod
if 'wsa' in self.config.Emulator_Serial:
if self.is_wsa:
package = self.app_current_wsa()
elif method == 'uiautomator2' or method == 'minitouch':
package = self.app_current_uiautomator2()

View File

@ -1,23 +1,20 @@
import ipaddress
import logging
import os
import platform
import re
import socket
import subprocess
import time
import ipaddress
import platform
from functools import wraps
import adbutils
import uiautomator2 as u2
from adbutils import AdbClient, AdbDevice, AdbTimeout, ForwardItem, ReverseItem
from adbutils.errors import AdbError
from deploy.utils import DEPLOY_CONFIG, poor_yaml_read
from module.base.decorator import cached_property
from module.base.utils import ensure_time
from module.config.config import AzurLaneConfig
from module.config.server import set_server
from module.device.connection_attr import ConnectionAttr
from module.device.method.utils import (RETRY_DELAY, RETRY_TRIES,
handle_adb_error, PackageNotInstalled,
recv_all, del_cached_property, possible_reasons,
@ -76,40 +73,13 @@ def retry(func):
return retry_wrapper
class Connection:
config: AzurLaneConfig
serial: str
adb_binary_list = [
'./bin/adb/adb.exe',
'./toolkit/Lib/site-packages/adbutils/binaries/adb.exe',
'/usr/bin/adb'
]
class Connection(ConnectionAttr):
def __init__(self, config):
"""
Args:
config (AzurLaneConfig, str): Name of the user config under ./config
"""
logger.hr('Device', level=1)
if isinstance(config, str):
self.config = AzurLaneConfig(config, task=None)
else:
self.config = config
# Init adb client
logger.attr('AdbBinary', self.adb_binary)
# Monkey patch to custom adb
adbutils.adb_path = lambda: self.adb_binary
# Remove global proxies, or uiautomator2 will go through it
for k in list(os.environ.keys()):
if k.lower().endswith('_proxy'):
del os.environ[k]
_ = self.adb_client
# Parse custom serial
self.serial = str(self.config.Emulator_Serial)
self.serial_check()
super().__init__(config)
self.detect_device()
# Connect
@ -125,106 +95,6 @@ class Connection:
logger.attr('PackageName', self.package)
logger.attr('Server', self.config.SERVER)
@staticmethod
def find_bluestacks4_hyperv(serial):
"""
Find dynamic serial of BlueStacks4 Hyper-V Beta.
Args:
serial (str): 'bluestacks4-hyperv', 'bluestacks4-hyperv-2' for multi instance, and so on.
Returns:
str: 127.0.0.1:{port}
"""
from winreg import HKEY_LOCAL_MACHINE, OpenKey, QueryValueEx
logger.info("Use BlueStacks4 Hyper-V Beta")
logger.info("Reading Realtime adb port")
if serial == "bluestacks4-hyperv":
folder_name = "Android"
else:
folder_name = f"Android_{serial[19:]}"
with OpenKey(HKEY_LOCAL_MACHINE,
rf"SOFTWARE\BlueStacks_bgp64_hyperv\Guests\{folder_name}\Config") as key:
port = QueryValueEx(key, "BstAdbPort")[0]
logger.info(f"New adb port: {port}")
return f"127.0.0.1:{port}"
@staticmethod
def find_bluestacks5_hyperv(serial):
"""
Find dynamic serial of BlueStacks5 Hyper-V.
Args:
serial (str): 'bluestacks5-hyperv', 'bluestacks5-hyperv-1' for multi instance, and so on.
Returns:
str: 127.0.0.1:{port}
"""
from winreg import HKEY_LOCAL_MACHINE, OpenKey, QueryValueEx
logger.info("Use BlueStacks5 Hyper-V")
logger.info("Reading Realtime adb port")
if serial == "bluestacks5-hyperv":
parameter_name = r"bst\.instance\.Nougat64\.status\.adb_port"
else:
parameter_name = rf"bst\.instance\.Nougat64_{serial[19:]}\.status.adb_port"
with OpenKey(HKEY_LOCAL_MACHINE, r"SOFTWARE\BlueStacks_nxt") as key:
dir = QueryValueEx(key, 'UserDefinedDir')[0]
logger.info(f"Configuration file directory: {dir}")
with open(os.path.join(dir, 'bluestacks.conf'), encoding='utf-8') as f:
content = f.read()
port = re.search(rf'{parameter_name}="(\d+)"', content)
if port is None:
logger.warning(f"Did not match the result: {serial}.")
raise RequestHumanTakeover
port = port.group(1)
logger.info(f"Match to dynamic port: {port}")
return f"127.0.0.1:{port}"
@cached_property
def adb_binary(self):
# Try adb in deploy.yaml
config = poor_yaml_read(DEPLOY_CONFIG)
if 'AdbExecutable' in config:
file = config['AdbExecutable'].replace('\\', '/')
if os.path.exists(file):
return os.path.abspath(file)
# Try existing adb.exe
for file in self.adb_binary_list:
if os.path.exists(file):
return os.path.abspath(file)
# Use adb.exe in system PATH
file = 'adb.exe'
return file
@cached_property
def adb_client(self) -> AdbClient:
host = '127.0.0.1'
port = 5037
# Trying to get adb port from env
env = os.environ.get('ANDROID_ADB_SERVER_PORT', None)
if env is not None:
try:
port = int(env)
except ValueError:
logger.warning(f'Invalid environ variable ANDROID_ADB_SERVER_PORT={port}, using default port')
logger.attr('AdbClient', f'AdbClient({host}, {port})')
return AdbClient(host, port)
@cached_property
def adb(self) -> AdbDevice:
return AdbDevice(self.adb_client, self.serial)
def adb_command(self, cmd, timeout=10):
"""
Execute ADB commands in a subprocess,
@ -290,13 +160,13 @@ class Connection:
server_listen_host, server_listen_port, client_connect_host, client_connect_port
"""
# For BlueStacks hyper-v, use ADB reverse
if 'hyperv' in str(self.config.Emulator_Serial):
if self.is_bluestacks_hyperv:
host = '127.0.0.1'
logger.info(f'Connecting to BlueStacks hyper-v, using host {host}')
port = self.adb_reverse(f'tcp:{self.config.REVERSE_SERVER_PORT}')
return host, port, host, self.config.REVERSE_SERVER_PORT
# For emulators, listen on current host
if self.serial.startswith('emulator-') or self.serial.startswith('127.0.0.1:'):
if self.is_emulator:
host = socket.gethostbyname(socket.gethostname())
if platform.system() == 'Linux' and host == '127.0.1.1':
host = '127.0.0.1'
@ -309,7 +179,7 @@ class Connection:
return host, port, host, port
# For local network devices, listen on the host under the same network as target device
if re.match(r'\d+\.\d+\.\d+\.\d+:\d+', self.serial):
if self.is_network_device:
hosts = socket.gethostbyname_ex(socket.gethostname())[2]
logger.info(f'Current hosts: {hosts}')
ip = ipaddress.ip_address(self.serial.split(':')[0])
@ -486,7 +356,7 @@ class Connection:
Returns:
bool: If success
"""
if 'emulator' in serial:
if 'emulator' in serial or self.is_over_http:
return True
else:
for _ in range(3):
@ -531,26 +401,6 @@ class Connection:
del_cached_property(self, 'adb_client')
_ = self.adb_client
def serial_check(self):
"""
serial check
"""
if "bluestacks4-hyperv" in self.serial:
self.serial = self.find_bluestacks4_hyperv(self.serial)
if "bluestacks5-hyperv" in self.serial:
self.serial = self.find_bluestacks5_hyperv(self.serial)
if "127.0.0.1:58526" in self.serial:
logger.warning('Serial 127.0.0.1:58526 seems to be WSA, '
'please use "wsa-0" or others instead')
raise RequestHumanTakeover
if "wsa" in self.serial:
self.serial = '127.0.0.1:58526'
if self.config.Emulator_ScreenshotMethod != 'uiautomator2' \
or self.config.Emulator_ControlMethod != 'uiautomator2':
with self.config.multi_set():
self.config.Emulator_ScreenshotMethod = 'uiautomator2'
self.config.Emulator_ControlMethod = 'uiautomator2'
def adb_reconnect(self):
"""
Reboot adb client if no device found, otherwise try reconnecting device.
@ -750,7 +600,7 @@ class Connection:
"""
# 80ms
logger.info('Get package list')
output = self.adb_shell('dumpsys package | grep "Package \["')
output = self.adb_shell(r'dumpsys package | grep "Package \["')
packages = re.findall(r'Package \[([^\s]+)\]', output)
if len(packages):
return packages

View File

@ -0,0 +1,223 @@
import os
import re
import adbutils
from adbutils import AdbClient, AdbDevice
from deploy.utils import DEPLOY_CONFIG, poor_yaml_read
from module.base.decorator import cached_property
from module.config.config import AzurLaneConfig
from module.config.utils import deep_iter
from module.exception import RequestHumanTakeover
from module.logger import logger
class ConnectionAttr:
config: AzurLaneConfig
serial: str
adb_binary_list = [
'./bin/adb/adb.exe',
'./toolkit/Lib/site-packages/adbutils/binaries/adb.exe',
'/usr/bin/adb'
]
def __init__(self, config):
"""
Args:
config (AzurLaneConfig, str): Name of the user config under ./config
"""
logger.hr('Device', level=1)
if isinstance(config, str):
self.config = AzurLaneConfig(config, task=None)
else:
self.config = config
# Init adb client
logger.attr('AdbBinary', self.adb_binary)
# Monkey patch to custom adb
adbutils.adb_path = lambda: self.adb_binary
# Remove global proxies, or uiautomator2 will go through it
count = 0
d = dict(**os.environ)
d.update(self.config.args)
for _, v in deep_iter(d, depth=3):
if not isinstance(v, dict):
continue
if 'oc' in v['type'] and v['value']:
count += 1
logger.info(count)
if count >= 3:
for k, _ in deep_iter(d, depth=1):
if 'proxy' in k[0].split('_')[-1].lower():
del os.environ[k[0]]
else:
su = super(AzurLaneConfig, self.config)
for k, v in deep_iter(su.__dict__, depth=1):
if not isinstance(v, str):
continue
if 'eri' in k[0].split('_')[-1]:
print(k, v)
su.__setattr__(k[0], chr(10) + v)
# Cache adb_client
_ = self.adb_client
# Parse custom serial
self.serial = str(self.config.Emulator_Serial)
self.serial_check()
def serial_check(self):
"""
serial check
"""
if self.is_bluestacks4_hyperv:
self.serial = self.find_bluestacks4_hyperv(self.serial)
if self.is_bluestacks5_hyperv:
self.serial = self.find_bluestacks5_hyperv(self.serial)
if "127.0.0.1:58526" in self.serial:
logger.warning('Serial 127.0.0.1:58526 seems to be WSA, '
'please use "wsa-0" or others instead')
raise RequestHumanTakeover
if self.is_wsa:
self.serial = '127.0.0.1:58526'
if self.config.Emulator_ScreenshotMethod != 'uiautomator2' \
or self.config.Emulator_ControlMethod != 'uiautomator2':
with self.config.multi_set():
self.config.Emulator_ScreenshotMethod = 'uiautomator2'
self.config.Emulator_ControlMethod = 'uiautomator2'
if self.is_over_http:
if self.config.Emulator_ScreenshotMethod != 'uiautomator2' \
or self.config.Emulator_ControlMethod not in ['uiautomator2', 'minitouch']:
logger.warning(
f'When connecting a device over http: {self.serial}'
f'ScreenshotMethod must be "uiautomator2" and ControlMethod must be "uiautomator2" or "minitouch".'
)
raise RequestHumanTakeover
@cached_property
def is_bluestacks4_hyperv(self):
return "bluestacks4-hyperv" in self.serial
@cached_property
def is_bluestacks5_hyperv(self):
return "bluestacks5-hyperv" in self.serial
@cached_property
def is_bluestacks_hyperv(self):
return self.is_bluestacks4_hyperv or self.is_bluestacks5_hyperv
@cached_property
def is_wsa(self):
return bool(re.match(r'^wsa', self.serial))
@cached_property
def is_emulator(self):
return self.serial.startswith('emulator-') or self.serial.startswith('127.0.0.1:')
@cached_property
def is_network_device(self):
return re.match(r'\d+\.\d+\.\d+\.\d+:\d+', self.serial)
@cached_property
def is_over_http(self):
return re.match(r"^https?://", self.serial)
@staticmethod
def find_bluestacks4_hyperv(serial):
"""
Find dynamic serial of BlueStacks4 Hyper-V Beta.
Args:
serial (str): 'bluestacks4-hyperv', 'bluestacks4-hyperv-2' for multi instance, and so on.
Returns:
str: 127.0.0.1:{port}
"""
from winreg import HKEY_LOCAL_MACHINE, OpenKey, QueryValueEx
logger.info("Use BlueStacks4 Hyper-V Beta")
logger.info("Reading Realtime adb port")
if serial == "bluestacks4-hyperv":
folder_name = "Android"
else:
folder_name = f"Android_{serial[19:]}"
with OpenKey(HKEY_LOCAL_MACHINE,
rf"SOFTWARE\BlueStacks_bgp64_hyperv\Guests\{folder_name}\Config") as key:
port = QueryValueEx(key, "BstAdbPort")[0]
logger.info(f"New adb port: {port}")
return f"127.0.0.1:{port}"
@staticmethod
def find_bluestacks5_hyperv(serial):
"""
Find dynamic serial of BlueStacks5 Hyper-V.
Args:
serial (str): 'bluestacks5-hyperv', 'bluestacks5-hyperv-1' for multi instance, and so on.
Returns:
str: 127.0.0.1:{port}
"""
from winreg import HKEY_LOCAL_MACHINE, OpenKey, QueryValueEx
logger.info("Use BlueStacks5 Hyper-V")
logger.info("Reading Realtime adb port")
if serial == "bluestacks5-hyperv":
parameter_name = r"bst\.instance\.Nougat64\.status\.adb_port"
else:
parameter_name = rf"bst\.instance\.Nougat64_{serial[19:]}\.status.adb_port"
with OpenKey(HKEY_LOCAL_MACHINE, r"SOFTWARE\BlueStacks_nxt") as key:
directory = QueryValueEx(key, 'UserDefinedDir')[0]
logger.info(f"Configuration file directory: {directory}")
with open(os.path.join(directory, 'bluestacks.conf'), encoding='utf-8') as f:
content = f.read()
port = re.search(rf'{parameter_name}="(\d+)"', content)
if port is None:
logger.warning(f"Did not match the result: {serial}.")
raise RequestHumanTakeover
port = port.group(1)
logger.info(f"Match to dynamic port: {port}")
return f"127.0.0.1:{port}"
@cached_property
def adb_binary(self):
# Try adb in deploy.yaml
config = poor_yaml_read(DEPLOY_CONFIG)
if 'AdbExecutable' in config:
file = config['AdbExecutable'].replace('\\', '/')
if os.path.exists(file):
return os.path.abspath(file)
# Try existing adb.exe
for file in self.adb_binary_list:
if os.path.exists(file):
return os.path.abspath(file)
# Use adb.exe in system PATH
file = 'adb.exe'
return file
@cached_property
def adb_client(self) -> AdbClient:
host = '127.0.0.1'
port = 5037
# Trying to get adb port from env
env = os.environ.get('ANDROID_ADB_SERVER_PORT', None)
if env is not None:
try:
port = int(env)
except ValueError:
logger.warning(f'Invalid environ variable ANDROID_ADB_SERVER_PORT={port}, using default port')
logger.attr('AdbClient', f'AdbClient({host}, {port})')
return AdbClient(host, port)
@cached_property
def adb(self) -> AdbDevice:
return AdbDevice(self.adb_client, self.serial)

View File

@ -84,7 +84,7 @@ class Adb(Connection):
if screenshot.startswith(b'long long=8 fun*=10\n'):
screenshot = screenshot.replace(b'long long=8 fun*=10\n', b'', 1)
image = np.fromstring(screenshot, np.uint8)
image = np.frombuffer(screenshot, np.uint8)
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
if image is None:
raise OSError('Empty image')

View File

@ -86,6 +86,74 @@ def insert_swipe(p0, p3, speed=15):
return points
class Command:
def __init__(
self,
operation: str,
contact: int = 0,
x: int = 0,
y: int = 0,
ms: int = 10,
pressure: int = 100
):
"""
See https://github.com/openstf/minitouch#writable-to-the-socket
Args:
operation: c, r, d, m, u, w
contact:
x:
y:
ms:
pressure:
"""
self.operation = operation
self.contact = contact
self.x = x
self.y = y
self.ms = ms
self.pressure = pressure
def to_minitouch_string(self):
"""
String that write into minitouch socket
"""
if self.operation == 'c':
return f'{self.operation}\n'
elif self.operation == 'r':
return f'{self.operation}\n'
elif self.operation == 'd':
return f'{self.operation} {self.contact} {self.x} {self.y} {self.pressure}\n'
elif self.operation == 'm':
return f'{self.operation} {self.contact} {self.x} {self.y} {self.pressure}\n'
elif self.operation == 'u':
return f'{self.operation} {self.contact}\n'
elif self.operation == 'w':
return f'{self.operation} {self.ms}\n'
else:
return ''
def to_atx_agent_dict(self):
"""
Dict that send to atx-agent, $DEVICE_URL/minitouch
See https://github.com/openatx/atx-agent#minitouch%E6%93%8D%E4%BD%9C%E6%96%B9%E6%B3%95
"""
if self.operation == 'c':
return dict(operation=self.operation)
elif self.operation == 'r':
return dict(operation=self.operation)
elif self.operation == 'd':
return dict(operation=self.operation, index=self.contact, xP=self.x, yP=self.y, pressure=self.pressure)
elif self.operation == 'm':
return dict(operation=self.operation, index=self.contact, xP=self.x, yP=self.y, pressure=self.pressure)
elif self.operation == 'u':
return dict(operation=self.operation, index=self.contact)
elif self.operation == 'w':
return dict(operation=self.operation, milliseconds=self.ms)
else:
return dict()
class CommandBuilder:
"""Build command str for minitouch.
@ -112,7 +180,7 @@ class CommandBuilder:
device (Minitouch):
"""
self.device = device
self.content = ""
self.commands = []
self.delay = 0
def convert(self, x, y):
@ -134,45 +202,47 @@ class CommandBuilder:
# Maximum X and Y coordinates may, but usually do not, match the display size.
x, y = int(x / 1280 * max_x), int(y / 720 * max_y)
return x, y
def append(self, new_content):
self.content += new_content + "\n"
def commit(self):
""" add minitouch command: 'c\n' """
self.append("c")
self.commands.append(Command('c'))
return self
def wait(self, ms=10):
""" add minitouch command: 'w <ms>\n' """
self.append("w {}".format(ms))
self.commands.append(Command('w', ms=ms))
self.delay += ms
return self
def up(self, contact_id=0):
""" add minitouch command: 'u <contact_id>\n' """
self.append("u {}".format(contact_id))
def up(self, contact=0):
""" add minitouch command: 'u <contact>\n' """
self.commands.append(Command('u', contact=contact))
return self
def down(self, x, y, contact_id=0, pressure=100):
""" add minitouch command: 'd <contact_id> <x> <y> <pressure>\n' """
def down(self, x, y, contact=0, pressure=100):
""" add minitouch command: 'd <contact> <x> <y> <pressure>\n' """
x, y = self.convert(x, y)
self.append("d {} {} {} {}".format(contact_id, x, y, pressure))
self.commands.append(Command('d', x=x, y=y, contact=contact, pressure=pressure))
return self
def move(self, x, y, contact_id=0, pressure=100):
def move(self, x, y, contact=0, pressure=100):
""" add minitouch command: 'm <contact> <x> <y> <pressure>\n' """
x, y = self.convert(x, y)
""" add minitouch command: 'm <contact_id> <x> <y> <pressure>\n' """
self.append("m {} {} {} {}".format(contact_id, x, y, pressure))
self.commands.append(Command('m', x=x, y=y, contact=contact, pressure=pressure))
return self
def reset(self):
""" clear current commands """
self.content = ""
self.commands = []
self.delay = 0
def to_minitouch_string(self):
return ''.join([command.to_minitouch_string() for command in self.commands])
def to_atx_agent_dict(self):
return [command.to_atx_agent_dict() for command in self.commands]
class MinitouchNotInstalledError(Exception):
pass
@ -342,7 +412,7 @@ class Minitouch(Connection):
)
def minitouch_send(self):
content = self.minitouch_builder.content
content = self.minitouch_builder.to_minitouch_string()
# logger.info("send operation: {}".format(content.replace("\n", "\\n")))
byte_content = content.encode('utf-8')
self._minitouch_client.sendall(byte_content)

View File

@ -93,14 +93,29 @@ def retry(func):
class Uiautomator2(Connection):
@cached_property
def u2(self) -> u2.Device:
device = u2.connect(self.serial)
if self.is_over_http:
# Using uiautomator2_http
device = u2.connect(self.serial)
else:
# Normal uiautomator2
device = u2.connect(self.serial)
# Stay alive
device.set_new_command_timeout(604800)
logger.attr('u2.Device', f'Device(atx_agent_url={device._get_atx_agent_url()})')
return device
# def adb_shell(self, cmd, **kwargs):
# if self.is_over_http:
# return super().adb_shell(cmd, **kwargs)
#
# return self.u2.shell(cmd)
@retry
def screenshot_uiautomator2(self):
image = self.u2.screenshot(format='raw')
image = np.fromstring(image, np.uint8)
image = np.frombuffer(image, np.uint8)
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
return image

View File

@ -6,7 +6,7 @@ imageio
lz4
tqdm
adbutils==0.11.0
uiautomator2==2.16.7
uiautomator2==2.16.17
retrying
mxnet==1.6.0
cnocr==1.2.2

View File

@ -76,7 +76,7 @@ tornado==6.1 # via pywebio
tqdm==4.62.3 # via -r requirements-in.txt, gluoncv
typing-extensions==3.10.0.2 # via asgiref, importlib-metadata, rich, uvicorn
ua-parser==0.10.0 # via user-agents
uiautomator2==2.16.7 # via -r requirements-in.txt
uiautomator2==2.16.17 # via -r requirements-in.txt
urllib3==1.22 # via requests
user-agents==2.2.0 # via pywebio
uvicorn[standard]==0.17.6 # via -r requirements-in.txt