import re import cv2 from module.base.timer import Timer from module.exception import ScriptError from module.logger import logger from module.ocr.ocr import Digit, DigitCounter from module.retire.retirement import Retirement from module.shop.assets import * from module.shop.base import ShopBase from module.shop.shop_select_globals import * from module.ui.assets import BACK_ARROW class StockCounter(DigitCounter): def pre_process(self, image): # Convert to gray scale r, g, b = cv2.split(image) image = cv2.max(cv2.max(r, g), b) return 255 - image def after_process(self, result): result = super().after_process(result) if re.match(r'^\d\d$', result): # 55 -> 5/5 new = f'{result[0]}/{result[1]}' logger.info(f'StockCounter result {result} is revised to {new}') result = new if re.match(r'^\d{4,}$', result): # 1515 -> 15/15 new = f'{result[0:2]}/{result[2:4]}' logger.info(f'StockCounter result {result} is revised to {new}') result = new return result SHOP_SELECT_PR = [SHOP_SELECT_PR1, SHOP_SELECT_PR2, SHOP_SELECT_PR3] OCR_SHOP_SELECT_STOCK = StockCounter(SHOP_SELECT_STOCK) OCR_SHOP_AMOUNT = Digit(SHOP_AMOUNT, letter=(239, 239, 239), name='OCR_SHOP_AMOUNT') class ShopClerk(ShopBase, Retirement): def shop_get_choice(self, item): """ Gets the configuration saved in for the appropriate variant shop i.e. GuildShop_X Args: item (Item): Returns: str Raises: ScriptError """ group = item.group if group == 'pr': postfix = None for _ in range(3): if _: self.device.sleep((0.3, 0.5)) self.device.screenshot() for idx, btn in enumerate(SHOP_SELECT_PR): if self.appear(btn, offset=(20, 20)): postfix = f'{idx + 1}' break if postfix is not None: break logger.warning('Failed to detect PR series, ' 'app may be lagging or frozen') else: postfix = f'_{item.tier.upper()}' ugroup = group.upper() class_name = self.__class__.__name__ try: return getattr(self.config, f'{class_name}_{ugroup}{postfix}') except Exception: logger.critical(f'No configuration with name ' f'\'{class_name}_{ugroup}{postfix}\'') raise def shop_get_select(self, item): """ Gets the appropriate select grid button Args: item (Item): Returns: Button Raises: ScriptError """ # Item group must belong in SELECT_ITEM_INFO_MAP group = item.group if group not in SELECT_ITEM_INFO_MAP: logger.critical(f'Unexpected item group \'{group}\'; ' f'expected one of {SELECT_ITEM_INFO_MAP.keys()}') raise ScriptError # Get configured choice for item choice = self.shop_get_choice(item) # Get appropriate select button for click try: item_info = SELECT_ITEM_INFO_MAP[group] index = item_info['choices'][choice] if group == 'pr': for idx, btn in enumerate(SHOP_SELECT_PR): if self.appear(btn, offset=(20, 20)): series_key = f's{idx + 1}' return item_info['grid'][series_key].buttons[index] else: return item_info['grid'].buttons[index] except Exception: logger.critical(f'SELECT_ITEM_INFO_MAP may be malformed; ' f'item group \'{group}\' entry is compromised') raise ScriptError def shop_buy_select_execute(self, item): """ Args: item (Item): Returns: bool: """ # Search for appropriate select grid button for item select = self.shop_get_select(item) # Get displayed stock limit; varies between shops # If read 0, then warn and exit as cannot safely buy timeout = Timer(5, count=10).start() skip_first_screenshot = True limit = 0 while 1: if timeout.reached(): break if skip_first_screenshot: skip_first_screenshot = False else: self.device.screenshot() _, _, limit = OCR_SHOP_SELECT_STOCK.ocr(self.device.image) if limit: break if not limit: logger.critical(f'{item.name}\'s stock count cannot be ' 'extracted. Advised to re-cut the asset ' 'OCR_SHOP_SELECT_STOCK') raise ScriptError # Click in intervals until plus/minus are onscreen click_timer = Timer(3, count=6) select_offset = (500, 400) while 1: if click_timer.reached(): self.device.click(select) click_timer.reset() # Scan for plus/minus locations; searching within # offset will update the click position automatically self.device.screenshot() if self.appear(SELECT_MINUS, offset=select_offset) and self.appear(SELECT_PLUS, offset=select_offset): break else: continue # Total number to purchase altogether total = int(self._currency // item.price) diff = limit - total if diff > 0: limit = total # Alias OCR_SHOP_SELECT_STOCK to adapt with # ui_ensure_index; prevent overbuying when # out of stock; item.price may still evaluate # incorrectly def shop_buy_select_ensure_index(image): current, remain, _ = OCR_SHOP_SELECT_STOCK.ocr(image) if not current: group_case = item.group.title() if len(item.group) > 2 else item.group.upper() logger.info(f'{group_case}(s) out of stock; exit to prevent overbuying') return limit return remain self.ui_ensure_index(limit, letter=shop_buy_select_ensure_index, prev_button=SELECT_MINUS, next_button=SELECT_PLUS, skip_first_screenshot=True) self.device.click(SHOP_BUY_CONFIRM_SELECT) return True def shop_buy_amount_execute(self, item): """ Args: item (Item): Returns: bool: Raises: ScriptError """ index_offset = (40, 20) # In case either -/+ shift position, use # shipyard ocr trick to accurately parse self.appear(AMOUNT_MINUS, offset=index_offset) self.appear(AMOUNT_PLUS, offset=index_offset) area = OCR_SHOP_AMOUNT.buttons[0] OCR_SHOP_AMOUNT.buttons = [(AMOUNT_MINUS.button[2] + 3, area[1], AMOUNT_PLUS.button[0] - 3, area[3])] # Total number that can be purchased # altogether based on clicking max # Needs small delay for stable image self.appear_then_click(AMOUNT_MAX, offset=(50, 50)) self.device.sleep((0.3, 0.5)) timeout = Timer(5, count=10).start() limit = 0 while 1: if timeout.reached(): break self.device.screenshot() limit = OCR_SHOP_AMOUNT.ocr(self.device.image) if limit: break if not limit: logger.critical('OCR_SHOP_AMOUNT resulted in zero (0); ' 'asset may be compromised') raise ScriptError # Adjust purchase amount if needed total = int(self._currency // item.price) diff = limit - total if diff > 0: limit = total self.ui_ensure_index(limit, letter=OCR_SHOP_AMOUNT, prev_button=AMOUNT_MINUS, next_button=AMOUNT_PLUS, skip_first_screenshot=True) self.device.click(SHOP_BUY_CONFIRM_AMOUNT) return True def shop_interval_clear(self): """ Override in variant class if need to clear particular asset intervals """ self.interval_clear(BACK_ARROW) self.interval_clear(SHOP_BUY_CONFIRM) def shop_buy_handle(self, item): """ Override in variant class for specific buy handle actions Args: item (Item): Returns: bool: """ return False def shop_buy_execute(self, item, skip_first_screenshot=True): """ Args: item: Item to check skip_first_screenshot: bool Returns: None: exits appropriately therefore successful """ success = False self.shop_interval_clear() while 1: if skip_first_screenshot: skip_first_screenshot = False else: self.device.screenshot() if self.appear(BACK_ARROW, offset=(30, 30), interval=3): self.device.click(item) continue if self.appear_then_click(SHOP_BUY_CONFIRM, offset=(20, 20), interval=3): self.interval_reset(BACK_ARROW) continue if self.shop_buy_handle(item): self.interval_reset(BACK_ARROW) continue if self.handle_retirement(): self.interval_reset(BACK_ARROW) continue if self.shop_obstruct_handle(): self.interval_reset(BACK_ARROW) success = True continue if self.info_bar_count(): self.interval_reset(BACK_ARROW) success = True continue # End if success and self.appear(BACK_ARROW, offset=(30, 30)): break def shop_buy(self): """ Returns: bool: If success, and able to continue. """ for _ in range(12): logger.hr('Shop buy', level=2) # Get first for innate delay to ocr # shop currency for accurate parse items = self.shop_get_items() self.shop_currency() if self._currency <= 0: logger.warning(f'Current funds: {self._currency}, stopped') return False item = self.shop_get_item_to_buy(items) if item is None: logger.info('Shop buy finished') return True else: self.shop_buy_execute(item) continue logger.warning('Too many items to buy, stopped') return True