diff --git a/bot/__init__.py b/bot/__init__.py index 92ef9d8c..63c8f5df 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -1,3 +1,4 @@ +from sys import exit from apscheduler.schedulers.asyncio import AsyncIOScheduler from aria2p import API as ariaAPI, Client as ariaClient from asyncio import Lock, get_event_loop @@ -24,6 +25,7 @@ from subprocess import Popen, run from time import time from tzlocal import get_localzone from uvloop import install +from concurrent.futures import ThreadPoolExecutor # from faulthandler import enable as faulthandler_enable # faulthandler_enable() @@ -40,6 +42,8 @@ getLogger("pymongo").setLevel(ERROR) botStartTime = time() bot_loop = get_event_loop() +THREADPOOL = ThreadPoolExecutor(max_workers=99999) +bot_loop.set_default_executor(THREADPOOL) basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", @@ -539,7 +543,7 @@ bot = tgClient( TELEGRAM_API, TELEGRAM_HASH, bot_token=BOT_TOKEN, - workers=1000, + workers=99999, parse_mode=enums.ParseMode.HTML, max_concurrent_transmissions=10, ).start() @@ -556,7 +560,9 @@ def get_qb_options(): for k in list(qbit_options.keys()): if k.startswith("rss"): del qbit_options[k] + qbittorrent_client.app_set_preferences({"web_ui_password": "mltbmltb"}) else: + qbit_options["web_ui_password"] = "mltbmltb" qb_opt = {**qbit_options} qbittorrent_client.app_set_preferences(qb_opt) diff --git a/bot/__main__.py b/bot/__main__.py index dbbd3aac..e7b8c313 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -264,31 +264,44 @@ async def main(): ) create_help_buttons() - bot.add_handler(MessageHandler(start, filters=command(BotCommands.StartCommand))) bot.add_handler( MessageHandler( - log, filters=command(BotCommands.LogCommand) & CustomFilters.sudo + start, filters=command(BotCommands.StartCommand, case_sensitive=True) ) ) bot.add_handler( MessageHandler( - restart, filters=command(BotCommands.RestartCommand) & CustomFilters.sudo + log, + filters=command(BotCommands.LogCommand, case_sensitive=True) + & CustomFilters.sudo, ) ) bot.add_handler( MessageHandler( - ping, filters=command(BotCommands.PingCommand) & CustomFilters.authorized + restart, + filters=command(BotCommands.RestartCommand, case_sensitive=True) + & CustomFilters.sudo, + ) + ) + bot.add_handler( + MessageHandler( + ping, + filters=command(BotCommands.PingCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( bot_help, - filters=command(BotCommands.HelpCommand) & CustomFilters.authorized, + filters=command(BotCommands.HelpCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( - stats, filters=command(BotCommands.StatsCommand) & CustomFilters.authorized + stats, + filters=command(BotCommands.StatsCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) LOGGER.info("Bot Started!") diff --git a/bot/helper/common.py b/bot/helper/common.py index a884ea40..628ee8c5 100644 --- a/bot/helper/common.py +++ b/bot/helper/common.py @@ -139,6 +139,10 @@ class TaskConfig: ) async def isTokenExists(self, path, status): + if not self.upDest: + raise ValueError("No Upload Destination!") + if not is_gdrive_id(self.upDest) and not is_rclone_path(self.upDest): + raise ValueError("Wrong Upload Destination!") if is_rclone_path(path): config_path = self.getConfigPath(path) if config_path != "rclone.conf" and status == "up": @@ -176,7 +180,15 @@ class TaskConfig: else ["aria2", "!qB"] ) if self.link not in ["rcl", "gdl"]: - if not self.isYtDlp and not self.isJd: + if ( + not self.isYtDlp + and not self.isJd + and ( + is_gdrive_id(self.link) + or is_rclone_path(self.link) + or is_gdrive_link(self.link) + ) + ): await self.isTokenExists(self.link, "dl") elif self.link == "rcl": if not self.isYtDlp and not self.isJd: @@ -217,10 +229,6 @@ class TaskConfig: ) elif (not self.upDest and default_upload == "gd") or self.upDest == "gd": self.upDest = self.userDict.get("gdrive_id") or config_dict["GDRIVE_ID"] - if not self.upDest: - raise ValueError("No Upload Destination!") - if not is_gdrive_id(self.upDest) and not is_rclone_path(self.upDest): - raise ValueError("Wrong Upload Destination!") if self.upDest not in ["rcl", "gdl"]: await self.isTokenExists(self.upDest, "up") diff --git a/bot/helper/ext_utils/bot_utils.py b/bot/helper/ext_utils/bot_utils.py index e104c265..73b1d66e 100644 --- a/bot/helper/ext_utils/bot_utils.py +++ b/bot/helper/ext_utils/bot_utils.py @@ -6,7 +6,6 @@ from asyncio import ( sleep, ) from asyncio.subprocess import PIPE -from concurrent.futures import ThreadPoolExecutor from functools import partial, wraps from bot import user_data, config_dict, bot_loop @@ -18,8 +17,6 @@ from bot.helper.ext_utils.help_messages import ( from bot.helper.ext_utils.telegraph_helper import telegraph from bot.helper.telegram_helper.button_build import ButtonMaker -THREADPOOL = ThreadPoolExecutor(max_workers=1000) - COMMAND_USAGE = {} @@ -206,7 +203,7 @@ def new_task(func): async def sync_to_async(func, *args, wait=True, **kwargs): pfunc = partial(func, *args, **kwargs) - future = bot_loop.run_in_executor(THREADPOOL, pfunc) + future = bot_loop.run_in_executor(None, pfunc) return await future if wait else future diff --git a/bot/helper/ext_utils/db_handler.py b/bot/helper/ext_utils/db_handler.py index 1a1d214a..f5a56d3a 100644 --- a/bot/helper/ext_utils/db_handler.py +++ b/bot/helper/ext_utils/db_handler.py @@ -42,7 +42,6 @@ class DbManager: ) except Exception as e: LOGGER.error(f"DataBase Collection Error: {e}") - self._conn.close return # Save Aria2c options if await self._db.settings.aria2c.find_one({"_id": bot_id}) is None: @@ -98,7 +97,6 @@ class DbManager: del row["_id"] rss_dict[user_id] = row LOGGER.info("Rss data has been imported from Database.") - self._conn.close async def update_deploy_config(self): if self._err: @@ -107,7 +105,6 @@ class DbManager: await self._db.settings.deployConfig.replace_one( {"_id": bot_id}, current_config, upsert=True ) - self._conn.close async def update_config(self, dict_): if self._err: @@ -115,7 +112,6 @@ class DbManager: await self._db.settings.config.update_one( {"_id": bot_id}, {"$set": dict_}, upsert=True ) - self._conn.close async def update_aria2(self, key, value): if self._err: @@ -123,7 +119,6 @@ class DbManager: await self._db.settings.aria2c.update_one( {"_id": bot_id}, {"$set": {key: value}}, upsert=True ) - self._conn.close async def update_qbittorrent(self, key, value): if self._err: @@ -131,7 +126,6 @@ class DbManager: await self._db.settings.qbittorrent.update_one( {"_id": bot_id}, {"$set": {key: value}}, upsert=True ) - self._conn.close async def save_qbit_settings(self): if self._err: @@ -139,7 +133,6 @@ class DbManager: await self._db.settings.qbittorrent.replace_one( {"_id": bot_id}, qbit_options, upsert=True ) - self._conn.close async def update_private_file(self, path): if self._err: @@ -155,8 +148,6 @@ class DbManager: ) if path == "config.env": await self.update_deploy_config() - else: - self._conn.close async def update_nzb_config(self): async with aiopen("sabnzbd/SABnzbd.ini", "rb+") as pf: @@ -176,7 +167,6 @@ class DbManager: if data.get("token_pickle"): del data["token_pickle"] await self._db.users.replace_one({"_id": user_id}, data, upsert=True) - self._conn.close async def update_user_doc(self, user_id, key, path=""): if self._err: @@ -189,7 +179,6 @@ class DbManager: await self._db.users.update_one( {"_id": user_id}, {"$set": {key: doc_bin}}, upsert=True ) - self._conn.close async def rss_update_all(self): if self._err: @@ -198,7 +187,6 @@ class DbManager: await self._db.rss[bot_id].replace_one( {"_id": user_id}, rss_dict[user_id], upsert=True ) - self._conn.close async def rss_update(self, user_id): if self._err: @@ -206,25 +194,21 @@ class DbManager: await self._db.rss[bot_id].replace_one( {"_id": user_id}, rss_dict[user_id], upsert=True ) - self._conn.close async def rss_delete(self, user_id): if self._err: return await self._db.rss[bot_id].delete_one({"_id": user_id}) - self._conn.close async def add_incomplete_task(self, cid, link, tag): if self._err: return await self._db.tasks[bot_id].insert_one({"_id": link, "cid": cid, "tag": tag}) - self._conn.close async def rm_complete_task(self, link): if self._err: return await self._db.tasks[bot_id].delete_one({"_id": link}) - self._conn.close async def get_incomplete_tasks(self): notifier_dict = {} @@ -242,11 +226,9 @@ class DbManager: else: notifier_dict[row["cid"]] = {row["tag"]: [row["_id"]]} await self._db.tasks[bot_id].drop() - self._conn.close return notifier_dict # return a dict ==> {cid: {tag: [_id, _id, ...]}} async def trunc_table(self, name): if self._err: return await self._db[name][bot_id].drop() - self._conn.close diff --git a/bot/helper/ext_utils/files_utils.py b/bot/helper/ext_utils/files_utils.py index 37faa620..85f2f0c1 100644 --- a/bot/helper/ext_utils/files_utils.py +++ b/bot/helper/ext_utils/files_utils.py @@ -5,7 +5,7 @@ from os import walk, path as ospath, makedirs from re import split as re_split, I, search as re_search, escape from shutil import rmtree from subprocess import run as srun -from sys import exit as sexit +from sys import exit from bot import aria2, LOGGER, DOWNLOAD_DIR, qbittorrent_client from bot.helper.ext_utils.bot_utils import sync_to_async, cmd_exec @@ -105,10 +105,10 @@ def exit_clean_up(signal, frame): LOGGER.info("Please wait, while we clean up and stop the running downloads") clean_all() srun(["pkill", "-9", "-f", "gunicorn|aria2c|qbittorrent-nox|ffmpeg|java"]) - sexit(0) + exit(0) except KeyboardInterrupt: LOGGER.warning("Force Exiting before the cleanup finishes!") - sexit(1) + exit(1) async def clean_unwanted(path, custom_list=None): diff --git a/bot/helper/ext_utils/media_utils.py b/bot/helper/ext_utils/media_utils.py index 6a44ec93..73dcf0ad 100644 --- a/bot/helper/ext_utils/media_utils.py +++ b/bot/helper/ext_utils/media_utils.py @@ -249,6 +249,8 @@ async def take_ss(video_file, ss_nb) -> bool: "1", "-frames:v", "1", + "-threads", + f"{cpu_count() // 2}", output, ] cap_time += interval @@ -288,6 +290,8 @@ async def get_audio_thumb(audio_file): "-an", "-vcodec", "copy", + "-threads", + f"{cpu_count() // 2}", des_dir, ] _, err, code = await cmd_exec(cmd) @@ -321,6 +325,8 @@ async def create_thumbnail(video_file, duration): "thumbnail", "-frames:v", "1", + "-threads", + f"{cpu_count() // 2}", des_dir, ] try: diff --git a/bot/helper/mirror_leech_utils/download_utils/direct_link_generator.py b/bot/helper/mirror_leech_utils/download_utils/direct_link_generator.py index e4fc1720..041c4cec 100644 --- a/bot/helper/mirror_leech_utils/download_utils/direct_link_generator.py +++ b/bot/helper/mirror_leech_utils/download_utils/direct_link_generator.py @@ -19,7 +19,6 @@ from bot.helper.ext_utils.help_messages import PASSWORD_ERROR_MESSAGE from bot.helper.ext_utils.links_utils import is_share_link from bot.helper.ext_utils.status_utils import speed_string_to_bytes -_caches = {} user_agent = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0" ) diff --git a/bot/helper/mirror_leech_utils/download_utils/jd_download.py b/bot/helper/mirror_leech_utils/download_utils/jd_download.py index 78983758..999a95ed 100644 --- a/bot/helper/mirror_leech_utils/download_utils/jd_download.py +++ b/bot/helper/mirror_leech_utils/download_utils/jd_download.py @@ -1,4 +1,4 @@ -from asyncio import wait_for, Event, wrap_future, sleep +from asyncio import wait_for, Event, sleep from functools import partial from pyrogram.filters import regex, user from pyrogram.handlers import CallbackQueryHandler @@ -16,7 +16,7 @@ from bot import ( jd_lock, jd_downloads, ) -from bot.helper.ext_utils.bot_utils import new_thread, retry_function, new_task +from bot.helper.ext_utils.bot_utils import retry_function from bot.helper.ext_utils.jdownloader_booter import jdownloader from bot.helper.ext_utils.task_manager import ( check_running_tasks, @@ -36,7 +36,6 @@ from bot.helper.telegram_helper.message_utils import ( ) -@new_task async def configureDownload(_, query, obj): data = query.data.split() message = query.message @@ -56,7 +55,6 @@ class JDownloaderHelper: self.listener = listener self.event = Event() - @new_thread async def _event_handler(self): pfunc = partial(configureDownload, obj=self) handler = self.listener.client.add_handler( @@ -75,7 +73,6 @@ class JDownloaderHelper: self.listener.client.remove_handler(*handler) async def waitForConfigurations(self): - future = self._event_handler() buttons = ButtonMaker() buttons.ubutton("Select", "https://my.jdownloader.org") buttons.ibutton("Done Selecting", "jdq sdone") @@ -83,7 +80,7 @@ class JDownloaderHelper: button = buttons.build_menu(2) msg = f"Disable/Remove the unwanted files or change variants or edit files names from myJdownloader site for {self.listener.name} but don't start it manually!\n\nAfter finish press Done Selecting!\nTimeout: 300s" self._reply_to = await sendMessage(self.listener.message, msg, button) - await wrap_future(future) + await self._event_handler() if not self.listener.isCancelled: await deleteMessage(self._reply_to) return self.listener.isCancelled @@ -94,7 +91,6 @@ async def add_jd_download(listener, path): if jdownloader.device is None: await listener.onDownloadError(jdownloader.error) return - try: await wait_for(retry_function(jdownloader.device.jd.version), timeout=10) except: diff --git a/bot/helper/mirror_leech_utils/download_utils/telegram_download.py b/bot/helper/mirror_leech_utils/download_utils/telegram_download.py index 9a0edbd8..ffe7d788 100644 --- a/bot/helper/mirror_leech_utils/download_utils/telegram_download.py +++ b/bot/helper/mirror_leech_utils/download_utils/telegram_download.py @@ -1,6 +1,6 @@ from asyncio import Lock, sleep from time import time -from pyrogram.errors import FloodWait +from pyrogram.errors import FloodWait, FloodPremiumWait from bot import ( LOGGER, @@ -81,7 +81,7 @@ class TelegramDownloadHelper: if self._listener.isCancelled: await self._onDownloadError("Cancelled by user!") return - except FloodWait as f: + except (FloodWait, FloodPremiumWait) as f: LOGGER.warning(str(f)) await sleep(f.value) except Exception as e: diff --git a/bot/helper/mirror_leech_utils/gdrive_utils/list.py b/bot/helper/mirror_leech_utils/gdrive_utils/list.py index fa649c31..6ed2872a 100644 --- a/bot/helper/mirror_leech_utils/gdrive_utils/list.py +++ b/bot/helper/mirror_leech_utils/gdrive_utils/list.py @@ -1,5 +1,5 @@ from aiofiles.os import path as aiopath -from asyncio import wait_for, Event, wrap_future, gather +from asyncio import wait_for, Event, gather from functools import partial from logging import getLogger from natsort import natsorted @@ -9,7 +9,7 @@ from tenacity import RetryError from time import time from bot import config_dict -from bot.helper.ext_utils.bot_utils import new_thread, new_task, update_user_ldata +from bot.helper.ext_utils.bot_utils import new_task, update_user_ldata from bot.helper.ext_utils.db_handler import DbManager from bot.helper.ext_utils.status_utils import get_readable_file_size, get_readable_time from bot.helper.mirror_leech_utils.gdrive_utils.helper import GoogleDriveHelper @@ -25,7 +25,6 @@ LOGGER = getLogger(__name__) LIST_LIMIT = 6 -@new_task async def id_updates(_, query, obj): await query.answer() message = query.message @@ -134,7 +133,6 @@ class gdriveList(GoogleDriveHelper): self.page_step = 1 super().__init__() - @new_thread async def _event_handler(self): pfunc = partial(id_updates, obj=self) handler = self.listener.client.add_handler( @@ -351,7 +349,6 @@ class gdriveList(GoogleDriveHelper): async def get_target_id(self, status, token_path=None): self.list_status = status - future = self._event_handler() if token_path is None: self._token_user, self._token_owner, self._sa_owner = await gather( aiopath.exists(self.user_token_path), @@ -366,7 +363,7 @@ class gdriveList(GoogleDriveHelper): self.token_path = token_path self.use_sa = self.token_path == "accounts" await self.list_drives() - await wrap_future(future) + await self._event_handler() if self._reply_to: await deleteMessage(self._reply_to) if not self.listener.isCancelled: diff --git a/bot/helper/mirror_leech_utils/rclone_utils/list.py b/bot/helper/mirror_leech_utils/rclone_utils/list.py index 4c77549b..04a2e0ef 100644 --- a/bot/helper/mirror_leech_utils/rclone_utils/list.py +++ b/bot/helper/mirror_leech_utils/rclone_utils/list.py @@ -1,6 +1,6 @@ from aiofiles import open as aiopen from aiofiles.os import path as aiopath -from asyncio import wait_for, Event, wrap_future, gather +from asyncio import wait_for, Event, gather from configparser import ConfigParser from functools import partial from json import loads @@ -11,7 +11,6 @@ from time import time from bot import LOGGER, config_dict from bot.helper.ext_utils.bot_utils import ( cmd_exec, - new_thread, new_task, update_user_ldata, ) @@ -27,7 +26,6 @@ from bot.helper.telegram_helper.message_utils import ( LIST_LIMIT = 6 -@new_task async def path_updates(_, query, obj): await query.answer() message = query.message @@ -129,7 +127,6 @@ class RcloneList: self.iter_start = 0 self.page_step = 1 - @new_thread async def _event_handler(self): pfunc = partial(path_updates, obj=self) handler = self.listener.client.add_handler( @@ -313,7 +310,6 @@ class RcloneList: async def get_rclone_path(self, status, config_path=None): self.list_status = status - future = self._event_handler() if config_path is None: self._rc_user, self._rc_owner = await gather( aiopath.exists(self.user_rcc_path), aiopath.exists("rclone.conf") @@ -325,7 +321,7 @@ class RcloneList: else: self.config_path = config_path await self.list_remotes() - await wrap_future(future) + await self._event_handler() await deleteMessage(self._reply_to) if self.config_path != "rclone.conf" and not self.listener.isCancelled: return f"mrcc:{self.remote}{self.path}" diff --git a/bot/helper/mirror_leech_utils/telegram_uploader.py b/bot/helper/mirror_leech_utils/telegram_uploader.py index 54ba233e..9c59d73b 100644 --- a/bot/helper/mirror_leech_utils/telegram_uploader.py +++ b/bot/helper/mirror_leech_utils/telegram_uploader.py @@ -10,7 +10,7 @@ from asyncio import sleep from logging import getLogger from natsort import natsorted from os import walk, path as ospath -from pyrogram.errors import FloodWait, RPCError +from pyrogram.errors import FloodWait, RPCError, FloodPremiumWait, SlowmodeWait from pyrogram.types import InputMediaVideo, InputMediaDocument, InputMediaPhoto from re import match as re_match, sub as re_sub from tenacity import ( @@ -469,7 +469,7 @@ class TgUploader: and await aiopath.exists(thumb) ): await remove(thumb) - except FloodWait as f: + except (FloodWait, FloodPremiumWait, SlowmodeWait) as f: LOGGER.warning(str(f)) await sleep(f.value * 1.3) if ( diff --git a/bot/modules/authorize.py b/bot/modules/authorize.py index dd8af8eb..99274637 100644 --- a/bot/modules/authorize.py +++ b/bot/modules/authorize.py @@ -84,22 +84,29 @@ async def removeSudo(client, message): bot.add_handler( MessageHandler( - authorize, filters=command(BotCommands.AuthorizeCommand) & CustomFilters.sudo + authorize, + filters=command(BotCommands.AuthorizeCommand, case_sensitive=True) + & CustomFilters.sudo, ) ) bot.add_handler( MessageHandler( unauthorize, - filters=command(BotCommands.UnAuthorizeCommand) & CustomFilters.sudo, + filters=command(BotCommands.UnAuthorizeCommand, case_sensitive=True) + & CustomFilters.sudo, ) ) bot.add_handler( MessageHandler( - addSudo, filters=command(BotCommands.AddSudoCommand) & CustomFilters.sudo + addSudo, + filters=command(BotCommands.AddSudoCommand, case_sensitive=True) + & CustomFilters.sudo, ) ) bot.add_handler( MessageHandler( - removeSudo, filters=command(BotCommands.RmSudoCommand) & CustomFilters.sudo + removeSudo, + filters=command(BotCommands.RmSudoCommand, case_sensitive=True) + & CustomFilters.sudo, ) ) diff --git a/bot/modules/bot_settings.py b/bot/modules/bot_settings.py index 5a0d9f38..96601820 100644 --- a/bot/modules/bot_settings.py +++ b/bot/modules/bot_settings.py @@ -4,17 +4,15 @@ from aioshutil import rmtree from asyncio import ( create_subprocess_exec, create_subprocess_shell, - sleep, gather, wait_for, ) from dotenv import load_dotenv -from functools import partial from io import BytesIO from os import environ, getcwd -from pyrogram.filters import command, regex, create +from pyrogram import filters from pyrogram.handlers import MessageHandler, CallbackQueryHandler -from time import time +from pyrogram.errors import ListenerTimeout, ListenerStopped from bot import ( config_dict, @@ -44,7 +42,6 @@ from bot import ( from bot.helper.ext_utils.bot_utils import ( setInterval, sync_to_async, - new_thread, retry_function, ) from bot.helper.ext_utils.db_handler import DbManager @@ -66,7 +63,6 @@ from bot.modules.torrent_search import initiate_search_tools START = 0 STATE = "view" -handler_dict = {} default_values = { "DOWNLOAD_DIR": "/usr/src/app/downloads/", "LEECH_SPLIT_SIZE": MAX_SPLIT_SIZE, @@ -251,8 +247,7 @@ async def update_buttons(message, key=None, edit_type=None): await editMessage(message, msg, button) -async def edit_variable(_, message, pre_message, key): - handler_dict[message.chat.id] = False +async def edit_variable(message, pre_message, key): value = message.text if value.lower() == "true": value = True @@ -343,8 +338,7 @@ async def edit_variable(_, message, pre_message, key): await sabnzbd_client.set_special_config("servers", s) -async def edit_aria(_, message, pre_message, key): - handler_dict[message.chat.id] = False +async def edit_aria(message, pre_message, key): value = message.text if key == "newkey": key, value = [x.strip() for x in value.split(":", 1)] @@ -371,8 +365,7 @@ async def edit_aria(_, message, pre_message, key): await DbManager().update_aria2(key, value) -async def edit_qbit(_, message, pre_message, key): - handler_dict[message.chat.id] = False +async def edit_qbit(message, pre_message, key): value = message.text if value.lower() == "true": value = True @@ -390,8 +383,7 @@ async def edit_qbit(_, message, pre_message, key): await DbManager().update_qbittorrent(key, value) -async def edit_nzb(_, message, pre_message, key): - handler_dict[message.chat.id] = False +async def edit_nzb(message, pre_message, key): value = message.text if value.isdigit(): value = int(value) @@ -405,8 +397,7 @@ async def edit_nzb(_, message, pre_message, key): await DbManager().update_nzb_config() -async def edit_nzb_server(_, message, pre_message, key, index=0): - handler_dict[message.chat.id] = False +async def edit_nzb_server(message, pre_message, key, index=0): value = message.text if value.startswith("{") and value.endswith("}"): if key == "newser": @@ -471,8 +462,7 @@ async def sync_jdownloader(): await DbManager().update_private_file("cfg.zip") -async def update_private_file(_, message, pre_message): - handler_dict[message.chat.id] = False +async def update_private_file(message, pre_message): if not message.media and (file_name := message.text): fn = file_name.rsplit(".zip", 1)[0] if await aiopath.isfile(fn) and file_name != "config.env": @@ -550,35 +540,20 @@ async def update_private_file(_, message, pre_message): await remove("accounts.zip") -async def event_handler(client, query, pfunc, rfunc, document=False): - chat_id = query.message.chat.id - handler_dict[chat_id] = True - start_time = time() - - async def event_filter(_, __, event): - user = event.from_user or event.sender_chat - return bool( - user.id == query.from_user.id - and event.chat.id == chat_id - and (event.text or event.document and document) - ) - - handler = client.add_handler( - MessageHandler(pfunc, filters=create(event_filter)), group=-1 +async def event_handler(client, query, document=False): + event_filter = filters.text | filters.document if document else filters.text + return await client.listen( + chat_id=query.message.chat.id, + user_id=query.from_user.id, + filters=event_filter, + timeout=60, ) - while handler_dict[chat_id]: - await sleep(0.5) - if time() - start_time > 60: - handler_dict[chat_id] = False - await rfunc() - client.remove_handler(*handler) -@new_thread async def edit_bot_settings(client, query): - data = query.data.split() message = query.message - handler_dict[message.chat.id] = False + await client.stop_listening(chat_id=message.chat.id, user_id=query.from_user.id) + data = query.data.split() if data[1] == "close": await query.answer() await deleteMessage(message.reply_to_message) @@ -772,15 +747,25 @@ async def edit_bot_settings(client, query): elif data[1] == "private": await query.answer() await update_buttons(message, data[1]) - pfunc = partial(update_private_file, pre_message=message) - rfunc = partial(update_buttons, message) - await event_handler(client, query, pfunc, rfunc, True) + try: + event = await event_handler(client, query, True) + except ListenerTimeout: + await update_buttons(message) + except ListenerStopped: + pass + else: + await update_private_file(event, message) elif data[1] == "botvar" and STATE == "edit": await query.answer() await update_buttons(message, data[2], data[1]) - pfunc = partial(edit_variable, pre_message=message, key=data[2]) - rfunc = partial(update_buttons, message, "var") - await event_handler(client, query, pfunc, rfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await update_buttons(message, "var") + except ListenerStopped: + pass + else: + await edit_variable(event, message, data[2]) elif data[1] == "botvar" and STATE == "view": value = f"{config_dict[data[2]]}" if len(value) > 200: @@ -795,9 +780,14 @@ async def edit_bot_settings(client, query): elif data[1] == "ariavar" and (STATE == "edit" or data[2] == "newkey"): await query.answer() await update_buttons(message, data[2], data[1]) - pfunc = partial(edit_aria, pre_message=message, key=data[2]) - rfunc = partial(update_buttons, message, "aria") - await event_handler(client, query, pfunc, rfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await update_buttons(message, "aria") + except ListenerStopped: + pass + else: + await edit_aria(event, message, data[2]) elif data[1] == "ariavar" and STATE == "view": value = f"{aria2_options[data[2]]}" if len(value) > 200: @@ -812,9 +802,14 @@ async def edit_bot_settings(client, query): elif data[1] == "qbitvar" and STATE == "edit": await query.answer() await update_buttons(message, data[2], data[1]) - pfunc = partial(edit_qbit, pre_message=message, key=data[2]) - rfunc = partial(update_buttons, message, "qbit") - await event_handler(client, query, pfunc, rfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await update_buttons(message, "qbit") + except ListenerStopped: + pass + else: + await edit_qbit(event, message, data[2]) elif data[1] == "qbitvar" and STATE == "view": value = f"{qbit_options[data[2]]}" if len(value) > 200: @@ -829,9 +824,14 @@ async def edit_bot_settings(client, query): elif data[1] == "nzbvar" and STATE == "edit": await query.answer() await update_buttons(message, data[2], data[1]) - pfunc = partial(edit_nzb, pre_message=message, key=data[2]) - rfunc = partial(update_buttons, message, "nzb") - await event_handler(client, query, pfunc, rfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await update_buttons(message, "nzb") + except ListenerStopped: + pass + else: + await edit_nzb(event, message, data[2]) elif data[1] == "nzbvar" and STATE == "view": value = f"{nzb_options[data[2]]}" if len(value) > 200: @@ -861,9 +861,14 @@ async def edit_bot_settings(client, query): index = 0 if data[2] == "newser" else int(data[1].replace("nzbsevar", "")) await query.answer() await update_buttons(message, data[2], data[1]) - pfunc = partial(edit_nzb_server, pre_message=message, key=data[2], index=index) - rfunc = partial(update_buttons, message, data[1]) - await event_handler(client, query, pfunc, rfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await update_buttons(message, data[1]) + except ListenerStopped: + pass + else: + await edit_nzb_server(event, message, data[2], index) elif data[1].startswith("nzbsevar") and STATE == "view": index = int(data[1].replace("nzbsevar", "")) value = f"{config_dict['USENET_SERVERS'][index][data[2]]}" @@ -912,8 +917,8 @@ async def edit_bot_settings(client, query): await deleteMessage(message) -async def bot_settings(_, message): - handler_dict[message.chat.id] = False +async def bot_settings(client, message): + await client.stop_listening(chat_id=message.chat.id, user_id=message.from_user.id) msg, button = await get_buttons() globals()["START"] = 0 await sendMessage(message, msg, button) @@ -1281,11 +1286,11 @@ async def load_config(): bot.add_handler( MessageHandler( - bot_settings, filters=command(BotCommands.BotSetCommand) & CustomFilters.sudo + bot_settings, filters=filters.command(BotCommands.BotSetCommand, case_sensitive=True) & CustomFilters.sudo ) ) bot.add_handler( CallbackQueryHandler( - edit_bot_settings, filters=regex("^botset") & CustomFilters.sudo + edit_bot_settings, filters=filters.regex("^botset") & CustomFilters.sudo ) ) diff --git a/bot/modules/cancel_task.py b/bot/modules/cancel_task.py index 8a201fe0..3f021f21 100644 --- a/bot/modules/cancel_task.py +++ b/bot/modules/cancel_task.py @@ -165,13 +165,15 @@ async def cancel_all_update(_, query): bot.add_handler( MessageHandler( cancel_task, - filters=command(BotCommands.CancelTaskCommand) & CustomFilters.authorized, + filters=command(BotCommands.CancelTaskCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( cancell_all_buttons, - filters=command(BotCommands.CancelAllCommand) & CustomFilters.authorized, + filters=command(BotCommands.CancelAllCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler(CallbackQueryHandler(cancel_all_update, filters=regex("^canall"))) diff --git a/bot/modules/clone.py b/bot/modules/clone.py index b7a27d3d..99e3ed07 100644 --- a/bot/modules/clone.py +++ b/bot/modules/clone.py @@ -290,6 +290,8 @@ async def clone(client, message): bot.add_handler( MessageHandler( - clone, filters=command(BotCommands.CloneCommand) & CustomFilters.authorized + clone, + filters=command(BotCommands.CloneCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) diff --git a/bot/modules/exec.py b/bot/modules/exec.py index dac91bd4..2fd0fe72 100644 --- a/bot/modules/exec.py +++ b/bot/modules/exec.py @@ -118,16 +118,22 @@ async def clear(_, message): bot.add_handler( MessageHandler( - aioexecute, filters=command(BotCommands.AExecCommand) & CustomFilters.owner + aioexecute, + filters=command(BotCommands.AExecCommand, case_sensitive=True) + & CustomFilters.owner, ) ) bot.add_handler( MessageHandler( - execute, filters=command(BotCommands.ExecCommand) & CustomFilters.owner + execute, + filters=command(BotCommands.ExecCommand, case_sensitive=True) + & CustomFilters.owner, ) ) bot.add_handler( MessageHandler( - clear, filters=command(BotCommands.ClearLocalsCommand) & CustomFilters.owner + clear, + filters=command(BotCommands.ClearLocalsCommand, case_sensitive=True) + & CustomFilters.owner, ) ) diff --git a/bot/modules/file_selector.py b/bot/modules/file_selector.py index eb704e6d..45829e9e 100644 --- a/bot/modules/file_selector.py +++ b/bot/modules/file_selector.py @@ -168,7 +168,9 @@ async def get_confirm(_, query): bot.add_handler( MessageHandler( - select, filters=command(BotCommands.SelectCommand) & CustomFilters.authorized + select, + filters=command(BotCommands.SelectCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler(CallbackQueryHandler(get_confirm, filters=regex("^sel"))) diff --git a/bot/modules/force_start.py b/bot/modules/force_start.py index f7cd79fd..7a3d8434 100644 --- a/bot/modules/force_start.py +++ b/bot/modules/force_start.py @@ -77,6 +77,7 @@ async def remove_from_queue(_, message): bot.add_handler( MessageHandler( remove_from_queue, - filters=command(BotCommands.ForceStartCommand) & CustomFilters.authorized, + filters=command(BotCommands.ForceStartCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) diff --git a/bot/modules/gd_count.py b/bot/modules/gd_count.py index 5d5a3fac..74f13184 100644 --- a/bot/modules/gd_count.py +++ b/bot/modules/gd_count.py @@ -50,6 +50,8 @@ async def countNode(_, message): bot.add_handler( MessageHandler( - countNode, filters=command(BotCommands.CountCommand) & CustomFilters.authorized + countNode, + filters=command(BotCommands.CountCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) diff --git a/bot/modules/gd_delete.py b/bot/modules/gd_delete.py index 57c4888c..5731abc4 100644 --- a/bot/modules/gd_delete.py +++ b/bot/modules/gd_delete.py @@ -34,6 +34,7 @@ async def deletefile(_, message): bot.add_handler( MessageHandler( deletefile, - filters=command(BotCommands.DeleteCommand) & CustomFilters.authorized, + filters=command(BotCommands.DeleteCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) diff --git a/bot/modules/gd_search.py b/bot/modules/gd_search.py index a7faaf02..897816e5 100644 --- a/bot/modules/gd_search.py +++ b/bot/modules/gd_search.py @@ -95,7 +95,8 @@ async def gdrive_search(_, message): bot.add_handler( MessageHandler( gdrive_search, - filters=command(BotCommands.ListCommand) & CustomFilters.authorized, + filters=command(BotCommands.ListCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler(CallbackQueryHandler(select_type, filters=regex("^list_types"))) diff --git a/bot/modules/mirror_leech.py b/bot/modules/mirror_leech.py index 117d2ed8..bb3128ee 100644 --- a/bot/modules/mirror_leech.py +++ b/bot/modules/mirror_leech.py @@ -384,45 +384,55 @@ async def nzb_leech(client, message): bot.add_handler( MessageHandler( - mirror, filters=command(BotCommands.MirrorCommand) & CustomFilters.authorized + mirror, filters=command(BotCommands.MirrorCommand, case_sensitive=True) & CustomFilters.authorized ) ) bot.add_handler( MessageHandler( qb_mirror, - filters=command(BotCommands.QbMirrorCommand) & CustomFilters.authorized, + filters=command(BotCommands.QbMirrorCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( jd_mirror, - filters=command(BotCommands.JdMirrorCommand) & CustomFilters.authorized, + filters=command(BotCommands.JdMirrorCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( nzb_mirror, - filters=command(BotCommands.NzbMirrorCommand) & CustomFilters.authorized, + filters=command(BotCommands.NzbMirrorCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( - leech, filters=command(BotCommands.LeechCommand) & CustomFilters.authorized + leech, + filters=command(BotCommands.LeechCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( - qb_leech, filters=command(BotCommands.QbLeechCommand) & CustomFilters.authorized + qb_leech, + filters=command(BotCommands.QbLeechCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( - jd_leech, filters=command(BotCommands.JdLeechCommand) & CustomFilters.authorized + jd_leech, + filters=command(BotCommands.JdLeechCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler( MessageHandler( nzb_leech, - filters=command(BotCommands.NzbLeechCommand) & CustomFilters.authorized, + filters=command(BotCommands.NzbLeechCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) diff --git a/bot/modules/rss.py b/bot/modules/rss.py index 41d72202..1a8e109e 100644 --- a/bot/modules/rss.py +++ b/bot/modules/rss.py @@ -1,16 +1,15 @@ from httpx import AsyncClient from apscheduler.triggers.interval import IntervalTrigger -from asyncio import Lock, sleep +from asyncio import Lock, sleep, gather from datetime import datetime, timedelta from feedparser import parse as feedparse -from functools import partial from io import BytesIO -from pyrogram.filters import command, regex, create +from pyrogram.filters import command, regex from pyrogram.handlers import MessageHandler, CallbackQueryHandler -from time import time +from pyrogram.errors import ListenerTimeout, ListenerStopped from bot import scheduler, rss_dict, LOGGER, DATABASE_URL, config_dict, bot -from bot.helper.ext_utils.bot_utils import new_thread, arg_parser +from bot.helper.ext_utils.bot_utils import arg_parser from bot.helper.ext_utils.db_handler import DbManager from bot.helper.ext_utils.exceptions import RssShutdownException from bot.helper.ext_utils.help_messages import RSS_HELP_MESSAGE @@ -26,7 +25,6 @@ from bot.helper.telegram_helper.message_utils import ( ) rss_dict_lock = Lock() -handler_dict = {} async def rssMenu(event): @@ -60,14 +58,14 @@ async def updateRssMenu(query): await editMessage(query.message, msg, button) -async def getRssMenu(_, message): +async def getRssMenu(client, message): + await client.stop_listening(chat_id=message.chat.id, user_id=message.from_user.id) msg, button = await rssMenu(message) await sendMessage(message, msg, button) -async def rssSub(_, message, pre_event): +async def rssSub(message): user_id = message.from_user.id - handler_dict[user_id] = False if username := message.from_user.username: tag = f"@{username}" else: @@ -185,7 +183,6 @@ async def rssSub(_, message, pre_event): elif is_sudo and not scheduler.running: addJob() scheduler.start() - await updateRssMenu(pre_event) async def getUserId(title): @@ -200,9 +197,8 @@ async def getUserId(title): ) -async def rssUpdate(_, message, pre_event, state): +async def rssUpdate(message, state): user_id = message.from_user.id - handler_dict[user_id] = False titles = message.text.split() is_sudo = await CustomFilters.sudo("", message) updated = [] @@ -250,7 +246,6 @@ async def rssUpdate(_, message, pre_event, state): ) if DATABASE_URL and rss_dict.get(user_id): await DbManager().rss_update(user_id) - await updateRssMenu(pre_event) async def rssList(query, start, all_users=False): @@ -302,16 +297,14 @@ async def rssList(query, start, all_users=False): await editMessage(query.message, list_feed, button) -async def rssGet(_, message, pre_event): +async def rssGet(message): user_id = message.from_user.id - handler_dict[user_id] = False args = message.text.split() if len(args) < 2: await sendMessage( message, f"{args}. Wrong Input format. You should add number of the items you want to get. Read help message before adding new subcription!", ) - await updateRssMenu(pre_event) return try: title = args[0] @@ -355,12 +348,10 @@ async def rssGet(_, message, pre_event): except Exception as e: LOGGER.error(str(e)) await sendMessage(message, f"Enter a valid value!. {e}") - await updateRssMenu(pre_event) -async def rssEdit(_, message, pre_event): +async def rssEdit(message): user_id = message.from_user.id - handler_dict[user_id] = False items = message.text.split("\n") updated = False for item in items: @@ -408,11 +399,9 @@ async def rssEdit(_, message, pre_event): rss_dict[user_id][title]["exf"] = exf_lists if DATABASE_URL and updated: await DbManager().rss_update(user_id) - await updateRssMenu(pre_event) -async def rssDelete(_, message, pre_event): - handler_dict[message.from_user.id] = False +async def rssDelete(message): users = message.text.split() for user in users: user = int(user) @@ -420,59 +409,46 @@ async def rssDelete(_, message, pre_event): del rss_dict[user] if DATABASE_URL: await DbManager().rss_delete(user) - await updateRssMenu(pre_event) -async def event_handler(client, query, pfunc): - user_id = query.from_user.id - handler_dict[user_id] = True - start_time = time() - - async def event_filter(_, __, event): - user = event.from_user or event.sender_chat - return bool( - user.id == user_id and event.chat.id == query.message.chat.id and event.text - ) - - handler = client.add_handler(MessageHandler(pfunc, create(event_filter)), group=-1) - while handler_dict[user_id]: - await sleep(0.5) - if time() - start_time > 60: - handler_dict[user_id] = False - await updateRssMenu(query) - client.remove_handler(*handler) +async def event_handler(client, query): + return await client.listen( + chat_id=query.message.chat.id, user_id=query.from_user.id, timeout=60 + ) -@new_thread async def rssListener(client, query): user_id = query.from_user.id message = query.message data = query.data.split() + await client.stop_listening(chat_id=message.chat.id, user_id=query.from_user.id) if int(data[2]) != user_id and not await CustomFilters.sudo("", query): await query.answer( text="You don't have permission to use these buttons!", show_alert=True ) elif data[1] == "close": await query.answer() - handler_dict[user_id] = False await deleteMessage(message.reply_to_message) await deleteMessage(message) elif data[1] == "back": await query.answer() - handler_dict[user_id] = False await updateRssMenu(query) elif data[1] == "sub": await query.answer() - handler_dict[user_id] = False buttons = ButtonMaker() buttons.ibutton("Back", f"rss back {user_id}") buttons.ibutton("Close", f"rss close {user_id}") button = buttons.build_menu(2) await editMessage(message, RSS_HELP_MESSAGE, button) - pfunc = partial(rssSub, pre_event=query) - await event_handler(client, query, pfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await updateRssMenu(query) + except ListenerStopped: + pass + else: + await gather(rssSub(event), updateRssMenu(query)) elif data[1] == "list": - handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: @@ -480,7 +456,6 @@ async def rssListener(client, query): start = int(data[3]) await rssList(query, start) elif data[1] == "get": - handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: @@ -494,10 +469,15 @@ async def rssListener(client, query): "Send one title with value separated by space get last X items.\nTitle Value\nTimeout: 60 sec.", button, ) - pfunc = partial(rssGet, pre_event=query) - await event_handler(client, query, pfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await updateRssMenu(query) + except ListenerStopped: + pass + else: + await gather(rssGet(event), updateRssMenu(query)) elif data[1] in ["unsubscribe", "pause", "resume"]: - handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: @@ -517,10 +497,15 @@ async def rssListener(client, query): f"Send one or more rss titles separated by space to {data[1]}.\nTimeout: 60 sec.", button, ) - pfunc = partial(rssUpdate, pre_event=query, state=data[1]) - await event_handler(client, query, pfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await updateRssMenu(query) + except ListenerStopped: + pass + else: + await gather(rssUpdate(event, data[1]),updateRssMenu(query)) elif data[1] == "edit": - handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: @@ -538,10 +523,15 @@ Note: Only what you provide will be edited, the rest will be the same like examp Timeout: 60 sec. Argument -c for command and arguments """ await editMessage(message, msg, button) - pfunc = partial(rssEdit, pre_event=query) - await event_handler(client, query, pfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await updateRssMenu(query) + except ListenerStopped: + pass + else: + await gather(rssEdit(event), updateRssMenu(query)) elif data[1].startswith("uall"): - handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) return @@ -610,8 +600,14 @@ Timeout: 60 sec. Argument -c for command and arguments button = buttons.build_menu(2) msg = "Send one or more user_id separated by space to delete their resources.\nTimeout: 60 sec." await editMessage(message, msg, button) - pfunc = partial(rssDelete, pre_event=query) - await event_handler(client, query, pfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await updateRssMenu(query) + except ListenerStopped: + pass + else: + await gather(rssDelete(event), updateRssMenu(query)) elif data[1] == "listall": if not rss_dict: await query.answer(text="No subscriptions!", show_alert=True) @@ -769,7 +765,9 @@ addJob() scheduler.start() bot.add_handler( MessageHandler( - getRssMenu, filters=command(BotCommands.RssCommand) & CustomFilters.authorized + getRssMenu, + filters=command(BotCommands.RssCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler(CallbackQueryHandler(rssListener, filters=regex("^rss"))) diff --git a/bot/modules/shell.py b/bot/modules/shell.py index ffb7e690..955a3bd9 100644 --- a/bot/modules/shell.py +++ b/bot/modules/shell.py @@ -36,11 +36,15 @@ async def shell(_, message): bot.add_handler( MessageHandler( - shell, filters=command(BotCommands.ShellCommand) & CustomFilters.owner + shell, + filters=command(BotCommands.ShellCommand, case_sensitive=True) + & CustomFilters.owner, ) ) bot.add_handler( EditedMessageHandler( - shell, filters=command(BotCommands.ShellCommand) & CustomFilters.owner + shell, + filters=command(BotCommands.ShellCommand, case_sensitive=True) + & CustomFilters.owner, ) ) diff --git a/bot/modules/status.py b/bot/modules/status.py index 0c18c794..89daed7d 100644 --- a/bot/modules/status.py +++ b/bot/modules/status.py @@ -156,7 +156,8 @@ async def status_pages(_, query): bot.add_handler( MessageHandler( mirror_status, - filters=command(BotCommands.StatusCommand) & CustomFilters.authorized, + filters=command(BotCommands.StatusCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler(CallbackQueryHandler(status_pages, filters=regex("^status"))) diff --git a/bot/modules/torrent_search.py b/bot/modules/torrent_search.py index ac56c483..f758cca3 100644 --- a/bot/modules/torrent_search.py +++ b/bot/modules/torrent_search.py @@ -314,7 +314,8 @@ async def torrentSearchUpdate(_, query): bot.add_handler( MessageHandler( torrentSearch, - filters=command(BotCommands.SearchCommand) & CustomFilters.authorized, + filters=command(BotCommands.SearchCommand, case_sensitive=True) + & CustomFilters.authorized, ) ) bot.add_handler(CallbackQueryHandler(torrentSearchUpdate, filters=regex("^torser"))) diff --git a/bot/modules/users_settings.py b/bot/modules/users_settings.py index b0f390a7..1c52cfba 100644 --- a/bot/modules/users_settings.py +++ b/bot/modules/users_settings.py @@ -1,12 +1,11 @@ from aiofiles.os import remove, path as aiopath, makedirs -from asyncio import sleep -from functools import partial from html import escape from io import BytesIO from os import getcwd -from pyrogram.filters import command, regex, create +from asyncio import gather +from pyrogram import filters from pyrogram.handlers import MessageHandler, CallbackQueryHandler -from time import time +from pyrogram.errors import ListenerTimeout, ListenerStopped from bot import ( bot, @@ -17,7 +16,7 @@ from bot import ( MAX_SPLIT_SIZE, GLOBAL_EXTENSION_FILTER, ) -from bot.helper.ext_utils.bot_utils import update_user_ldata, new_thread, getSizeBytes +from bot.helper.ext_utils.bot_utils import update_user_ldata, getSizeBytes from bot.helper.ext_utils.db_handler import DbManager from bot.helper.ext_utils.media_utils import createThumb from bot.helper.telegram_helper.bot_commands import BotCommands @@ -30,8 +29,6 @@ from bot.helper.telegram_helper.message_utils import ( deleteMessage, ) -handler_dict = {} - async def get_user_settings(from_user): user_id = from_user.id @@ -203,55 +200,48 @@ async def update_user_settings(query): await editMessage(query.message, msg, button) -async def user_settings(_, message): +async def user_settings(client, message): + await client.stop_listening(chat_id=message.chat.id, user_id=message.from_user.id) from_user = message.from_user - handler_dict[from_user.id] = False msg, button = await get_user_settings(from_user) await sendMessage(message, msg, button) -async def set_thumb(_, message, pre_event): +async def set_thumb(message): user_id = message.from_user.id - handler_dict[user_id] = False des_dir = await createThumb(message, user_id) update_user_ldata(user_id, "thumb", des_dir) await deleteMessage(message) - await update_user_settings(pre_event) if DATABASE_URL: await DbManager().update_user_doc(user_id, "thumb", des_dir) -async def add_rclone(_, message, pre_event): +async def add_rclone(message): user_id = message.from_user.id - handler_dict[user_id] = False rpath = f"{getcwd()}/rclone/" await makedirs(rpath, exist_ok=True) des_dir = f"{rpath}{user_id}.conf" await message.download(file_name=des_dir) update_user_ldata(user_id, "rclone_config", f"rclone/{user_id}.conf") await deleteMessage(message) - await update_user_settings(pre_event) if DATABASE_URL: await DbManager().update_user_doc(user_id, "rclone_config", des_dir) -async def add_token_pickle(_, message, pre_event): +async def add_token_pickle(message): user_id = message.from_user.id - handler_dict[user_id] = False tpath = f"{getcwd()}/tokens/" await makedirs(tpath, exist_ok=True) des_dir = f"{tpath}{user_id}.pickle" await message.download(file_name=des_dir) update_user_ldata(user_id, "token_pickle", f"tokens/{user_id}.pickle") await deleteMessage(message) - await update_user_settings(pre_event) if DATABASE_URL: await DbManager().update_user_doc(user_id, "token_pickle", des_dir) -async def delete_path(_, message, pre_event): +async def delete_path(message): user_id = message.from_user.id - handler_dict[user_id] = False user_dict = user_data.get(user_id, {}) names = message.text.split() for name in names: @@ -260,14 +250,12 @@ async def delete_path(_, message, pre_event): new_value = user_dict["upload_paths"] update_user_ldata(user_id, "upload_paths", new_value) await deleteMessage(message) - await update_user_settings(pre_event) if DATABASE_URL: await DbManager().update_user_doc(user_id, "upload_paths", new_value) -async def set_option(_, message, pre_event, option): +async def set_option(message, option): user_id = message.from_user.id - handler_dict[user_id] = False value = message.text if option == "split_size": if not value.isdigit(): @@ -290,59 +278,42 @@ async def set_option(_, message, pre_event, option): data = line.split(maxsplit=1) if len(data) != 2: await sendMessage(message, "Wrong format! Add ") - await update_user_settings(pre_event) return name, path = data user_dict["upload_paths"][name] = path value = user_dict["upload_paths"] update_user_ldata(user_id, option, value) await deleteMessage(message) - await update_user_settings(pre_event) if DATABASE_URL: await DbManager().update_user_data(user_id) -async def event_handler(client, query, pfunc, photo=False, document=False): - user_id = query.from_user.id - handler_dict[user_id] = True - start_time = time() - - async def event_filter(_, __, event): - if photo: - mtype = event.photo - elif document: - mtype = event.document - else: - mtype = event.text - user = event.from_user or event.sender_chat - return bool( - user.id == user_id and event.chat.id == query.message.chat.id and mtype - ) - - handler = client.add_handler( - MessageHandler(pfunc, filters=create(event_filter)), group=-1 +async def event_handler(client, query, photo=False, document=False): + if photo: + event_filter = filters.photo + elif document: + event_filter = filters.document + else: + event_filter = filters.text + return await client.listen( + chat_id=query.message.chat.id, + user_id=query.from_user.id, + filters=event_filter, + timeout=60, ) - while handler_dict[user_id]: - await sleep(0.5) - if time() - start_time > 60: - handler_dict[user_id] = False - await update_user_settings(query) - client.remove_handler(*handler) - -@new_thread async def edit_user_settings(client, query): from_user = query.from_user user_id = from_user.id name = from_user.mention message = query.message data = query.data.split() - handler_dict[user_id] = False thumb_path = f"Thumbnails/{user_id}.jpg" rclone_conf = f"rclone/{user_id}.conf" token_pickle = f"tokens/{user_id}.pickle" user_dict = user_data.get(user_id, {}) + await client.stop_listening(chat_id=message.chat.id, user_id=query.from_user.id) if user_id != int(data[1]): await query.answer("Not Yours!", show_alert=True) elif data[2] in [ @@ -576,8 +547,14 @@ Stop Duplicate is {sd_msg}""" "Send a photo to save it as custom thumbnail. Timeout: 60 sec", buttons.build_menu(1), ) - pfunc = partial(set_thumb, pre_event=query) - await event_handler(client, query, pfunc, True) + try: + event = await event_handler(client, query, True) + except ListenerTimeout: + await update_user_settings(query) + except ListenerStopped: + pass + else: + await gather(set_thumb(event), update_user_settings(query)) elif data[2] == "yto": await query.answer() buttons = ButtonMaker() @@ -594,8 +571,14 @@ Example: format:bv*+mergeall[vcodec=none]|nocheckcertificate:True Check all yt-dlp api options from this FILE or use this script to convert cli arguments to api options. """ await editMessage(message, rmsg, buttons.build_menu(1)) - pfunc = partial(set_option, pre_event=query, option="yt_opt") - await event_handler(client, query, pfunc) + try: + event = await event_handler(client, query) + except ListenerTimeout: + await update_user_settings(query) + except ListenerStopped: + pass + else: + await gather(set_option(event, "yt_opt"), update_user_settings(query)) elif data[2] == "lss": await query.answer() buttons = ButtonMaker() @@ -608,8 +591,14 @@ Check all yt-dlp api options from this