Add: Screenshot method scrcpy

Control via scrcpy is supported but not exposed because swipes ended at the middle
This commit is contained in:
LmeSzinc 2022-12-29 12:46:56 +08:00
parent 18c7a09266
commit a3604d8100
19 changed files with 1121 additions and 16 deletions

Binary file not shown.

View File

@ -107,7 +107,8 @@
"uiautomator2",
"aScreenCap",
"aScreenCap_nc",
"DroidCast"
"DroidCast",
"scrcpy"
]
},
"ControlMethod": {

View File

@ -29,7 +29,7 @@ Emulator:
option: [ disabled, ]
ScreenshotMethod:
value: ADB_nc
option: [ ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast ]
option: [ ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast, scrcpy ]
ControlMethod:
value: minitouch
option: [ ADB, uiautomator2, minitouch, Hermit ]

View File

@ -21,7 +21,7 @@ class GeneratedConfig:
Emulator_Serial = 'auto'
Emulator_PackageName = 'auto' # auto, com.bilibili.azurlane, com.YoStarEN.AzurLane, com.YoStarJP.AzurLane, com.hkmanjuu.azurlane.gp, com.bilibili.blhx.huawei, com.bilibili.blhx.mi, com.tencent.tmgp.bilibili.blhx, com.bilibili.blhx.baidu, com.bilibili.blhx.qihoo, com.bilibili.blhx.nearme.gamecenter, com.bilibili.blhx.vivo, com.bilibili.blhx.mz, com.bilibili.blhx.uc, com.bilibili.blhx.mzw, com.yiwu.blhx.yx15, com.bilibili.blhx.m4399, com.hkmanjuu.azurlane.gp.mc
Emulator_ServerName = 'disabled' # disabled, cn_android-0, cn_android-1, cn_android-2, cn_android-3, cn_android-4, cn_android-5, cn_android-6, cn_android-7, cn_android-8, cn_android-9, cn_android-10, cn_android-11, cn_android-12, cn_android-13, cn_android-14, cn_android-15, cn_android-16, cn_android-17, cn_android-18, cn_android-19, cn_android-20, cn_android-21, cn_android-22, cn_ios-0, cn_ios-1, cn_ios-2, cn_ios-3, cn_ios-4, cn_ios-5, cn_ios-6, cn_ios-7, cn_ios-8, cn_ios-9, cn_ios-10, cn_channel-0, cn_channel-1, cn_channel-2, cn_channel-3, en-0, en-1, en-2, en-3, en-4, jp-0, jp-1, jp-2, jp-3, jp-4, jp-5, jp-6, jp-7, jp-8, jp-9, jp-10, jp-11, jp-12, jp-13, jp-14, jp-15, jp-16, jp-17
Emulator_ScreenshotMethod = 'ADB_nc' # ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast
Emulator_ScreenshotMethod = 'ADB_nc' # ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast, scrcpy
Emulator_ControlMethod = 'minitouch' # ADB, uiautomator2, minitouch, Hermit
Emulator_ScreenshotDedithering = False
Emulator_AdbRestart = False

View File

@ -78,6 +78,8 @@ class ManualConfig:
DROIDCAST_FILEPATH_REMOTE = '/data/local/tmp/DroidCast.apk'
MINITOUCH_FILEPATH_REMOTE = '/data/local/tmp/minitouch'
HERMIT_FILEPATH_LOCAL = './bin/hermit/hermit.apk'
SCRCPY_FILEPATH_LOCAL = './bin/scrcpy/scrcpy-server-v1.20.jar'
SCRCPY_FILEPATH_REMOTE = '/data/local/tmp/scrcpy-server-v1.20.jar'
"""
module.campaign.gems_farming

View File

@ -394,7 +394,8 @@
"uiautomator2": "uiautomator2",
"aScreenCap": "aScreenCap",
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast"
"DroidCast": "DroidCast",
"scrcpy": "scrcpy"
},
"ControlMethod": {
"name": "Control Method",

View File

@ -394,7 +394,8 @@
"uiautomator2": "uiautomator2",
"aScreenCap": "aScreenCap",
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast"
"DroidCast": "DroidCast",
"scrcpy": "scrcpy"
},
"ControlMethod": {
"name": "Emulator.ControlMethod.name",

View File

@ -394,7 +394,8 @@
"uiautomator2": "uiautomator2",
"aScreenCap": "aScreenCap",
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast"
"DroidCast": "DroidCast",
"scrcpy": "scrcpy"
},
"ControlMethod": {
"name": "模拟器控制方案",

View File

@ -394,7 +394,8 @@
"uiautomator2": "uiautomator2",
"aScreenCap": "aScreenCap",
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast"
"DroidCast": "DroidCast",
"scrcpy": "scrcpy"
},
"ControlMethod": {
"name": "模擬器控制方案",

View File

@ -9,12 +9,13 @@ from module.logger import logger
class AppControl(Adb, WSA, Uiautomator2):
hierarchy: etree._Element
_app_u2_family = ['uiautomator2', 'minitouch', 'DroidCast', 'scrcpy']
def app_is_running(self) -> bool:
method = self.config.Emulator_ControlMethod
if self.is_wsa:
package = self.app_current_wsa()
elif method == 'uiautomator2' or method == 'minitouch':
elif method in AppControl._app_u2_family:
package = self.app_current_uiautomator2()
else:
package = self.app_current_adb()
@ -28,7 +29,7 @@ class AppControl(Adb, WSA, Uiautomator2):
logger.info(f'App start: {self.package}')
if self.config.Emulator_Serial == 'wsa-0':
self.app_start_wsa(display=0)
elif method == 'uiautomator2' or method == 'minitouch':
elif method in AppControl._app_u2_family:
self.app_start_uiautomator2()
else:
self.app_start_adb()
@ -36,7 +37,7 @@ class AppControl(Adb, WSA, Uiautomator2):
def app_stop(self):
method = self.config.Emulator_ControlMethod
logger.info(f'App stop: {self.package}')
if method == 'uiautomator2' or method == 'minitouch':
if method in AppControl._app_u2_family:
self.app_stop_uiautomator2()
else:
self.app_stop_adb()
@ -47,7 +48,7 @@ class AppControl(Adb, WSA, Uiautomator2):
etree._Element: Select elements with `self.hierarchy.xpath('//*[@text="Hermit"]')` for example.
"""
method = self.config.Emulator_ControlMethod
if method == 'uiautomator2' or method == 'minitouch':
if method in AppControl._app_u2_family:
self.hierarchy = self.dump_hierarchy_uiautomator2()
else:
self.hierarchy = self.dump_hierarchy_adb()

View File

@ -3,11 +3,12 @@ from module.base.timer import Timer
from module.base.utils import *
from module.device.method.hermit import Hermit
from module.device.method.minitouch import Minitouch
from module.device.method.scrcpy import Scrcpy
from module.device.method.uiautomator_2 import Uiautomator2
from module.logger import logger
class Control(Hermit, Uiautomator2, Minitouch):
class Control(Hermit, Uiautomator2, Minitouch, Scrcpy):
def handle_control_check(self, button):
# Will be overridden in Device
pass
@ -33,6 +34,8 @@ class Control(Hermit, Uiautomator2, Minitouch):
self.click_uiautomator2(x, y)
elif method == 'Hermit':
self.click_hermit(x, y)
elif method == 'scrcpy':
self.click_scrcpy(x, y)
else:
self.click_adb(x, y)
@ -66,6 +69,8 @@ class Control(Hermit, Uiautomator2, Minitouch):
self.long_click_minitouch(x, y, duration)
elif method == 'uiautomator2':
self.long_click_uiautomator2(x, y, duration)
elif method == 'scrcpy':
self.long_click_scrcpy(x, y, duration)
else:
self.swipe_adb((x, y), (x, y), duration)
@ -78,6 +83,8 @@ class Control(Hermit, Uiautomator2, Minitouch):
logger.info('Swipe %s -> %s' % (point2str(*p1), point2str(*p2)))
elif method == 'uiautomator2':
logger.info('Swipe %s -> %s, %s' % (point2str(*p1), point2str(*p2), duration))
elif method == 'scrcpy':
logger.info('Swipe %s -> %s' % (point2str(*p1), point2str(*p2)))
else:
# ADB needs to be slow, or swipe doesn't work
duration *= 2.5
@ -94,6 +101,8 @@ class Control(Hermit, Uiautomator2, Minitouch):
self.swipe_minitouch(p1, p2)
elif method == 'uiautomator2':
self.swipe_uiautomator2(p1, p2, duration=duration)
elif method == 'scrcpy':
self.swipe_scrcpy(p1, p2)
else:
self.swipe_adb(p1, p2, duration=duration)
@ -139,6 +148,8 @@ class Control(Hermit, Uiautomator2, Minitouch):
self.drag_uiautomator2(
p1, p2, segments=segments, shake=shake, point_random=point_random, shake_random=shake_random,
swipe_duration=swipe_duration, shake_duration=shake_duration)
if method == 'scrcpy':
self.drag_scrcpy(p1, p2, point_random=point_random)
else:
logger.warning(f'Control method {method} does not support drag well, '
f'falling back to ADB swipe may cause unexpected behaviour')

View File

@ -33,7 +33,7 @@ def random_rho(dis):
return random_normal_distribution(-dis, dis)
def insert_swipe(p0, p3, speed=15):
def insert_swipe(p0, p3, speed=15, min_distance=10):
"""
Insert way point from start to end.
First generate a cubic bézier curve
@ -74,7 +74,7 @@ def insert_swipe(p0, p3, speed=15):
for t in ts:
point = p0 * (1 - t) ** 3 + 3 * p1 * t * (1 - t) ** 2 + 3 * p2 * t ** 2 * (1 - t) + p3 * t ** 3
point = point.astype(np.int).tolist()
if np.linalg.norm(np.subtract(point, prev)) < 10:
if np.linalg.norm(np.subtract(point, prev)) < min_distance:
continue
points.append(point)
@ -83,7 +83,7 @@ def insert_swipe(p0, p3, speed=15):
# Delete nearing points
if len(points[1:]):
distance = np.linalg.norm(np.subtract(points[1:], points[0]), axis=1)
mask = np.append(True, distance > 10)
mask = np.append(True, distance > min_distance)
points = np.array(points)[mask].tolist()
else:
points = [p0, p3]

View File

@ -0,0 +1 @@
from .scrcpy import Scrcpy, ScrcpyError

View File

@ -0,0 +1,326 @@
"""
This module includes all consts used in this project
"""
# Action
ACTION_DOWN = 0
ACTION_UP = 1
ACTION_MOVE = 2
# KeyCode
KEYCODE_UNKNOWN = 0
KEYCODE_SOFT_LEFT = 1
KEYCODE_SOFT_RIGHT = 2
KEYCODE_HOME = 3
KEYCODE_BACK = 4
KEYCODE_CALL = 5
KEYCODE_ENDCALL = 6
KEYCODE_0 = 7
KEYCODE_1 = 8
KEYCODE_2 = 9
KEYCODE_3 = 10
KEYCODE_4 = 11
KEYCODE_5 = 12
KEYCODE_6 = 13
KEYCODE_7 = 14
KEYCODE_8 = 15
KEYCODE_9 = 16
KEYCODE_STAR = 17
KEYCODE_POUND = 18
KEYCODE_DPAD_UP = 19
KEYCODE_DPAD_DOWN = 20
KEYCODE_DPAD_LEFT = 21
KEYCODE_DPAD_RIGHT = 22
KEYCODE_DPAD_CENTER = 23
KEYCODE_VOLUME_UP = 24
KEYCODE_VOLUME_DOWN = 25
KEYCODE_POWER = 26
KEYCODE_CAMERA = 27
KEYCODE_CLEAR = 28
KEYCODE_A = 29
KEYCODE_B = 30
KEYCODE_C = 31
KEYCODE_D = 32
KEYCODE_E = 33
KEYCODE_F = 34
KEYCODE_G = 35
KEYCODE_H = 36
KEYCODE_I = 37
KEYCODE_J = 38
KEYCODE_K = 39
KEYCODE_L = 40
KEYCODE_M = 41
KEYCODE_N = 42
KEYCODE_O = 43
KEYCODE_P = 44
KEYCODE_Q = 45
KEYCODE_R = 46
KEYCODE_S = 47
KEYCODE_T = 48
KEYCODE_U = 49
KEYCODE_V = 50
KEYCODE_W = 51
KEYCODE_X = 52
KEYCODE_Y = 53
KEYCODE_Z = 54
KEYCODE_COMMA = 55
KEYCODE_PERIOD = 56
KEYCODE_ALT_LEFT = 57
KEYCODE_ALT_RIGHT = 58
KEYCODE_SHIFT_LEFT = 59
KEYCODE_SHIFT_RIGHT = 60
KEYCODE_TAB = 61
KEYCODE_SPACE = 62
KEYCODE_SYM = 63
KEYCODE_EXPLORER = 64
KEYCODE_ENVELOPE = 65
KEYCODE_ENTER = 66
KEYCODE_DEL = 67
KEYCODE_GRAVE = 68
KEYCODE_MINUS = 69
KEYCODE_EQUALS = 70
KEYCODE_LEFT_BRACKET = 71
KEYCODE_RIGHT_BRACKET = 72
KEYCODE_BACKSLASH = 73
KEYCODE_SEMICOLON = 74
KEYCODE_APOSTROPHE = 75
KEYCODE_SLASH = 76
KEYCODE_AT = 77
KEYCODE_NUM = 78
KEYCODE_HEADSETHOOK = 79
KEYCODE_PLUS = 81
KEYCODE_MENU = 82
KEYCODE_NOTIFICATION = 83
KEYCODE_SEARCH = 84
KEYCODE_MEDIA_PLAY_PAUSE = 85
KEYCODE_MEDIA_STOP = 86
KEYCODE_MEDIA_NEXT = 87
KEYCODE_MEDIA_PREVIOUS = 88
KEYCODE_MEDIA_REWIND = 89
KEYCODE_MEDIA_FAST_FORWARD = 90
KEYCODE_MUTE = 91
KEYCODE_PAGE_UP = 92
KEYCODE_PAGE_DOWN = 93
KEYCODE_BUTTON_A = 96
KEYCODE_BUTTON_B = 97
KEYCODE_BUTTON_C = 98
KEYCODE_BUTTON_X = 99
KEYCODE_BUTTON_Y = 100
KEYCODE_BUTTON_Z = 101
KEYCODE_BUTTON_L1 = 102
KEYCODE_BUTTON_R1 = 103
KEYCODE_BUTTON_L2 = 104
KEYCODE_BUTTON_R2 = 105
KEYCODE_BUTTON_THUMBL = 106
KEYCODE_BUTTON_THUMBR = 107
KEYCODE_BUTTON_START = 108
KEYCODE_BUTTON_SELECT = 109
KEYCODE_BUTTON_MODE = 110
KEYCODE_ESCAPE = 111
KEYCODE_FORWARD_DEL = 112
KEYCODE_CTRL_LEFT = 113
KEYCODE_CTRL_RIGHT = 114
KEYCODE_CAPS_LOCK = 115
KEYCODE_SCROLL_LOCK = 116
KEYCODE_META_LEFT = 117
KEYCODE_META_RIGHT = 118
KEYCODE_FUNCTION = 119
KEYCODE_SYSRQ = 120
KEYCODE_BREAK = 121
KEYCODE_MOVE_HOME = 122
KEYCODE_MOVE_END = 123
KEYCODE_INSERT = 124
KEYCODE_FORWARD = 125
KEYCODE_MEDIA_PLAY = 126
KEYCODE_MEDIA_PAUSE = 127
KEYCODE_MEDIA_CLOSE = 128
KEYCODE_MEDIA_EJECT = 129
KEYCODE_MEDIA_RECORD = 130
KEYCODE_F1 = 131
KEYCODE_F2 = 132
KEYCODE_F3 = 133
KEYCODE_F4 = 134
KEYCODE_F5 = 135
KEYCODE_F6 = 136
KEYCODE_F7 = 137
KEYCODE_F8 = 138
KEYCODE_F9 = 139
KEYCODE_F10 = 140
KEYCODE_F11 = 141
KEYCODE_F12 = 142
KEYCODE_NUM_LOCK = 143
KEYCODE_NUMPAD_0 = 144
KEYCODE_NUMPAD_1 = 145
KEYCODE_NUMPAD_2 = 146
KEYCODE_NUMPAD_3 = 147
KEYCODE_NUMPAD_4 = 148
KEYCODE_NUMPAD_5 = 149
KEYCODE_NUMPAD_6 = 150
KEYCODE_NUMPAD_7 = 151
KEYCODE_NUMPAD_8 = 152
KEYCODE_NUMPAD_9 = 153
KEYCODE_NUMPAD_DIVIDE = 154
KEYCODE_NUMPAD_MULTIPLY = 155
KEYCODE_NUMPAD_SUBTRACT = 156
KEYCODE_NUMPAD_ADD = 157
KEYCODE_NUMPAD_DOT = 158
KEYCODE_NUMPAD_COMMA = 159
KEYCODE_NUMPAD_ENTER = 160
KEYCODE_NUMPAD_EQUALS = 161
KEYCODE_NUMPAD_LEFT_PAREN = 162
KEYCODE_NUMPAD_RIGHT_PAREN = 163
KEYCODE_VOLUME_MUTE = 164
KEYCODE_INFO = 165
KEYCODE_CHANNEL_UP = 166
KEYCODE_CHANNEL_DOWN = 167
KEYCODE_ZOOM_IN = 168
KEYCODE_ZOOM_OUT = 169
KEYCODE_TV = 170
KEYCODE_WINDOW = 171
KEYCODE_GUIDE = 172
KEYCODE_DVR = 173
KEYCODE_BOOKMARK = 174
KEYCODE_CAPTIONS = 175
KEYCODE_SETTINGS = 176
KEYCODE_TV_POWER = 177
KEYCODE_TV_INPUT = 178
KEYCODE_STB_POWER = 179
KEYCODE_STB_INPUT = 180
KEYCODE_AVR_POWER = 181
KEYCODE_AVR_INPUT = 182
KEYCODE_PROG_RED = 183
KEYCODE_PROG_GREEN = 184
KEYCODE_PROG_YELLOW = 185
KEYCODE_PROG_BLUE = 186
KEYCODE_APP_SWITCH = 187
KEYCODE_BUTTON_1 = 188
KEYCODE_BUTTON_2 = 189
KEYCODE_BUTTON_3 = 190
KEYCODE_BUTTON_4 = 191
KEYCODE_BUTTON_5 = 192
KEYCODE_BUTTON_6 = 193
KEYCODE_BUTTON_7 = 194
KEYCODE_BUTTON_8 = 195
KEYCODE_BUTTON_9 = 196
KEYCODE_BUTTON_10 = 197
KEYCODE_BUTTON_11 = 198
KEYCODE_BUTTON_12 = 199
KEYCODE_BUTTON_13 = 200
KEYCODE_BUTTON_14 = 201
KEYCODE_BUTTON_15 = 202
KEYCODE_BUTTON_16 = 203
KEYCODE_LANGUAGE_SWITCH = 204
KEYCODE_MANNER_MODE = 205
KEYCODE_3D_MODE = 206
KEYCODE_CONTACTS = 207
KEYCODE_CALENDAR = 208
KEYCODE_MUSIC = 209
KEYCODE_CALCULATOR = 210
KEYCODE_ZENKAKU_HANKAKU = 211
KEYCODE_EISU = 212
KEYCODE_MUHENKAN = 213
KEYCODE_HENKAN = 214
KEYCODE_KATAKANA_HIRAGANA = 215
KEYCODE_YEN = 216
KEYCODE_RO = 217
KEYCODE_KANA = 218
KEYCODE_ASSIST = 219
KEYCODE_BRIGHTNESS_DOWN = 220
KEYCODE_BRIGHTNESS_UP = 221
KEYCODE_MEDIA_AUDIO_TRACK = 222
KEYCODE_SLEEP = 223
KEYCODE_WAKEUP = 224
KEYCODE_PAIRING = 225
KEYCODE_MEDIA_TOP_MENU = 226
KEYCODE_11 = 227
KEYCODE_12 = 228
KEYCODE_LAST_CHANNEL = 229
KEYCODE_TV_DATA_SERVICE = 230
KEYCODE_VOICE_ASSIST = 231
KEYCODE_TV_RADIO_SERVICE = 232
KEYCODE_TV_TELETEXT = 233
KEYCODE_TV_NUMBER_ENTRY = 234
KEYCODE_TV_TERRESTRIAL_ANALOG = 235
KEYCODE_TV_TERRESTRIAL_DIGITAL = 236
KEYCODE_TV_SATELLITE = 237
KEYCODE_TV_SATELLITE_BS = 238
KEYCODE_TV_SATELLITE_CS = 239
KEYCODE_TV_SATELLITE_SERVICE = 240
KEYCODE_TV_NETWORK = 241
KEYCODE_TV_ANTENNA_CABLE = 242
KEYCODE_TV_INPUT_HDMI_1 = 243
KEYCODE_TV_INPUT_HDMI_2 = 244
KEYCODE_TV_INPUT_HDMI_3 = 245
KEYCODE_TV_INPUT_HDMI_4 = 246
KEYCODE_TV_INPUT_COMPOSITE_1 = 247
KEYCODE_TV_INPUT_COMPOSITE_2 = 248
KEYCODE_TV_INPUT_COMPONENT_1 = 249
KEYCODE_TV_INPUT_COMPONENT_2 = 250
KEYCODE_TV_INPUT_VGA_1 = 251
KEYCODE_TV_AUDIO_DESCRIPTION = 252
KEYCODE_TV_AUDIO_DESCRIPTION_MIX_UP = 253
KEYCODE_TV_AUDIO_DESCRIPTION_MIX_DOWN = 254
KEYCODE_TV_ZOOM_MODE = 255
KEYCODE_TV_CONTENTS_MENU = 256
KEYCODE_TV_MEDIA_CONTEXT_MENU = 257
KEYCODE_TV_TIMER_PROGRAMMING = 258
KEYCODE_HELP = 259
KEYCODE_NAVIGATE_PREVIOUS = 260
KEYCODE_NAVIGATE_NEXT = 261
KEYCODE_NAVIGATE_IN = 262
KEYCODE_NAVIGATE_OUT = 263
KEYCODE_STEM_PRIMARY = 264
KEYCODE_STEM_1 = 265
KEYCODE_STEM_2 = 266
KEYCODE_STEM_3 = 267
KEYCODE_DPAD_UP_LEFT = 268
KEYCODE_DPAD_DOWN_LEFT = 269
KEYCODE_DPAD_UP_RIGHT = 270
KEYCODE_DPAD_DOWN_RIGHT = 271
KEYCODE_MEDIA_SKIP_FORWARD = 272
KEYCODE_MEDIA_SKIP_BACKWARD = 273
KEYCODE_MEDIA_STEP_FORWARD = 274
KEYCODE_MEDIA_STEP_BACKWARD = 275
KEYCODE_SOFT_SLEEP = 276
KEYCODE_CUT = 277
KEYCODE_COPY = 278
KEYCODE_PASTE = 279
KEYCODE_SYSTEM_NAVIGATION_UP = 280
KEYCODE_SYSTEM_NAVIGATION_DOWN = 281
KEYCODE_SYSTEM_NAVIGATION_LEFT = 282
KEYCODE_SYSTEM_NAVIGATION_RIGHT = 283
KEYCODE_KEYCODE_ALL_APPS = 284
KEYCODE_KEYCODE_REFRESH = 285
KEYCODE_KEYCODE_THUMBS_UP = 286
KEYCODE_KEYCODE_THUMBS_DOWN = 287
# Event
EVENT_INIT = "init"
EVENT_FRAME = "frame"
EVENT_DISCONNECT = "disconnect"
# Type
TYPE_INJECT_KEYCODE = 0
TYPE_INJECT_TEXT = 1
TYPE_INJECT_TOUCH_EVENT = 2
TYPE_INJECT_SCROLL_EVENT = 3
TYPE_BACK_OR_SCREEN_ON = 4
TYPE_EXPAND_NOTIFICATION_PANEL = 5
TYPE_EXPAND_SETTINGS_PANEL = 6
TYPE_COLLAPSE_PANELS = 7
TYPE_GET_CLIPBOARD = 8
TYPE_SET_CLIPBOARD = 9
TYPE_SET_SCREEN_POWER_MODE = 10
TYPE_ROTATE_DEVICE = 11
# Lock screen orientation
LOCK_SCREEN_ORIENTATION_UNLOCKED = -1
LOCK_SCREEN_ORIENTATION_INITIAL = -2
LOCK_SCREEN_ORIENTATION_0 = 0
LOCK_SCREEN_ORIENTATION_1 = 1
LOCK_SCREEN_ORIENTATION_2 = 2
LOCK_SCREEN_ORIENTATION_3 = 3
# Screen power mode
POWER_MODE_OFF = 0
POWER_MODE_NORMAL = 2

View File

@ -0,0 +1,266 @@
import functools
import socket
import struct
import time
import module.device.method.scrcpy.const as const
def inject(control_type: int):
"""
Inject control code, with this inject, we will be able to do unit test
Args:
control_type: event to send, TYPE_*
"""
def wrapper(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
package = struct.pack(">B", control_type) + f(self, *args, **kwargs)
if self.control_socket is not None:
with self.control_socket_lock:
self.control_socket.send(package)
return package
return inner
return wrapper
class ControlSender:
def __init__(self, parent):
self.parent = parent
@property
def control_socket(self):
return self.parent._scrcpy_control_socket
@property
def control_socket_lock(self):
return self.parent._scrcpy_control_socket_lock
@property
def resolution(self):
return self.parent._scrcpy_resolution
@inject(const.TYPE_INJECT_KEYCODE)
def keycode(
self, keycode: int, action: int = const.ACTION_DOWN, repeat: int = 0
) -> bytes:
"""
Send keycode to device
Args:
keycode: const.KEYCODE_*
action: ACTION_DOWN | ACTION_UP
repeat: repeat count
"""
return struct.pack(">Biii", action, keycode, repeat, 0)
@inject(const.TYPE_INJECT_TEXT)
def text(self, text: str) -> bytes:
"""
Send text to device
Args:
text: text to send
"""
buffer = text.encode("utf-8")
return struct.pack(">i", len(buffer)) + buffer
@inject(const.TYPE_INJECT_TOUCH_EVENT)
def touch(
self, x: int, y: int, action: int = const.ACTION_DOWN, touch_id: int = -1
) -> bytes:
"""
Touch screen
Args:
x: horizontal position
y: vertical position
action: ACTION_DOWN | ACTION_UP | ACTION_MOVE
touch_id: Default using virtual id -1, you can specify it to emulate multi finger touch
"""
x, y = max(x, 0), max(y, 0)
return struct.pack(
">BqiiHHHi",
action,
touch_id,
int(x),
int(y),
int(self.resolution[0]),
int(self.resolution[1]),
0xFFFF,
1,
)
@inject(const.TYPE_INJECT_SCROLL_EVENT)
def scroll(self, x: int, y: int, h: int, v: int) -> bytes:
"""
Scroll screen
Args:
x: horizontal position
y: vertical position
h: horizontal movement
v: vertical movement
"""
x, y = max(x, 0), max(y, 0)
return struct.pack(
">iiHHii",
int(x),
int(y),
int(self.resolution[0]),
int(self.resolution[1]),
int(h),
int(v),
)
@inject(const.TYPE_BACK_OR_SCREEN_ON)
def back_or_turn_screen_on(self, action: int = const.ACTION_DOWN) -> bytes:
"""
If the screen is off, it is turned on only on ACTION_DOWN
Args:
action: ACTION_DOWN | ACTION_UP
"""
return struct.pack(">B", action)
@inject(const.TYPE_EXPAND_NOTIFICATION_PANEL)
def expand_notification_panel(self) -> bytes:
"""
Expand notification panel
"""
return b""
@inject(const.TYPE_EXPAND_SETTINGS_PANEL)
def expand_settings_panel(self) -> bytes:
"""
Expand settings panel
"""
return b""
@inject(const.TYPE_COLLAPSE_PANELS)
def collapse_panels(self) -> bytes:
"""
Collapse all panels
"""
return b""
def get_clipboard(self) -> str:
"""
Get clipboard
"""
# Since this function need socket response, we can't auto inject it any more
s: socket.socket = self.control_socket
with self.control_socket_lock:
# Flush socket
s.setblocking(False)
while True:
try:
s.recv(1024)
except BlockingIOError:
break
s.setblocking(True)
# Read package
package = struct.pack(">B", const.TYPE_GET_CLIPBOARD)
s.send(package)
(code,) = struct.unpack(">B", s.recv(1))
assert code == 0
(length,) = struct.unpack(">i", s.recv(4))
return s.recv(length).decode("utf-8")
@inject(const.TYPE_SET_CLIPBOARD)
def set_clipboard(self, text: str, paste: bool = False) -> bytes:
"""
Set clipboard
Args:
text: the string you want to set
paste: paste now
"""
buffer = text.encode("utf-8")
return struct.pack(">?i", paste, len(buffer)) + buffer
@inject(const.TYPE_SET_SCREEN_POWER_MODE)
def set_screen_power_mode(self, mode: int = const.POWER_MODE_NORMAL) -> bytes:
"""
Set screen power mode
Args:
mode: POWER_MODE_OFF | POWER_MODE_NORMAL
"""
return struct.pack(">b", mode)
@inject(const.TYPE_ROTATE_DEVICE)
def rotate_device(self) -> bytes:
"""
Rotate device
"""
return b""
def swipe(
self,
start_x: int,
start_y: int,
end_x: int,
end_y: int,
move_step_length: int = 5,
move_steps_delay: float = 0.005,
) -> None:
"""
Swipe on screen
Args:
start_x: start horizontal position
start_y: start vertical position
end_x: start horizontal position
end_y: end vertical position
move_step_length: length per step
move_steps_delay: sleep seconds after each step
:return:
"""
self.touch(start_x, start_y, const.ACTION_DOWN)
next_x = start_x
next_y = start_y
if end_x > self.resolution[0]:
end_x = self.resolution[0]
if end_y > self.resolution[1]:
end_y = self.resolution[1]
decrease_x = True if start_x > end_x else False
decrease_y = True if start_y > end_y else False
while True:
if decrease_x:
next_x -= move_step_length
if next_x < end_x:
next_x = end_x
else:
next_x += move_step_length
if next_x > end_x:
next_x = end_x
if decrease_y:
next_y -= move_step_length
if next_y < end_y:
next_y = end_y
else:
next_y += move_step_length
if next_y > end_y:
next_y = end_y
self.touch(next_x, next_y, const.ACTION_MOVE)
if next_x == end_x and next_y == end_y:
self.touch(next_x, next_y, const.ACTION_UP)
break
time.sleep(move_steps_delay)

View File

@ -0,0 +1,209 @@
import socket
import struct
import threading
import time
import typing as t
from time import sleep
import numpy as np
from adbutils import _AdbStreamConnection, AdbError, Network
from module.base.decorator import cached_property
from module.base.timer import Timer
from module.device.connection import Connection
from module.device.method.scrcpy.control import ControlSender
from module.device.method.scrcpy.options import ScrcpyOptions
from module.device.method.utils import recv_all
from module.logger import logger
class ScrcpyError(Exception):
pass
class ScrcpyCore(Connection):
"""
Scrcpy: https://github.com/Genymobile/scrcpy
Module from https://github.com/leng-yue/py-scrcpy-client
"""
_scrcpy_last_frame: t.Optional[np.ndarray] = None
_scrcpy_last_frame_time: float = 0.
_scrcpy_alive = False
_scrcpy_server_stream: t.Optional[_AdbStreamConnection] = None
_scrcpy_video_socket: t.Optional[socket.socket] = None
_scrcpy_control_socket: t.Optional[socket.socket] = None
_scrcpy_control_socket_lock = threading.Lock()
_scrcpy_stream_loop_thread = None
_scrcpy_resolution: t.Tuple[int, int] = (1280, 720)
@cached_property
def _scrcpy_control(self) -> ControlSender:
return ControlSender(self)
def scrcpy_init(self):
self._scrcpy_server_stop()
logger.hr('Scrcpy init')
logger.info(f'pushing {self.config.SCRCPY_FILEPATH_LOCAL}')
self.adb_push(self.config.SCRCPY_FILEPATH_LOCAL, self.config.SCRCPY_FILEPATH_REMOTE)
self._scrcpy_alive = False
self.scrcpy_ensure_running()
def scrcpy_ensure_running(self):
if not self._scrcpy_alive:
self._scrcpy_server_start()
def _scrcpy_server_start(self):
"""
Connect to scrcpy server, there will be two sockets, video and control socket.
Raises:
ScrcpyError:
"""
logger.hr('Scrcpy server start')
commands = ScrcpyOptions.command_v120(jar_path=self.config.SCRCPY_FILEPATH_REMOTE)
self._scrcpy_server_stream: _AdbStreamConnection = self.adb.shell(
commands,
stream=True,
)
logger.info('Create server stream')
ret = self._scrcpy_server_stream.read(10)
# b'Aborted \r\n'
# Probably because file not exists
if b'Aborted' in ret:
raise ScrcpyError('Aborted')
if ret == b'[server] E':
# [server] ERROR: ...
ret += recv_all(self._scrcpy_server_stream)
logger.error(ret)
# java.lang.IllegalArgumentException: The server version (1.25) does not match the client (...)
if b'does not match the client' in ret:
raise ScrcpyError('Server version does not match the client')
else:
raise ScrcpyError('Unknown scrcpy error')
else:
# [server] INFO: Device: ...
ret += self._scrcpy_receive_from_server_stream()
logger.info(ret)
pass
logger.info('Create video socket')
timeout = Timer(3).start()
while 1:
if timeout.reached():
raise ScrcpyError('Connect scrcpy-server timeout')
try:
self._scrcpy_video_socket = self.adb.create_connection(
Network.LOCAL_ABSTRACT, "scrcpy"
)
break
except AdbError:
sleep(0.1)
dummy_byte = self._scrcpy_video_socket.recv(1)
if not len(dummy_byte) or dummy_byte != b"\x00":
raise ScrcpyError('Did not receive Dummy Byte from video stream')
logger.info('Create control socket')
self._scrcpy_control_socket = self.adb.create_connection(
Network.LOCAL_ABSTRACT, "scrcpy"
)
logger.info('Fetch device info')
device_name = self._scrcpy_video_socket.recv(64).decode("utf-8").rstrip("\x00")
if len(device_name):
logger.attr('Scrcpy Device', device_name)
else:
raise ScrcpyError('Did not receive Device Name')
ret = self._scrcpy_video_socket.recv(4)
self._scrcpy_resolution = struct.unpack(">HH", ret)
logger.attr('Scrcpy Resolution', self._scrcpy_resolution)
self._scrcpy_video_socket.setblocking(False)
self._scrcpy_alive = True
logger.info('Start video stream loop thread')
self.stream_loop_thread = threading.Thread(
target=self._scrcpy_stream_loop, daemon=True
)
self.stream_loop_thread.start()
logger.info('Scrcpy server is up')
def _scrcpy_server_stop(self):
"""
Stop listening (both threaded and blocked)
"""
logger.hr('Scrcpy server stop')
err = self._scrcpy_receive_from_server_stream()
if err:
logger.error(err)
self._scrcpy_alive = False
if self._scrcpy_server_stream is not None:
try:
self._scrcpy_server_stream.close()
except Exception:
pass
if self._scrcpy_control_socket is not None:
try:
self._scrcpy_control_socket.close()
except Exception:
pass
if self._scrcpy_video_socket is not None:
try:
self._scrcpy_video_socket.close()
except Exception:
pass
def _scrcpy_receive_from_server_stream(self):
if self._scrcpy_server_stream is not None:
try:
return self._scrcpy_server_stream.conn.recv(4096)
except Exception:
pass
def _scrcpy_stream_loop(self) -> None:
"""
Core loop for video parsing
"""
from av.codec import CodecContext
from av.error import InvalidDataError
codec = CodecContext.create("h264", "r")
while self._scrcpy_alive:
try:
raw_h264 = self._scrcpy_video_socket.recv(0x10000)
if raw_h264 == b"":
raise ScrcpyError("Video stream is disconnected")
packets = codec.parse(raw_h264)
for packet in packets:
frames = codec.decode(packet)
for frame in frames:
frame = frame.to_ndarray(format="rgb24")
self._scrcpy_last_frame = frame
self._scrcpy_last_frame_time = time.time()
logger.info('frame received')
self._scrcpy_resolution = (frame.shape[1], frame.shape[0])
except (BlockingIOError, InvalidDataError):
# only return nonempty frames, may block cv2 render thread
time.sleep(0.001)
except (ConnectionError, OSError) as e: # Socket Closed
if self._scrcpy_alive:
raise ScrcpyError(str(e))
raise ScrcpyError('_scrcpy_stream_loop stopped')
if __name__ == '__main__':
self = ScrcpyCore('alas')
self.scrcpy_init()
self._scrcpy_server_start()
time.sleep(1)
from PIL import Image
Image.fromarray(self._scrcpy_last_frame).show()

View File

@ -0,0 +1,132 @@
import typing as t
import module.device.method.scrcpy.const as const
class ScrcpyOptions:
frame_rate = 6
@classmethod
def codec_options(cls) -> str:
"""
Custom codec options passing through scrcpy.
https://developer.android.com/reference/android/media/MediaFormat
Returns:
key_profile=1,key_level=4096,...
"""
options = dict(
# H.264 profile and level
# https://developer.android.com/reference/android/media/MediaCodecInfo.CodecProfileLevel
# Baseline, which only has I/P frames
key_profile=1,
# Level 4.1, for 1280x720@30fps
key_level=4096,
# Max quality
key_quality=100,
# https://developer.android.com/reference/android/media/MediaCodecInfo.EncoderCapabilities
# Constant quality
key_bitrate_mode=0,
# A zero value means a stream containing all key frames is requested.
key_i_frame_interval=0,
# https://developer.android.com/reference/android/media/MediaCodecInfo.CodecCapabilities
# COLOR_Format24bitBGR888
key_color_format=12,
# The same as output frame rate to lower CPU consumption
key_capture_rate=cls.frame_rate,
# 20Mbps, the maximum output bitrate of scrcpy
key_bit_rate=20000000,
)
return ','.join([f'{k}={v}' for k, v in options.items()])
@classmethod
def arguments(cls) -> t.List[str]:
"""
https://github.com/Genymobile/scrcpy/blob/master/server/src/main/java/com/genymobile/scrcpy/Server.java
https://github.com/Genymobile/scrcpy/blob/master/server/src/main/java/com/genymobile/scrcpy/Options.java
Returns:
['log_level=info', 'max_size=1280', ...]
"""
options = [
'log_level=info',
'max_size=1280',
# 20Mbps, the maximum output bitrate of scrcpy
# If a higher value is set, scrcpy fallback to 8Mbps default.
'bit_rate=20000000',
# Screenshot time cost <= 300ms is enough for human speed.
f'max_fps={cls.frame_rate}',
# No orientation lock
f'lock_video_orientation={const.LOCK_SCREEN_ORIENTATION_UNLOCKED}',
# Always true
'tunnel_forward=true',
# Always true for controlling via scrcpy
'control=true',
# Default to 0
'display_id=0',
# Useless, always false
'show_touches=false',
# Not determined, leave it as default
'stay_awake=false',
# Encoder name
# Should in [
# "OMX.google.h264.encoder",
# "OMX.qcom.video.encoder.avc",
# "c2.qti.avc.encoder",
# "c2.android.avc.encoder",
# ]
# Empty value, let scrcpy to decide
# 'encoder_name=',
# Codec options
f'codec_options={cls.codec_options()}',
# Useless, always false
'power_off_on_close=false',
'clipboard_autosync=false',
'downsize_on_error=false',
]
return options
@classmethod
def command_v125(cls, jar_path='/data/local/tmp/scrcpy-server.jar') -> t.List[str]:
"""
Generate the commands to run scrcpy.
"""
commands = [
f'CLASSPATH={jar_path}',
'app_process',
'/',
'com.genymobile.scrcpy.Server',
'1.25',
]
commands += cls.arguments()
return commands
@classmethod
def command_v120(cls, jar_path='/data/local/tmp/scrcpy-server.jar') -> t.List[str]:
commands = [
f"CLASSPATH={jar_path}",
"app_process",
"/",
"com.genymobile.scrcpy.Server",
"1.20", # Scrcpy server version
"info", # Log level: info, verbose...
f"1280", # Max screen width (long side)
f"20000000", # Bitrate of video
f"{cls.frame_rate}", # Max frame per second
f"{const.LOCK_SCREEN_ORIENTATION_UNLOCKED}", # Lock screen orientation: LOCK_SCREEN_ORIENTATION
"true", # Tunnel forward
"-", # Crop screen
"false", # Send frame rate to client
"true", # Control enabled
"0", # Display id
"false", # Show touches
"false", # Stay awake
cls.codec_options(), # Codec (video encoding) options
"-", # Encoder name
"false", # Power off screen after server closed
]
return commands
if __name__ == '__main__':
print(' '.join(ScrcpyOptions.command_v120()))

View File

@ -0,0 +1,146 @@
import time
from functools import wraps
import numpy as np
from adbutils.errors import AdbError
import module.device.method.scrcpy.const as const
from module.base.utils import random_rectangle_point
from module.device.method.minitouch import insert_swipe
from module.device.method.scrcpy.core import ScrcpyCore, ScrcpyError
from module.device.method.utils import RETRY_DELAY, RETRY_TRIES, handle_adb_error
from module.exception import RequestHumanTakeover
from module.logger import logger
def retry(func):
@wraps(func)
def retry_wrapper(self, *args, **kwargs):
"""
Args:
self (Minitouch):
"""
init = None
sleep = True
for _ in range(RETRY_TRIES):
try:
if callable(init):
if sleep:
self.sleep(RETRY_DELAY)
sleep = True
init()
return func(self, *args, **kwargs)
# Can't handle
except RequestHumanTakeover:
break
# When adb server was killed
except ConnectionResetError as e:
logger.error(e)
def init():
self.adb_reconnect()
# Emulator closed
except ConnectionAbortedError as e:
logger.error(e)
def init():
self.adb_reconnect()
# ScrcpyError
except ScrcpyError as e:
logger.error(e)
sleep = False
def init():
self.scrcpy_init()
# AdbError
except AdbError as e:
if handle_adb_error(e):
def init():
self.adb_reconnect()
else:
break
# Unknown, probably a trucked image
except Exception as e:
logger.exception(e)
def init():
pass
logger.critical(f'Retry {func.__name__}() failed')
raise RequestHumanTakeover
return retry_wrapper
class Scrcpy(ScrcpyCore):
@retry
def screenshot_scrcpy(self):
self.scrcpy_ensure_running()
# Wait new frame
with self._scrcpy_control_socket_lock:
now = time.time()
while 1:
time.sleep(0.001)
if self._scrcpy_last_frame_time > 0:
break
screenshot = self._scrcpy_last_frame.copy()
self._scrcpy_last_frame_time = 0.
return screenshot
@retry
def click_scrcpy(self, x, y):
self.scrcpy_ensure_running()
self._scrcpy_control.touch(x, y, const.ACTION_DOWN)
self._scrcpy_control.touch(x, y, const.ACTION_UP)
self.sleep(0.05)
@retry
def long_click_scrcpy(self, x, y, duration=1.0):
self.scrcpy_ensure_running()
self._scrcpy_control.touch(x, y, const.ACTION_DOWN)
self.sleep(duration)
self._scrcpy_control.touch(x, y, const.ACTION_UP)
self.sleep(0.05)
@retry
def swipe_scrcpy(self, p1, p2):
self.scrcpy_ensure_running()
# Unlike minitouch, scrcpy swipes needs to be continuous
# So 5 times smother
points = insert_swipe(p0=p1, p3=p2, speed=4, min_distance=2)
self._scrcpy_control.touch(*p1, const.ACTION_DOWN)
for point in points[1:-1]:
self._scrcpy_control.touch(*point, const.ACTION_MOVE)
self.sleep(0.002)
self._scrcpy_control.touch(*p2, const.ACTION_MOVE)
self._scrcpy_control.touch(*p2, const.ACTION_UP)
self.sleep(0.05)
@retry
def drag_scrcpy(self, p1, p2, point_random=(-10, -10, 10, 10)):
self.scrcpy_ensure_running()
p1 = np.array(p1) - random_rectangle_point(point_random)
p2 = np.array(p2) - random_rectangle_point(point_random)
points = insert_swipe(p0=p1, p3=p2, speed=4, min_distance=2)
self._scrcpy_control.touch(*p1, const.ACTION_DOWN)
for point in points[1:-1]:
self._scrcpy_control.touch(*point, const.ACTION_MOVE)
self.sleep(0.002)
# Hold 280ms
for _ in range(int(0.14 // 0.002) * 2):
self._scrcpy_control.touch(*p2, const.ACTION_MOVE)
self.sleep(0.002)
self._scrcpy_control.touch(*p2, const.ACTION_UP)
self.sleep(0.05)

View File

@ -13,12 +13,13 @@ from module.base.utils import get_color, image_size, limit_in, save_image
from module.device.method.adb import Adb
from module.device.method.ascreencap import AScreenCap
from module.device.method.droidcast import DroidCast
from module.device.method.scrcpy import Scrcpy
from module.device.method.wsa import WSA
from module.exception import RequestHumanTakeover, ScriptError
from module.logger import logger
class Screenshot(Adb, WSA, DroidCast, AScreenCap):
class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy):
_screen_size_checked = False
_screen_black_checked = False
_minicap_uninstalled = False
@ -35,6 +36,7 @@ class Screenshot(Adb, WSA, DroidCast, AScreenCap):
'aScreenCap': self.screenshot_ascreencap,
'aScreenCap_nc': self.screenshot_ascreencap_nc,
'DroidCast': self.screenshot_droidcast,
'scrcpy': self.screenshot_scrcpy,
}
@timer
@ -158,6 +160,10 @@ class Screenshot(Adb, WSA, DroidCast, AScreenCap):
else:
logger.warning(f'Unknown screenshot interval: {interval}')
raise ScriptError(f'Unknown screenshot interval: {interval}')
# Screenshot interval in scrcpy is meaningless,
# video stream is received continuously no matter you use it or not.
if self.config.Emulator_ScreenshotMethod == 'scrcpy':
interval = 0.1
if interval != self._screenshot_interval.limit:
logger.info(f'Screenshot interval set to {interval}s')