naming convention PEP8

This commit is contained in:
5hojib 2024-12-05 22:11:05 +06:00
parent 69da540d80
commit 4efc68f741
13 changed files with 44 additions and 44 deletions

View File

@ -40,7 +40,7 @@ getLogger("pyrogram").setLevel(ERROR)
getLogger("httpx").setLevel(ERROR)
getLogger("pymongo").setLevel(ERROR)
botStartTime = time()
bot_start_time = time()
bot_loop = new_event_loop()
set_event_loop(bot_loop)

View File

@ -19,7 +19,7 @@ from time import time
from bot import (
bot,
botStartTime,
bot_start_time,
LOGGER,
intervals,
config_dict,
@ -77,7 +77,7 @@ async def stats(_, message):
memory = virtual_memory()
stats = (
f"<b>Commit Date:</b> {last_commit}\n\n"
f"<b>Bot Uptime:</b> {get_readable_time(time() - botStartTime)}\n"
f"<b>Bot Uptime:</b> {get_readable_time(time() - bot_start_time)}\n"
f"<b>OS Uptime:</b> {get_readable_time(time() - boot_time())}\n\n"
f"<b>Total Disk Space:</b> {get_readable_file_size(total)}\n"
f"<b>Used:</b> {get_readable_file_size(used)} | <b>Free:</b> {get_readable_file_size(free)}\n\n"

View File

@ -207,8 +207,8 @@ If DEFAULT_UPLOAD is `rc` then you can pass up: `gd` to upload using gdrive tool
rclone_cl = """<b>Rclone</b>: path
If DEFAULT_UPLOAD is `gd` then you can pass up: `rc` to upload to RCLONE_PATH.
/cmd rcl/rclone_path -up rcl/rclone_path/rc -rcf flagkey:flagvalue|flagkey|flagkey:flagvalue
/cmd rcl or rclonePath -up rclonePath or rc or rcl
/cmd mrcc:rclonePath -up rcl or rc(if you have add rclone path from usetting) (to use user config)"""
/cmd rcl or rclone_path -up rclone_path or rc or rcl
/cmd mrcc:rclone_path -up rcl or rc(if you have add rclone path from usetting) (to use user config)"""
name_sub = r"""<b>Name Substitution</b>: -ns
/cmd link -ns script/code/s | mirror/leech | tea/ /s | clone | cpu/ | \[mltb\]/mltb | \\text\\/text/s

View File

@ -7,7 +7,7 @@ from bot import (
DOWNLOAD_DIR,
task_dict,
task_dict_lock,
botStartTime,
bot_start_time,
config_dict,
status_dict,
)
@ -242,5 +242,5 @@ async def get_readable_message(sid, is_user, page_no=1, status="All", page_step=
buttons.data_button("♻️", f"status {sid} ref", position="header")
button = buttons.build_menu(8)
msg += f"<b>CPU:</b> {cpu_percent()}% | <b>FREE:</b> {get_readable_file_size(disk_usage(DOWNLOAD_DIR).free)}"
msg += f"\n<b>RAM:</b> {virtual_memory().percent}% | <b>UPTIME:</b> {get_readable_time(time() - botStartTime)}"
msg += f"\n<b>RAM:</b> {virtual_memory().percent}% | <b>UPTIME:</b> {get_readable_time(time() - bot_start_time)}"
return msg, button

View File

@ -42,7 +42,7 @@ async def _on_download_error(err, tor, button=None):
@new_task
async def _onSeedFinish(tor):
async def _on_seed_finish(tor):
ext_hash = tor.hash
LOGGER.info(f"Cancelling Seed: {tor.name}")
if task := await get_task_by_gid(ext_hash[:12]):
@ -192,7 +192,7 @@ async def _qb_listener():
and qb_torrents[tag]["seeding"]
):
qb_torrents[tag]["seeding"] = False
await _onSeedFinish(tor_info)
await _on_seed_finish(tor_info)
await sleep(0.5)
except Exception as e:
LOGGER.error(str(e))

View File

@ -277,7 +277,7 @@ class TaskListener(TaskConfig):
)
async def on_upload_complete(
self, link, files, folders, mime_type, rclonePath="", dir_id=""
self, link, files, folders, mime_type, rclone_path="", dir_id=""
):
if (
self.is_super_chat
@ -311,7 +311,7 @@ class TaskListener(TaskConfig):
msg += f"\n<b>Files: </b>{files}"
if (
link
or rclonePath
or rclone_path
and config_dict["RCLONE_SERVE_URL"]
and not self.private_link
):
@ -319,19 +319,19 @@ class TaskListener(TaskConfig):
if link:
buttons.url_button("☁️ Cloud Link", link)
else:
msg += f"\n\nPath: <code>{rclonePath}</code>"
msg += f"\n\nPath: <code>{rclone_path}</code>"
if (
rclonePath
rclone_path
and (RCLONE_SERVE_URL := config_dict["RCLONE_SERVE_URL"])
and not self.private_link
):
remote, path = rclonePath.split(":", 1)
remote, path = rclone_path.split(":", 1)
url_path = rutils.quote(f"{path}")
share_url = f"{RCLONE_SERVE_URL}/{remote}/{url_path}"
if mime_type == "Folder":
share_url += "/"
buttons.url_button("🔗 Rclone Link", share_url)
if not rclonePath and dir_id:
if not rclone_path and dir_id:
INDEX_URL = ""
if self.private_link:
INDEX_URL = self.user_dict.get("index_url", "") or ""
@ -345,7 +345,7 @@ class TaskListener(TaskConfig):
buttons.url_button("🌐 View Link", share_urls)
button = buttons.build_menu(2)
else:
msg += f"\n\nPath: <code>{rclonePath}</code>"
msg += f"\n\nPath: <code>{rclone_path}</code>"
button = None
msg += f"\n\n<b>cc: </b>{self.tag}"
await send_message(self.message, msg, button)

View File

@ -50,7 +50,7 @@ class TelegramDownloadHelper:
else:
LOGGER.info(f"Start Queued Download from Telegram: {self._listener.name}")
async def _onDownloadProgress(self, current, total):
async def _on_download_progress(self, current, total):
if self._listener.is_cancelled:
if self.session == "user":
user.stop_transmission()
@ -72,7 +72,7 @@ class TelegramDownloadHelper:
async def _download(self, message, path):
try:
download = await message.download(
file_name=path, progress=self._onDownloadProgress
file_name=path, progress=self._on_download_progress
)
if self._listener.is_cancelled:
await self._on_download_error("Cancelled by user!")

View File

@ -59,7 +59,7 @@ class GoogleDriveClone(GoogleDriveHelper):
mime_type = meta.get("mimeType")
if mime_type == self.G_DRIVE_DIR_MIME_TYPE:
dir_id = self.create_directory(meta.get("name"), self.listener.up_dest)
self._cloneFolder(meta.get("name"), meta.get("id"), dir_id)
self._clone_folder(meta.get("name"), meta.get("id"), dir_id)
durl = self.G_DRIVE_DIR_BASE_DOWNLOAD_URL.format(dir_id)
if self.listener.is_cancelled:
LOGGER.info("Deleting cloned data from Drive...")
@ -70,7 +70,7 @@ class GoogleDriveClone(GoogleDriveHelper):
mime_type = "Folder"
self.listener.size = self.proc_bytes
else:
file = self._copyFile(meta.get("id"), self.listener.up_dest)
file = self._copy_file(meta.get("id"), self.listener.up_dest)
msg += f'<b>Name: </b><code>{file.get("name")}</code>'
durl = self.G_DRIVE_BASE_DOWNLOAD_URL.format(file.get("id"))
if mime_type is None:
@ -102,7 +102,7 @@ class GoogleDriveClone(GoogleDriveHelper):
async_to_sync(self.listener.on_upload_error, msg)
return None, None, None, None, None
def _cloneFolder(self, folder_name, folder_id, dest_id):
def _clone_folder(self, folder_name, folder_id, dest_id):
LOGGER.info(f"Syncing: {folder_name}")
files = self.get_files_by_folder_id(folder_id)
if len(files) == 0:
@ -112,14 +112,14 @@ class GoogleDriveClone(GoogleDriveHelper):
self.total_folders += 1
file_path = ospath.join(folder_name, file.get("name"))
current_dir_id = self.create_directory(file.get("name"), dest_id)
self._cloneFolder(file_path, file.get("id"), current_dir_id)
self._clone_folder(file_path, file.get("id"), current_dir_id)
elif (
not file.get("name")
.lower()
.endswith(tuple(self.listener.extension_filter))
):
self.total_files += 1
self._copyFile(file.get("id"), dest_id)
self._copy_file(file.get("id"), dest_id)
self.proc_bytes += int(file.get("size", 0))
self.total_time = int(time() - self._start_time)
if self.listener.is_cancelled:
@ -130,7 +130,7 @@ class GoogleDriveClone(GoogleDriveHelper):
stop=stop_after_attempt(3),
retry=retry_if_exception_type(Exception),
)
def _copyFile(self, file_id, dest_id):
def _copy_file(self, file_id, dest_id):
body = {"parents": [dest_id]}
try:
return (
@ -159,7 +159,7 @@ class GoogleDriveClone(GoogleDriveHelper):
if self.listener.is_cancelled:
return
self.switch_service_account()
return self._copyFile(file_id, dest_id)
return self._copy_file(file_id, dest_id)
else:
LOGGER.error(f"Got: {reason}")
raise err

View File

@ -47,20 +47,20 @@ class GoogleDriveCount(GoogleDriveHelper):
LOGGER.info(f"Counting: {name}")
mime_type = meta.get("mimeType")
if mime_type == self.G_DRIVE_DIR_MIME_TYPE:
self._gDrive_directory(meta)
self._gdrive_directory(meta)
mime_type = "Folder"
else:
if mime_type is None:
mime_type = "File"
self.total_files += 1
self._gDrive_file(meta)
self._gdrive_file(meta)
return name, mime_type, self.proc_bytes, self.total_files, self.total_folders
def _gDrive_file(self, filee):
def _gdrive_file(self, filee):
size = int(filee.get("size", 0))
self.proc_bytes += size
def _gDrive_directory(self, drive_folder):
def _gdrive_directory(self, drive_folder):
files = self.get_files_by_folder_id(drive_folder["id"])
if len(files) == 0:
return
@ -74,7 +74,7 @@ class GoogleDriveCount(GoogleDriveHelper):
mime_type = filee.get("mimeType")
if mime_type == self.G_DRIVE_DIR_MIME_TYPE:
self.total_folders += 1
self._gDrive_directory(filee)
self._gdrive_directory(filee)
else:
self.total_files += 1
self._gDrive_file(filee)
self._gdrive_file(filee)

View File

@ -16,7 +16,7 @@ class GoogleDriveSearch(GoogleDriveHelper):
self._is_recursive = is_recursive
self._item_type = item_type
def _drive_query(self, dirId, file_name, is_recursive):
def _drive_query(self, dir_id, file_name, is_recursive):
try:
if is_recursive:
if self._stop_dup:
@ -33,7 +33,7 @@ class GoogleDriveSearch(GoogleDriveHelper):
elif self._item_type == "folders":
query += f"mimeType = '{self.G_DRIVE_DIR_MIME_TYPE}' and "
query += "trashed = false"
if dirId == "root":
if dir_id == "root":
return (
self.service.files()
.list(
@ -51,7 +51,7 @@ class GoogleDriveSearch(GoogleDriveHelper):
.list(
supportsAllDrives=True,
includeItemsFromAllDrives=True,
driveId=dirId,
driveId=dir_id,
q=query,
spaces="drive",
pageSize=150,
@ -63,9 +63,9 @@ class GoogleDriveSearch(GoogleDriveHelper):
)
else:
if self._stop_dup:
query = f"'{dirId}' in parents and name = '{file_name}' and "
query = f"'{dir_id}' in parents and name = '{file_name}' and "
else:
query = f"'{dirId}' in parents and "
query = f"'{dir_id}' in parents and "
file_name = file_name.split()
for name in file_name:
if name != "":

View File

@ -82,7 +82,7 @@ async def unauthorize(_, message):
@new_task
async def addSudo(_, message):
async def add_sudo(_, message):
id_ = ""
msg = message.text.split()
if len(msg) > 1:
@ -103,7 +103,7 @@ async def addSudo(_, message):
@new_task
async def removeSudo(_, message):
async def remove_sudo(_, message):
id_ = ""
msg = message.text.split()
if len(msg) > 1:
@ -136,14 +136,14 @@ bot.add_handler(
)
bot.add_handler(
MessageHandler(
addSudo,
add_sudo,
filters=command(BotCommands.AddSudoCommand, case_sensitive=True)
& CustomFilters.sudo,
)
)
bot.add_handler(
MessageHandler(
removeSudo,
remove_sudo,
filters=command(BotCommands.RmSudoCommand, case_sensitive=True)
& CustomFilters.sudo,
)

View File

@ -12,7 +12,7 @@ from ..helper.telegram_helper.message_utils import delete_message, send_message
@new_task
async def countNode(_, message):
async def count_node(_, message):
args = message.text.split()
user = message.from_user or message.sender_chat
if username := user.username:
@ -50,7 +50,7 @@ async def countNode(_, message):
bot.add_handler(
MessageHandler(
countNode,
count_node,
filters=command(BotCommands.CountCommand, case_sensitive=True)
& CustomFilters.authorized,
)

View File

@ -7,7 +7,7 @@ from bot import (
task_dict_lock,
status_dict,
task_dict,
botStartTime,
bot_start_time,
DOWNLOAD_DIR,
intervals,
bot,
@ -37,7 +37,7 @@ async def mirror_status(_, message):
async with task_dict_lock:
count = len(task_dict)
if count == 0:
currentTime = get_readable_time(time() - botStartTime)
currentTime = get_readable_time(time() - bot_start_time)
free = get_readable_file_size(disk_usage(DOWNLOAD_DIR).free)
msg = f"No Active Tasks!\nEach user can get status for his tasks by adding me or user_id after cmd: /{BotCommands.StatusCommand} me"
msg += (