Merge pull request #4472 from LmeSzinc/dev

Bug fix
This commit is contained in:
LmeSzinc 2024-12-27 23:56:47 +08:00 committed by GitHub
commit e44589df46
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 672 additions and 106 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.5 KiB

View File

@ -0,0 +1,132 @@
from .campaign_base import CampaignBase
from module.map.map_base import CampaignMap
from module.map.map_grids import SelectedGrids, RoadGrids
from module.logger import logger
MAP = CampaignMap('SP')
MAP.shape = 'I9'
MAP.camera_data = ['D2', 'D5', 'D7', 'F2', 'F5', 'F7']
MAP.camera_data_spawn_point = ['F6']
MAP.map_data = """
++ -- ME ++ ++ ++ ME -- ++
-- ME -- -- MB -- -- -- ME
-- -- -- -- -- -- ME -- ++
++ ME ++ ++ ++ ++ ++ -- --
++ -- ++ ++ ++ ++ ++ ME --
-- -- ++ ++ ++ SP SP -- --
ME -- -- -- -- __ __ -- MS
-- ME -- ++ ME -- ME MS --
++ -- ME ++ -- MS ++ -- ++
"""
MAP.weight_data = """
50 50 50 50 10 10 10 10 10
50 50 50 50 10 10 10 10 10
50 50 50 50 10 10 10 10 10
50 50 50 50 50 50 50 10 10
50 50 50 50 50 50 50 10 10
50 50 50 50 50 10 10 10 10
50 50 50 50 10 10 10 10 10
50 50 50 50 10 10 10 10 10
50 50 50 50 10 10 10 10 10
"""
MAP.spawn_data = [
{'battle': 0, 'enemy': 2, 'siren': 2},
{'battle': 1, 'enemy': 1},
{'battle': 2, 'enemy': 2, 'siren': 1},
{'battle': 3, 'enemy': 1},
{'battle': 4, 'enemy': 2},
{'battle': 5, 'enemy': 1},
{'battle': 6},
{'battle': 7, 'boss': 1},
]
MAP.spawn_data_loop = [
{'battle': 0, 'enemy': 12, 'siren': 3},
{'battle': 1},
{'battle': 2},
{'battle': 3},
{'battle': 4},
{'battle': 5},
{'battle': 6},
{'battle': 7, 'boss': 1},
]
A1, B1, C1, D1, E1, F1, G1, H1, I1, \
A2, B2, C2, D2, E2, F2, G2, H2, I2, \
A3, B3, C3, D3, E3, F3, G3, H3, I3, \
A4, B4, C4, D4, E4, F4, G4, H4, I4, \
A5, B5, C5, D5, E5, F5, G5, H5, I5, \
A6, B6, C6, D6, E6, F6, G6, H6, I6, \
A7, B7, C7, D7, E7, F7, G7, H7, I7, \
A8, B8, C8, D8, E8, F8, G8, H8, I8, \
A9, B9, C9, D9, E9, F9, G9, H9, I9, \
= MAP.flatten()
class Config:
# ===== Start of generated config =====
MAP_SIREN_TEMPLATE = []
MOVABLE_ENEMY_TURN = (2,)
MAP_HAS_SIREN = True
MAP_HAS_MOVABLE_ENEMY = True
MAP_HAS_MAP_STORY = False
MAP_HAS_FLEET_STEP = True
MAP_HAS_AMBUSH = False
MAP_HAS_MYSTERY = False
STAR_REQUIRE_1 = 0
STAR_REQUIRE_2 = 0
STAR_REQUIRE_3 = 0
# ===== End of generated config =====
STAGE_ENTRANCE = ['half', '20240725']
MAP_CHAPTER_SWITCH_20241219 = True
MAP_HAS_MODE_SWITCH = False
MAP_HAS_MOVABLE_NORMAL_ENEMY = True
MAP_SIREN_HAS_BOSS_ICON_SMALL = True
MOVABLE_NORMAL_ENEMY_TURN = (2,)
MAP_SIREN_MOVE_WAIT = 0.7
INTERNAL_LINES_FIND_PEAKS_PARAMETERS = {
'height': (80, 255 - 17),
'width': (0.9, 10),
'prominence': 10,
'distance': 35,
}
EDGE_LINES_FIND_PEAKS_PARAMETERS = {
'height': (255 - 17, 255),
'prominence': 10,
'distance': 50,
# 'width': (0, 7),
'wlen': 1000
}
HOMO_EDGE_COLOR_RANGE = (0, 17)
HOMO_EDGE_HOUGHLINES_THRESHOLD = 210
MAP_ENSURE_EDGE_INSIGHT_CORNER = 'bottom'
MAP_SWIPE_MULTIPLY = (1.090, 1.110)
MAP_SWIPE_MULTIPLY_MINITOUCH = (1.054, 1.073)
MAP_SWIPE_MULTIPLY_MAATOUCH = (1.023, 1.042)
MAP_IS_ONE_TIME_STAGE = True
MAP_WALK_USE_CURRENT_FLEET = True
class Campaign(CampaignBase):
MAP = MAP
ENEMY_FILTER = '1L > 1M > 1E > 1C > 2L > 2M > 2E > 2C > 3L > 3M > 3E > 3C'
def battle_0(self):
if self.clear_siren():
return True
if self.clear_enemy(sort=('weight', 'cost_2', 'cost_1')):
return True
return self.battle_default()
def battle_5(self):
if self.clear_siren():
return True
if self.clear_enemy(sort=('weight', 'cost_1')):
return True
return self.battle_default()
def battle_7(self):
return self.fleet_boss.clear_boss()

View File

@ -334,7 +334,7 @@ class Button(Resource):
Returns:
bool.
"""
if self.match(image, offset=offset, similarity=similarity):
if self.match_luma(image, offset=offset, similarity=similarity):
diff = np.subtract(self.button, self._button)[:2]
area = area_offset(self.area, offset=diff)
color = get_color(image, area)

View File

@ -237,7 +237,9 @@ class AutoSearchCombat(MapOperation, Combat, CampaignStatus):
# End
if self.is_in_auto_search_menu() or self._handle_auto_search_menu_missing():
raise CampaignEnd
if self.is_combat_executing():
pause = self.is_combat_executing()
if pause:
logger.attr('BattleUI', pause)
break
logger.info('Auto Search combat execute')

View File

@ -83,17 +83,24 @@ class Combat(Level, HPBalancer, Retirement, SubmarineCall, CombatAuto, CombatMan
"""
self.device.stuck_record_add(PAUSE)
if self.config.SERVER in ['cn', 'en']:
if PAUSE.match_luma(self.device.image, offset=(20, 20)):
if PAUSE.match_luma(self.device.image, offset=(10, 10)):
return PAUSE
else:
color = get_color(self.device.image, PAUSE.area)
if color_similar(color, PAUSE.color) or color_similar(color, (238, 244, 248)):
if np.max(self.image_crop(PAUSE_DOUBLE_CHECK, copy=False)) < 153:
return PAUSE
if PAUSE_New.match_luma(self.device.image, offset=(20, 20)):
if PAUSE_New.match_template_color(self.device.image, offset=(10, 10)):
return PAUSE_New
if PAUSE_Iridescent_Fantasy.match_luma(self.device.image, offset=(20, 20)):
if PAUSE_Iridescent_Fantasy.match_luma(self.device.image, offset=(10, 10)):
return PAUSE_Iridescent_Fantasy
if PAUSE_Christmas.match_luma(self.device.image, offset=(10, 10)):
return PAUSE_Christmas
# PAUSE_New, PAUSE_Cyber, PAUSE_Neon look similar, check colors
if PAUSE_Neon.match_template_color(self.device.image, offset=(10, 10)):
return PAUSE_Neon
if PAUSE_Cyber.match_template_color(self.device.image, offset=(10, 10)):
return PAUSE_Cyber
return False
def handle_combat_quit(self, offset=(20, 20), interval=3):
@ -112,6 +119,12 @@ class Combat(Level, HPBalancer, Retirement, SubmarineCall, CombatAuto, CombatMan
self.device.click(QUIT_Iridescent_Fantasy)
timer.reset()
return True
# Battle UI PAUSE_Neon uses QUIT_New
# Battle UI PAUSE_Cyber uses QUIT_New
if QUIT_Christmas.match_luma(self.device.image, offset=offset):
self.device.click(QUIT_Christmas)
timer.reset()
return True
return False
def ensure_combat_oil_loaded(self):
@ -170,7 +183,9 @@ class Combat(Level, HPBalancer, Retirement, SubmarineCall, CombatAuto, CombatMan
interval_set = True
# End
if self.is_combat_executing():
pause = self.is_combat_executing()
if pause:
logger.attr('BattleUI', pause)
if emotion_reduce:
self.emotion.reduce(fleet_index)
break

View File

@ -5,9 +5,13 @@ from module.base.template import Template
# Don't modify it manually.
PAUSE = Button(area={'cn': (1158, 40, 1199, 58), 'en': (1155, 38, 1216, 51), 'jp': (1232, 36, 1240, 60), 'tw': (1217, 36, 1225, 59)}, color={'cn': (189, 190, 202), 'en': (164, 169, 181), 'jp': (244, 241, 246), 'tw': (247, 243, 247)}, button={'cn': (1157, 34, 1241, 61), 'en': (1136, 26, 1270, 63), 'jp': (1141, 38, 1220, 57), 'tw': (1157, 34, 1241, 61)}, file={'cn': './assets/cn/combat_ui/PAUSE.png', 'en': './assets/en/combat_ui/PAUSE.png', 'jp': './assets/jp/combat_ui/PAUSE.png', 'tw': './assets/tw/combat_ui/PAUSE.png'})
PAUSE_Christmas = Button(area={'cn': (1234, 35, 1250, 56), 'en': (1234, 35, 1250, 56), 'jp': (1234, 35, 1250, 56), 'tw': (1234, 35, 1250, 56)}, color={'cn': (158, 181, 210), 'en': (158, 181, 210), 'jp': (158, 181, 210), 'tw': (158, 181, 210)}, button={'cn': (1234, 35, 1250, 56), 'en': (1234, 35, 1250, 56), 'jp': (1234, 35, 1250, 56), 'tw': (1234, 35, 1250, 56)}, file={'cn': './assets/cn/combat_ui/PAUSE_Christmas.png', 'en': './assets/cn/combat_ui/PAUSE_Christmas.png', 'jp': './assets/cn/combat_ui/PAUSE_Christmas.png', 'tw': './assets/cn/combat_ui/PAUSE_Christmas.png'})
PAUSE_Cyber = Button(area={'cn': (1231, 32, 1253, 59), 'en': (1231, 32, 1253, 59), 'jp': (1231, 32, 1253, 59), 'tw': (1231, 32, 1253, 59)}, color={'cn': (40, 140, 157), 'en': (40, 140, 157), 'jp': (40, 140, 157), 'tw': (40, 140, 157)}, button={'cn': (1231, 32, 1253, 59), 'en': (1231, 32, 1253, 59), 'jp': (1231, 32, 1253, 59), 'tw': (1231, 32, 1253, 59)}, file={'cn': './assets/cn/combat_ui/PAUSE_Cyber.png', 'en': './assets/cn/combat_ui/PAUSE_Cyber.png', 'jp': './assets/cn/combat_ui/PAUSE_Cyber.png', 'tw': './assets/cn/combat_ui/PAUSE_Cyber.png'})
PAUSE_DOUBLE_CHECK = Button(area={'cn': (1226, 35, 1231, 60), 'en': (1226, 35, 1231, 61), 'jp': (1226, 35, 1230, 60), 'tw': (1226, 35, 1231, 60)}, color={'cn': (96, 104, 136), 'en': (83, 98, 118), 'jp': (97, 102, 120), 'tw': (96, 104, 136)}, button={'cn': (1226, 35, 1231, 60), 'en': (1226, 35, 1231, 61), 'jp': (1226, 35, 1230, 60), 'tw': (1226, 35, 1231, 60)}, file={'cn': './assets/cn/combat_ui/PAUSE_DOUBLE_CHECK.png', 'en': './assets/en/combat_ui/PAUSE_DOUBLE_CHECK.png', 'jp': './assets/jp/combat_ui/PAUSE_DOUBLE_CHECK.png', 'tw': './assets/tw/combat_ui/PAUSE_DOUBLE_CHECK.png'})
PAUSE_Iridescent_Fantasy = Button(area={'cn': (1232, 33, 1252, 57), 'en': (1232, 33, 1252, 57), 'jp': (1232, 33, 1252, 57), 'tw': (1232, 33, 1252, 57)}, color={'cn': (124, 139, 190), 'en': (124, 139, 190), 'jp': (124, 139, 190), 'tw': (124, 139, 190)}, button={'cn': (1232, 33, 1252, 57), 'en': (1232, 33, 1252, 57), 'jp': (1232, 33, 1252, 57), 'tw': (1232, 33, 1252, 57)}, file={'cn': './assets/cn/combat_ui/PAUSE_Iridescent_Fantasy.png', 'en': './assets/en/combat_ui/PAUSE_Iridescent_Fantasy.png', 'jp': './assets/jp/combat_ui/PAUSE_Iridescent_Fantasy.png', 'tw': './assets/tw/combat_ui/PAUSE_Iridescent_Fantasy.png'})
PAUSE_Neon = Button(area={'cn': (1228, 32, 1250, 59), 'en': (1228, 32, 1250, 59), 'jp': (1228, 32, 1250, 59), 'tw': (1228, 32, 1250, 59)}, color={'cn': (106, 137, 80), 'en': (106, 137, 80), 'jp': (106, 137, 80), 'tw': (106, 137, 80)}, button={'cn': (1228, 32, 1250, 59), 'en': (1228, 32, 1250, 59), 'jp': (1228, 32, 1250, 59), 'tw': (1228, 32, 1250, 59)}, file={'cn': './assets/cn/combat_ui/PAUSE_Neon.png', 'en': './assets/cn/combat_ui/PAUSE_Neon.png', 'jp': './assets/cn/combat_ui/PAUSE_Neon.png', 'tw': './assets/cn/combat_ui/PAUSE_Neon.png'})
PAUSE_New = Button(area={'cn': (1231, 29, 1253, 56), 'en': (1231, 29, 1253, 56), 'jp': (1231, 29, 1253, 56), 'tw': (1231, 29, 1253, 56)}, color={'cn': (156, 158, 166), 'en': (156, 158, 166), 'jp': (156, 158, 166), 'tw': (156, 158, 166)}, button={'cn': (1231, 29, 1253, 56), 'en': (1231, 29, 1253, 56), 'jp': (1231, 29, 1253, 56), 'tw': (1231, 29, 1253, 56)}, file={'cn': './assets/cn/combat_ui/PAUSE_New.png', 'en': './assets/en/combat_ui/PAUSE_New.png', 'jp': './assets/jp/combat_ui/PAUSE_New.png', 'tw': './assets/tw/combat_ui/PAUSE_New.png'})
QUIT = Button(area={'cn': (420, 490, 593, 548), 'en': (473, 508, 567, 532), 'jp': (433, 490, 606, 547), 'tw': (433, 490, 606, 547)}, color={'cn': (199, 122, 114), 'en': (216, 168, 164), 'jp': (196, 120, 113), 'tw': (200, 126, 118)}, button={'cn': (420, 490, 593, 548), 'en': (473, 508, 567, 532), 'jp': (433, 490, 606, 547), 'tw': (433, 490, 606, 547)}, file={'cn': './assets/cn/combat_ui/QUIT.png', 'en': './assets/en/combat_ui/QUIT.png', 'jp': './assets/jp/combat_ui/QUIT.png', 'tw': './assets/tw/combat_ui/QUIT.png'})
QUIT_Christmas = Button(area={'cn': (400, 506, 477, 525), 'en': (410, 507, 469, 524), 'jp': (400, 506, 477, 525), 'tw': (400, 506, 477, 525)}, color={'cn': (195, 139, 166), 'en': (207, 166, 185), 'jp': (195, 139, 166), 'tw': (195, 139, 166)}, button={'cn': (400, 506, 477, 525), 'en': (410, 507, 469, 524), 'jp': (400, 506, 477, 525), 'tw': (400, 506, 477, 525)}, file={'cn': './assets/cn/combat_ui/QUIT_Christmas.png', 'en': './assets/en/combat_ui/QUIT_Christmas.png', 'jp': './assets/cn/combat_ui/QUIT_Christmas.png', 'tw': './assets/cn/combat_ui/QUIT_Christmas.png'})
QUIT_Iridescent_Fantasy = Button(area={'cn': (391, 522, 464, 540), 'en': (402, 507, 460, 523), 'jp': (391, 522, 464, 540), 'tw': (391, 522, 464, 540)}, color={'cn': (121, 73, 79), 'en': (255, 174, 164), 'jp': (108, 60, 70), 'tw': (121, 73, 79)}, button={'cn': (391, 522, 464, 540), 'en': (402, 507, 460, 523), 'jp': (391, 522, 464, 540), 'tw': (391, 522, 464, 540)}, file={'cn': './assets/cn/combat_ui/QUIT_Iridescent_Fantasy.png', 'en': './assets/en/combat_ui/QUIT_Iridescent_Fantasy.png', 'jp': './assets/jp/combat_ui/QUIT_Iridescent_Fantasy.png', 'tw': './assets/cn/combat_ui/QUIT_Iridescent_Fantasy.png'})
QUIT_New = Button(area={'cn': (394, 506, 467, 524), 'en': (404, 506, 463, 523), 'jp': (394, 506, 467, 524), 'tw': (394, 506, 467, 524)}, color={'cn': (255, 180, 171), 'en': (255, 195, 187), 'jp': (255, 180, 171), 'tw': (255, 180, 171)}, button={'cn': (394, 506, 467, 524), 'en': (404, 506, 463, 523), 'jp': (394, 506, 467, 524), 'tw': (394, 506, 467, 524)}, file={'cn': './assets/cn/combat_ui/QUIT_New.png', 'en': './assets/en/combat_ui/QUIT_New.png', 'jp': './assets/cn/combat_ui/QUIT_New.png', 'tw': './assets/cn/combat_ui/QUIT_New.png'})

View File

@ -34,7 +34,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -27,7 +27,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -1,4 +1,5 @@
import os
import time
from functools import wraps
import lz4.block
@ -27,7 +28,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -1,3 +1,4 @@
import time
import typing as t
from functools import wraps
@ -30,7 +31,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -1,4 +1,5 @@
import json
import time
from functools import wraps
import requests
@ -29,7 +30,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -145,7 +145,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -30,7 +30,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -383,7 +383,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -1,9 +1,9 @@
import asyncio
import ctypes
import json
import os
import sys
from functools import partial, wraps
import time
from functools import wraps
import cv2
import numpy as np
@ -13,6 +13,7 @@ from module.base.timer import Timer
from module.base.utils import ensure_time
from module.config.utils import deep_get
from module.device.method.minitouch import insert_swipe, random_rectangle_point
from module.device.method.pool import JobTimeout, WORKER_POOL
from module.device.method.utils import RETRY_TRIES, retry_sleep
from module.device.platform import Platform
from module.exception import RequestHumanTakeover
@ -163,9 +164,14 @@ def retry(func):
"""
init = None
for _ in range(RETRY_TRIES):
# Extend timeout on retries
if func.__name__ == 'screenshot':
timeout = retry_sleep(_)
if timeout > 0:
kwargs['timeout'] = timeout
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle
@ -176,11 +182,11 @@ def retry(func):
logger.error(e)
break
# Function call timeout
except asyncio.TimeoutError:
except JobTimeout:
logger.warning(f'Func {func.__name__}() call timeout, retrying: {_}')
def init():
self.reconnect()
pass
# NemuIpcError
except NemuIpcError as e:
logger.error(e)
@ -237,14 +243,17 @@ class NemuIpcImpl:
self.width = 0
self.height = 0
def connect(self):
def connect(self, on_thread=True):
if self.connect_id > 0:
return
connect_id = self.ev_run_sync(
self.lib.nemu_connect,
self.nemu_folder, self.instance_id
)
if on_thread:
connect_id = self.run_func(
self.lib.nemu_connect,
self.nemu_folder, self.instance_id
)
else:
connect_id = self.lib.nemu_connect(self.nemu_folder, self.instance_id)
if connect_id == 0:
raise NemuIpcError(
'Connection failed, please check if nemu_folder is correct and emulator is running'
@ -257,7 +266,7 @@ class NemuIpcImpl:
if self.connect_id == 0:
return
self.ev_run_sync(
self.run_func(
self.lib.nemu_disconnect,
self.connect_id
)
@ -275,58 +284,32 @@ class NemuIpcImpl:
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
if has_cached_property(self, '_ev'):
self._ev.close()
del_cached_property(self, '_ev')
if has_cached_property(self, '_pool'):
self._pool.shutdown(wait=False)
del_cached_property(self, '_pool')
@cached_property
def _ev(self):
return asyncio.new_event_loop()
@cached_property
def _pool(self):
from concurrent.futures import ThreadPoolExecutor
return ThreadPoolExecutor(
max_workers=1,
thread_name_prefix='NemuIpc',
)
async def ev_run_async(self, func, *args, timeout=0.15, **kwargs):
@staticmethod
def run_func(func, *args, on_thread=True, timeout=0.5):
"""
Args:
func: Sync function to call
*args:
on_thread: True to run func on a separated thread
timeout:
**kwargs:
Raises:
asyncio.TimeoutError: If function call timeout
"""
func_wrapped = partial(func, *args, **kwargs)
# Increased timeout for slow PCs
# Default screenshot interval is 0.2s, so a 0.15s timeout would have a fast retry without extra time costs
result = await asyncio.wait_for(self._ev.run_in_executor(self._pool, func_wrapped), timeout=timeout)
return result
def ev_run_sync(self, func, *args, **kwargs):
"""
Args:
func: Sync function to call
*args:
**kwargs:
Raises:
asyncio.TimeoutError: If function call timeout
JobTimeout: If function call timeout
NemuIpcIncompatible:
NemuIpcError
"""
result = self._ev.run_until_complete(self.ev_run_async(func, *args, **kwargs))
if on_thread:
# nemu_ipc may timeout sometimes, so we run it on a separated thread
job = WORKER_POOL.start_thread_soon(func, *args)
result = job.get_or_kill(timeout)
else:
result = func(*args)
err = False
if func.__name__ == 'nemu_connect':
if func.__name__ == '_screenshot':
pass
elif func.__name__ == 'nemu_connect':
if result == 0:
err = True
else:
@ -336,11 +319,11 @@ class NemuIpcImpl:
if err:
logger.warning(f'Failed to call {func.__name__}, result={result}')
with CaptureNemuIpc():
result = self._ev.run_until_complete(self.ev_run_async(func, *args, **kwargs))
func(*args)
return result
def get_resolution(self):
def get_resolution(self, on_thread=True):
"""
Get emulator resolution, `self.width` and `self.height` will be set
"""
@ -351,18 +334,42 @@ class NemuIpcImpl:
height_ptr = ctypes.pointer(ctypes.c_int(0))
nullptr = ctypes.POINTER(ctypes.c_int)()
ret = self.ev_run_sync(
ret = self.run_func(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, 0, width_ptr, height_ptr, nullptr
self.connect_id, self.display_id, 0, width_ptr, height_ptr, nullptr,
on_thread=on_thread
)
if ret > 0:
raise NemuIpcError('nemu_capture_display failed during get_resolution()')
self.width = width_ptr.contents.value
self.height = height_ptr.contents.value
def _screenshot(self):
if self.connect_id == 0:
self.connect(on_thread=False)
self.get_resolution(on_thread=False)
width_ptr = ctypes.pointer(ctypes.c_int(self.width))
height_ptr = ctypes.pointer(ctypes.c_int(self.height))
length = self.width * self.height * 4
pixels_pointer = ctypes.pointer((ctypes.c_ubyte * length)())
ret = self.lib.nemu_capture_display(
self.connect_id, self.display_id, length, width_ptr, height_ptr, pixels_pointer,
)
if ret > 0:
raise NemuIpcError('nemu_capture_display failed during screenshot()')
# Return pixels_pointer instead of image to avoid passing image through jobs
return pixels_pointer
@retry
def screenshot(self, timeout=0.15):
def screenshot(self, timeout=0.5):
"""
Args:
timeout: Timout in seconds to call nemu_ipc
Will be dynamically extended by `@retry`
Returns:
np.ndarray: Image array in RGBA color space
Note that image is upside down
@ -370,20 +377,7 @@ class NemuIpcImpl:
if self.connect_id == 0:
self.connect()
self.get_resolution()
width_ptr = ctypes.pointer(ctypes.c_int(self.width))
height_ptr = ctypes.pointer(ctypes.c_int(self.height))
length = self.width * self.height * 4
pixels_pointer = ctypes.pointer((ctypes.c_ubyte * length)())
ret = self.ev_run_sync(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, length, width_ptr, height_ptr, pixels_pointer,
timeout=timeout,
)
if ret > 0:
raise NemuIpcError('nemu_capture_display failed during screenshot()')
pixels_pointer = self.run_func(self._screenshot, timeout=timeout)
# image = np.ctypeslib.as_array(pixels_pointer, shape=(self.height, self.width, 4))
image = np.ctypeslib.as_array(pixels_pointer.contents).reshape((self.height, self.width, 4))
@ -413,7 +407,7 @@ class NemuIpcImpl:
x, y = self.convert_xy(x, y)
ret = self.ev_run_sync(
ret = self.run_func(
self.lib.nemu_input_event_touch_down,
self.connect_id, self.display_id, x, y
)
@ -428,7 +422,7 @@ class NemuIpcImpl:
if self.connect_id == 0:
self.connect()
ret = self.ev_run_sync(
ret = self.run_func(
self.lib.nemu_input_event_touch_up,
self.connect_id, self.display_id
)
@ -578,8 +572,7 @@ class NemuIpc(Platform):
logger.info('nemu_ipc released')
def screenshot_nemu_ipc(self):
timeout = max(self._screenshot_interval.limit - 0.01, 0.15)
image = self.nemu_ipc.screenshot(timeout=timeout)
image = self.nemu_ipc.screenshot()
image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
cv2.flip(image, 0, dst=image)

View File

@ -0,0 +1,392 @@
import abc
import ctypes
import subprocess
from collections import deque
from functools import wraps
from itertools import count
from threading import Lock, Thread
from typing import Callable, Dict, Generic, List, NoReturn, Optional, TypeVar, Union
from module.logger import logger
ValueT = TypeVar("ValueT", covariant=True)
ResultT = TypeVar("ResultT")
def remove_tb_frames(exc: BaseException, n: int) -> BaseException:
tb = exc.__traceback__
for _ in range(n):
assert tb is not None
tb = tb.tb_next
return exc.with_traceback(tb)
class Outcome(abc.ABC, Generic[ValueT]):
@abc.abstractmethod
def unwrap(self) -> ValueT:
"""Return or raise the contained value or exception.
These two lines of code are equivalent::
x = fn(*args)
x = outcome.capture(fn, *args).unwrap()
"""
pass
class Value(Outcome[ValueT], Generic[ValueT]):
"""Concrete :class:`Outcome` subclass representing a regular value.
"""
__slots__ = ('value',)
def __init__(self, value: ValueT):
self.value: ValueT = value
def __repr__(self) -> str:
return f'Value({self.value!r})'
def unwrap(self) -> ValueT:
return self.value
class Error(Outcome[NoReturn]):
"""Concrete :class:`Outcome` subclass representing a raised exception.
"""
__slots__ = ('error',)
def __init__(self, error: BaseException):
self.error: BaseException = error
def __repr__(self) -> str:
return f'Error({self.error!r})'
def unwrap(self) -> NoReturn:
# Tracebacks show the 'raise' line below out of context, so let's give
# this variable a name that makes sense out of context.
captured_error = self.error
try:
raise captured_error
finally:
# We want to avoid creating a reference cycle here. Python does
# collect cycles just fine, so it wouldn't be the end of the world
# if we did create a cycle, but the cyclic garbage collector adds
# latency to Python programs, and the more cycles you create, the
# more often it runs, so it's nicer to avoid creating them in the
# first place. For more details see:
#
# https://github.com/python-trio/trio/issues/1770
#
# In particular, by deleting this local variables from the 'unwrap'
# methods frame, we avoid the 'captured_error' object's
# __traceback__ from indirectly referencing 'captured_error'.
del captured_error, self
def capture(
sync_fn: Callable[..., ResultT],
*args,
**kwargs,
) -> Union[Value[ResultT], Error]:
"""Run ``sync_fn(*args, **kwargs)`` and capture the result.
Returns:
Either a :class:`Value` or :class:`Error` as appropriate.
"""
try:
return Value(sync_fn(*args, **kwargs))
except BaseException as exc:
exc = remove_tb_frames(exc, 1)
return Error(exc)
class JobError(Exception):
pass
class JobTimeout(Exception):
pass
class _JobKill(Exception):
pass
class Job(Generic[ResultT]):
"""
A simple queue, copied from queue.Queue()
Faster but can only put() once and get() once.
"""
# __slots__ = ('worker', 'func_args_kwargs', 'queue', 'mutex', 'finished')
def __init__(self, worker, func_args_kwargs):
# Having attribute "worker" means job is ongoing
# Not having attribute "worker" means job is finished or killed
self.worker = worker
self.func_args_kwargs = func_args_kwargs
self.queue: deque[Outcome[ResultT]] = deque()
self.put_lock = Lock()
self.notify_get = Lock()
self.notify_get.acquire()
def __repr__(self):
return f'Job({self.func_args_kwargs})'
def get(self) -> ResultT:
"""
Get job result or job error
"""
self.notify_get.acquire()
# Return job result or raise job error
item = self.queue.popleft()
return item.unwrap()
def get_or_kill(self, timeout) -> ResultT:
"""
Try to get result within given seconds,
if success, return job result or job error
if failed, kill job and raise JobTimeout
Note that JobTimeout may not raises immediately if POOL_SIZE reached
"""
if self.notify_get.acquire(timeout=timeout):
# Return job result or raise job error
item = self.queue.popleft()
return item.unwrap()
else:
self._kill()
raise JobTimeout
def _kill(self):
with self.put_lock:
try:
worker = self.worker
except AttributeError:
# Trying to kill a finished job, do nothing
return
worker.kill()
del self.worker
name_counter = count()
class WorkerThread:
def __init__(self, thread_pool: "WorkerPool") -> None:
self.job: Optional[Job] = None
self.thread_pool = thread_pool
# This Lock is used in an unconventional way.
#
# "Unlocked" means we have a pending job that's been assigned to us;
# "locked" means that we don't.
#
# Initially we have no job, so it starts out in locked state.
self.worker_lock = Lock()
self.worker_lock.acquire()
self.default_name = f"Alasio thread {next(name_counter)}"
self.thread = Thread(target=self._work, name=self.default_name, daemon=True)
self.thread.start()
def __repr__(self):
return f'{self.__class__.__name__}({self.default_name})'
def _handle_job(self) -> None:
# Convert to local variable, `self.job` will be another
# value if new job is assigned
job = self.job
del self.job
func, args, kwargs = job.func_args_kwargs
result = capture(func, *args, **kwargs)
# Tell the cache that we're available to be assigned a new
# job. We do this *before* calling 'deliver', so that if
# 'deliver' triggers a new job, it can be assigned to us
# instead of spawning a new thread.
self.thread_pool.idle_workers[self] = None
self.thread_pool.release_full_lock()
# Deliver
if isinstance(result, Error) and isinstance(result.error, _JobKill):
# Job killed
pass
else:
# Job finished, putin result and notify
with job.put_lock:
job.queue.append(result)
del job.worker
job.notify_get.release()
def _work(self) -> None:
while True:
if self.worker_lock.acquire(timeout=WorkerPool.IDLE_TIMEOUT):
# We got a job
self._handle_job()
else:
# Timeout acquiring lock, so we can probably exit. But,
# there's a race condition: we might be assigned a job *just*
# as we're about to exit. So we have to check.
try:
del self.thread_pool.idle_workers[self]
except KeyError:
# Someone else removed us from the idle worker queue, so
# they must be in the process of assigning us a job - loop
# around and wait for it.
self.thread_pool.release_full_lock()
continue
else:
# We successfully removed ourselves from the idle
# worker queue, so no more jobs are incoming; it's safe to
# exit.
del self.thread_pool.all_workers[self]
self.thread_pool.release_full_lock()
return
def kill(self):
"""
Yes, it's unsafe to kill a thread, but what else can you do
if a single job function get blocked.
This method should be protected by `job.put_lock` to prevent
race condition with `_handle_job()`.
Returns:
bool: If success to kill the thread
"""
# Send SystemExit to thread
thread_id = ctypes.c_long(self.thread.ident)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
thread_id, ctypes.py_object(_JobKill))
if res <= 1:
return True
else:
try:
job = self.job
except AttributeError:
job = None
logger.error(f'Failed to kill thread {self.thread.ident} from job {job}')
# Failed to send SystemExit, reset it
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
return False
class WorkerPool:
"""
A thread pool imitating trio.to_thread.start_thread_soon()
https://github.com/python-trio/trio/issues/6
"""
# Pool has 40 threads at max.
POOL_SIZE = 40
# Thread exits after 10s idling.
IDLE_TIMEOUT = 10
def __init__(self) -> None:
self.idle_workers: Dict[WorkerThread, None] = {}
self.all_workers: Dict[WorkerThread, None] = {}
self.notify_worker = Lock()
self.notify_worker.acquire()
self.notify_pool = Lock()
self.notify_pool.acquire()
def release_full_lock(self):
"""
Call this method if worker finished any job, or exited, or get killed.
When pool full,
Pool tells all workers: any worker finishes his job notify me.
`self.notify_worker.release()`
Then the pool blocks himself.
`self.notify_pool.acquire()`
The fastest worker, and also the only worker, receives the message,
`if self.notify_worker.acquire(blocking=False):`
Worker tells the pool, new pool slot is ready, you are ready to go.
`self.notify_pool.release()`
"""
if self.notify_worker.acquire(blocking=False):
self.notify_pool.release()
def _get_thread_worker(self) -> WorkerThread:
try:
worker, _ = self.idle_workers.popitem()
return worker
except KeyError:
pass
# Wait if reached max thread
if len(self.all_workers) >= WorkerPool.POOL_SIZE:
# See release_full_lock()
self.notify_worker.release()
self.notify_pool.acquire()
# A worker just idle
try:
worker, _ = self.idle_workers.popitem()
return worker
except KeyError:
pass
# A worker just exited
# if len(self.all_workers) < WorkerPool.MAX_WORKER:
# break
# Create new worker
worker = WorkerThread(self)
# logger.info(f'New worker thread: {worker.default_name}')
self.all_workers[worker] = None
return worker
def start_thread_soon(
self,
func: Callable[..., ResultT],
*args,
**kwargs,
) -> Job[ResultT]:
worker = self._get_thread_worker()
job = Job(worker=worker, func_args_kwargs=(func, args, kwargs))
worker.job = job
worker.worker_lock.release()
return job
def run_on_thread(self, func: Callable[..., ResultT]) -> Callable[..., Job[ResultT]]:
@wraps(func)
def thread_wrapper(*args, **kwargs) -> Job[ResultT]:
return self.start_thread_soon(func, *args, **kwargs)
return thread_wrapper
@staticmethod
def _subprocess_execute(cmd: List[str], timeout=10) -> bytes:
logger.info(f'Execute: {cmd}')
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
try:
stdout, stderr = process.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
process.kill()
stdout, stderr = process.communicate()
logger.warning(f'TimeoutExpired when calling {cmd}, stdout={stdout}, stderr={stderr}')
return stdout
def start_cmd_soon(
self,
cmd: List[str],
timeout=10
) -> Job[bytes]:
worker = self._get_thread_worker()
job = Job(worker=worker, func_args_kwargs=(
self._subprocess_execute, (cmd,), {'timeout': timeout}
))
worker.job = job
worker.worker_lock.release()
return job
WORKER_POOL = WorkerPool()

View File

@ -26,7 +26,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -1,3 +1,4 @@
import time
import typing as t
from dataclasses import dataclass
from functools import wraps
@ -28,7 +29,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -154,16 +154,16 @@ class ImageTruncated(Exception):
def retry_sleep(trial):
# First trial
if trial == 0:
pass
return 0
# Failed once, fast retry
elif trial == 1:
pass
return 0
# Failed twice
elif trial == 2:
time.sleep(1)
return 1
# Failed more
else:
time.sleep(RETRY_DELAY)
return RETRY_DELAY
def handle_adb_error(e):

View File

@ -1,4 +1,5 @@
import re
import time
from functools import wraps
from adbutils.errors import AdbError
@ -21,7 +22,7 @@ def retry(func):
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
time.sleep(retry_sleep(_))
init()
return func(self, *args, **kwargs)
# Can't handle

View File

@ -29,7 +29,9 @@ class ExerciseCombat(HpDaemon, OpponentChoose, ExerciseEquipment, Combat):
continue
# End
if self.is_combat_executing():
pause = self.is_combat_executing()
if pause:
logger.attr('BattleUI', pause)
break
def _combat_execute(self):

View File

@ -1,7 +1,7 @@
from module.base.base import ModuleBase
from module.base.timer import Timer
from module.base.utils import color_bar_percentage
from module.combat_ui.assets import PAUSE, PAUSE_Iridescent_Fantasy, PAUSE_New
from module.combat_ui.assets import PAUSE, PAUSE_Christmas, PAUSE_Cyber, PAUSE_Iridescent_Fantasy, PAUSE_Neon, PAUSE_New
from module.exercise.assets import *
from module.logger import logger
@ -61,7 +61,13 @@ class HpDaemon(ModuleBase):
if pause == PAUSE:
self.attacker_hp = self._calculate_hp(image, area=ATTACKER_HP_AREA.area, reverse=True)
self.defender_hp = self._calculate_hp(image, area=DEFENDER_HP_AREA.area, reverse=False)
elif pause in [PAUSE_New, PAUSE_Iridescent_Fantasy]:
elif pause in [
PAUSE_New,
PAUSE_Iridescent_Fantasy,
PAUSE_Neon,
PAUSE_Christmas,
PAUSE_Cyber,
]:
self.attacker_hp = self._calculate_hp(image, area=ATTACKER_HP_AREA_New.area, reverse=True)
self.defender_hp = self._calculate_hp(image, area=DEFENDER_HP_AREA_New.area, reverse=True)
else:

View File

@ -477,7 +477,9 @@ class GuildOperations(GuildBase):
continue
# End
if az.is_combat_executing():
pause = az.is_combat_executing()
if pause:
logger.attr('BattleUI', pause)
return True
def _guild_operations_boss_combat(self):

View File

@ -16,6 +16,7 @@ AUTO_SEARCH_SET_MOB = Button(area={'cn': (496, 207, 515, 226), 'en': (477, 208,
AUTO_SEARCH_SET_STANDBY = Button(area={'cn': (932, 207, 950, 226), 'en': (813, 322, 832, 340), 'jp': (932, 207, 950, 226), 'tw': (932, 207, 950, 226)}, color={'cn': (30, 30, 37), 'en': (41, 41, 42), 'jp': (37, 36, 37), 'tw': (37, 38, 39)}, button={'cn': (932, 207, 950, 226), 'en': (813, 322, 832, 340), 'jp': (932, 207, 950, 226), 'tw': (932, 207, 950, 226)}, file={'cn': './assets/cn/handler/AUTO_SEARCH_SET_STANDBY.png', 'en': './assets/en/handler/AUTO_SEARCH_SET_STANDBY.png', 'jp': './assets/jp/handler/AUTO_SEARCH_SET_STANDBY.png', 'tw': './assets/tw/handler/AUTO_SEARCH_SET_STANDBY.png'})
AUTO_SEARCH_SET_SUB_AUTO = Button(area={'cn': (578, 457, 597, 476), 'en': (577, 460, 595, 478), 'jp': (578, 457, 597, 476), 'tw': (578, 457, 597, 476)}, color={'cn': (69, 96, 52), 'en': (41, 39, 41), 'jp': (39, 37, 39), 'tw': (73, 100, 53)}, button={'cn': (578, 457, 597, 476), 'en': (577, 460, 595, 478), 'jp': (578, 457, 597, 476), 'tw': (578, 457, 597, 476)}, file={'cn': './assets/cn/handler/AUTO_SEARCH_SET_SUB_AUTO.png', 'en': './assets/en/handler/AUTO_SEARCH_SET_SUB_AUTO.png', 'jp': './assets/jp/handler/AUTO_SEARCH_SET_SUB_AUTO.png', 'tw': './assets/tw/handler/AUTO_SEARCH_SET_SUB_AUTO.png'})
AUTO_SEARCH_SET_SUB_STANDBY = Button(area={'cn': (894, 457, 913, 476), 'en': (855, 460, 874, 478), 'jp': (894, 457, 913, 476), 'tw': (894, 457, 913, 476)}, color={'cn': (32, 31, 34), 'en': (75, 104, 54), 'jp': (73, 100, 52), 'tw': (35, 36, 38)}, button={'cn': (894, 457, 913, 476), 'en': (855, 460, 874, 478), 'jp': (894, 457, 913, 476), 'tw': (894, 457, 913, 476)}, file={'cn': './assets/cn/handler/AUTO_SEARCH_SET_SUB_STANDBY.png', 'en': './assets/en/handler/AUTO_SEARCH_SET_SUB_STANDBY.png', 'jp': './assets/jp/handler/AUTO_SEARCH_SET_SUB_STANDBY.png', 'tw': './assets/tw/handler/AUTO_SEARCH_SET_SUB_STANDBY.png'})
BATTLE_PASS_NEW_SEASON = Button(area={'cn': (942, 529, 962, 550), 'en': (942, 529, 962, 550), 'jp': (942, 529, 962, 550), 'tw': (942, 529, 962, 550)}, color={'cn': (134, 137, 140), 'en': (134, 137, 140), 'jp': (134, 137, 140), 'tw': (134, 137, 140)}, button={'cn': (942, 529, 962, 550), 'en': (942, 529, 962, 550), 'jp': (942, 529, 962, 550), 'tw': (942, 529, 962, 550)}, file={'cn': './assets/cn/handler/BATTLE_PASS_NEW_SEASON.png', 'en': './assets/cn/handler/BATTLE_PASS_NEW_SEASON.png', 'jp': './assets/cn/handler/BATTLE_PASS_NEW_SEASON.png', 'tw': './assets/cn/handler/BATTLE_PASS_NEW_SEASON.png'})
BATTLE_PASS_NOTICE = Button(area={'cn': (554, 483, 726, 540), 'en': (716, 488, 869, 533), 'jp': (554, 483, 726, 540), 'tw': (554, 483, 726, 540)}, color={'cn': (107, 152, 207), 'en': (89, 138, 201), 'jp': (107, 152, 207), 'tw': (107, 152, 207)}, button={'cn': (863, 173, 929, 217), 'en': (863, 173, 929, 217), 'jp': (863, 173, 929, 217), 'tw': (863, 173, 929, 217)}, file={'cn': './assets/cn/handler/BATTLE_PASS_NOTICE.png', 'en': './assets/en/handler/BATTLE_PASS_NOTICE.png', 'jp': './assets/cn/handler/BATTLE_PASS_NOTICE.png', 'tw': './assets/cn/handler/BATTLE_PASS_NOTICE.png'})
BOOK_BOX_AUTO = Button(area={'cn': (737, 614, 756, 631), 'en': (739, 615, 754, 630), 'jp': (808, 614, 825, 631), 'tw': (807, 613, 826, 632)}, color={'cn': (67, 74, 82), 'en': (57, 64, 74), 'jp': (59, 62, 68), 'tw': (62, 66, 71)}, button={'cn': (737, 614, 756, 631), 'en': (739, 615, 754, 630), 'jp': (808, 614, 825, 631), 'tw': (807, 613, 826, 632)}, file={'cn': './assets/cn/handler/BOOK_BOX_AUTO.png', 'en': './assets/en/handler/BOOK_BOX_AUTO.png', 'jp': './assets/jp/handler/BOOK_BOX_AUTO.png', 'tw': './assets/tw/handler/BOOK_BOX_AUTO.png'})
BOOK_BOX_PREP = Button(area={'cn': (922, 600, 940, 617), 'en': (906, 604, 922, 619), 'jp': (921, 602, 939, 621), 'tw': (906, 604, 922, 619)}, color={'cn': (88, 122, 68), 'en': (29, 32, 29), 'jp': (35, 35, 36), 'tw': (29, 32, 29)}, button={'cn': (922, 600, 940, 617), 'en': (906, 604, 922, 619), 'jp': (921, 602, 939, 621), 'tw': (906, 604, 922, 619)}, file={'cn': './assets/cn/handler/BOOK_BOX_PREP.png', 'en': './assets/en/handler/BOOK_BOX_PREP.png', 'jp': './assets/jp/handler/BOOK_BOX_PREP.png', 'tw': './assets/tw/handler/BOOK_BOX_PREP.png'})

View File

@ -76,7 +76,9 @@ class Combat(Combat_, MapEventHandler):
continue
# End
if self.is_combat_executing():
pause = self.is_combat_executing()
if pause:
logger.attr('BattleUI', pause)
# if emotion_reduce:
# self.emotion.reduce(fleet_index)
break
@ -235,7 +237,9 @@ class Combat(Combat_, MapEventHandler):
# End
if self.handle_os_auto_search_map_option(drop=drop):
break
if self.is_combat_executing():
pause = self.is_combat_executing()
if pause:
logger.attr('BattleUI', pause)
break
if self.is_in_map():
break

View File

@ -238,7 +238,9 @@ class Raid(MapOperation, RaidCombat, CampaignEvent):
continue
# End
if self.is_combat_executing():
pause = self.is_combat_executing()
if pause:
logger.attr('BattleUI', pause)
if emotion_reduce:
self.emotion.reduce(fleet_index)
break

View File

@ -5,10 +5,9 @@ from module.combat.assets import GET_ITEMS_1, GET_ITEMS_2, GET_SHIP
from module.exception import (GameNotRunningError, GamePageUnknownError,
RequestHumanTakeover)
from module.exercise.assets import EXERCISE_PREPARATION
from module.freebies.assets import PURCHASE_POPUP
from module.handler.assets import (AUTO_SEARCH_MENU_EXIT, BATTLE_PASS_NOTICE, GAME_TIPS, LOGIN_ANNOUNCE,
LOGIN_ANNOUNCE_2, LOGIN_CHECK, LOGIN_RETURN_SIGN, MAINTENANCE_ANNOUNCE,
MONTHLY_PASS_NOTICE)
from module.handler.assets import (AUTO_SEARCH_MENU_EXIT, BATTLE_PASS_NEW_SEASON, BATTLE_PASS_NOTICE, GAME_TIPS,
LOGIN_ANNOUNCE, LOGIN_ANNOUNCE_2, LOGIN_CHECK, LOGIN_RETURN_SIGN,
MAINTENANCE_ANNOUNCE, MONTHLY_PASS_NOTICE)
from module.handler.info_handler import InfoHandler
from module.logger import logger
from module.map.assets import (FLEET_PREPARATION, MAP_PREPARATION,
@ -388,7 +387,14 @@ class UI(InfoHandler):
# Battle pass is about to expire and player has uncollected battle pass rewards
if self.appear_then_click(BATTLE_PASS_NOTICE, offset=(30, 30), interval=3):
return True
if self.appear_then_click(PURCHASE_POPUP, offset=(44, -77, 84, -37), interval=3):
# Popup that advertise you to buy battle pass
# 2024.12.19, PURCHASE_POPUP at main page becomes BATTLE_PASS_NEW_SEASON
# if self.appear_then_click(PURCHASE_POPUP, offset=(44, -77, 84, -37), interval=3):
# return True
# Popup that tells you new battle pass season is aired
if self.appear(BATTLE_PASS_NEW_SEASON, offset=(30, 30), interval=3):
logger.info(f'UI additional: {BATTLE_PASS_NEW_SEASON} -> {BACK_ARROW}')
self.device.click(BACK_ARROW)
return True
# Item expired offset=(37, 72), skin expired, offset=(24, 68)
if self.handle_popup_single(offset=(-6, 48, 54, 88), name='ITEM_EXPIRED'):