mirror of
https://github.com/anasty17/mirror-leech-telegram-bot.git
synced 2025-01-08 12:07:33 +08:00
Queueing system
- fix yt-dlp no formats - handle google drive upload error - other minor fixes Signed-off-by: anasty17 <e.anastayyar@gmail.com>
This commit is contained in:
parent
73fd3e59a3
commit
4aa27ef088
@ -24,6 +24,7 @@ In each single file there is a major change from base code, it's almost totaly d
|
||||
- Set upload as document or as media for each user
|
||||
- 4GB file upload with premium account
|
||||
- Upload all files to specific superGroup/channel.
|
||||
- Leech Split size and equal split size settings for each user
|
||||
### Google
|
||||
- Stop duplicates for all tasks except yt-dlp tasks
|
||||
- Download from Google Drive
|
||||
@ -77,6 +78,7 @@ In each single file there is a major change from base code, it's almost totaly d
|
||||
- Custom name for all links except torrents. For files you should add extension except yt-dlp links
|
||||
- Extensions Filter for the files to be uploaded/cloned
|
||||
- View Link button. Extra button to open index link in broswer instead of direct download for file
|
||||
- Queueing System
|
||||
- Almost all repository functions have been improved and many other details can't mention all of them
|
||||
- Many bugs have been fixed
|
||||
|
||||
@ -201,6 +203,11 @@ Fill up rest of the fields. Meaning of each field is discussed below. **NOTE**:
|
||||
- `MEGA_EMAIL_ID`: E-Mail ID used to sign up on mega.nz for using premium account. `Str`
|
||||
- `MEGA_PASSWORD`: Password for mega.nz account. `Str`
|
||||
|
||||
### Queue System
|
||||
- `QUEUE_ALL`: Number of parallel tasks of downloads from (mega, telegram, yt-dlp, gdrive) + all uploads. For example if 20 task added and `QUEUE_ALL` is `8`, then the summation of uploading and downloading tasks are 8 and the rest in queue. `Int`. **NOTE**: if you want to fill `QUEUE_DOWNLOAD` or `QUEUE_UPLOAD`, then `QUEUE_ALL` value must be greater than or equal to the greatest one and less than or equal to summation of `QUEUE_UPLOAD` and `QUEUE_DOWNLOAD`.
|
||||
- `QUEUE_DOWNLOAD`: Number of parallel downloading tasks from mega, telegram, yt-dlp and gdrive. `Int`
|
||||
- `QUEUE_UPLOAD`: Number of all parallel uploading tasks. `Int`
|
||||
|
||||
### Buttons
|
||||
- `VIEW_LINK`: View Link button to open file Index Link in browser instead of direct download link, you can figure out if it's compatible with your Index code or not, open any video from you Index and check if its URL ends with `?a=view`. Compatible with [BhadooIndex](https://gitlab.com/ParveenBhadooOfficial/Google-Drive-Index) Code. Default is `False`. `Bool`
|
||||
|
||||
|
@ -38,6 +38,10 @@ GLOBAL_EXTENSION_FILTER = ['.aria2']
|
||||
user_data = {}
|
||||
aria2_options = {}
|
||||
qbit_options = {}
|
||||
queued_dl = {}
|
||||
queued_up = {}
|
||||
non_queued_dl = set()
|
||||
non_queued_up = set()
|
||||
|
||||
try:
|
||||
if bool(environ.get('_____REMOVE_THIS_LINE_____')):
|
||||
@ -48,6 +52,7 @@ except:
|
||||
|
||||
download_dict_lock = Lock()
|
||||
status_reply_dict_lock = Lock()
|
||||
queue_dict_lock = Lock()
|
||||
# Key: update.effective_chat.id
|
||||
# Value: telegram.Message
|
||||
status_reply_dict = {}
|
||||
@ -58,6 +63,11 @@ download_dict = {}
|
||||
# value: {link, last_feed, last_title, filter}
|
||||
rss_dict = {}
|
||||
|
||||
if ospath.exists('pyrogram.session'):
|
||||
osremove('pyrogram.session')
|
||||
if ospath.exists('pyrogram.session-journal'):
|
||||
osremove('pyrogram.session-journal')
|
||||
|
||||
BOT_TOKEN = environ.get('BOT_TOKEN', '')
|
||||
if len(BOT_TOKEN) == 0:
|
||||
log_error("BOT_TOKEN variable is missing! Exiting now")
|
||||
@ -189,9 +199,9 @@ RSS_COMMAND = environ.get('RSS_COMMAND', '')
|
||||
if len(RSS_COMMAND) == 0:
|
||||
RSS_COMMAND = ''
|
||||
|
||||
LEECH_FILENAME_PERFIX = environ.get('LEECH_FILENAME_PERFIX', '')
|
||||
if len(LEECH_FILENAME_PERFIX) == 0:
|
||||
LEECH_FILENAME_PERFIX = ''
|
||||
LEECH_FILENAME_PREFIX = environ.get('LEECH_FILENAME_PREFIX', '')
|
||||
if len(LEECH_FILENAME_PREFIX) == 0:
|
||||
LEECH_FILENAME_PREFIX = ''
|
||||
|
||||
SEARCH_PLUGINS = environ.get('SEARCH_PLUGINS', '')
|
||||
if len(SEARCH_PLUGINS) == 0:
|
||||
@ -241,6 +251,15 @@ RSS_DELAY = 900 if len(RSS_DELAY) == 0 else int(RSS_DELAY)
|
||||
TORRENT_TIMEOUT = environ.get('TORRENT_TIMEOUT', '')
|
||||
TORRENT_TIMEOUT = '' if len(TORRENT_TIMEOUT) == 0 else int(TORRENT_TIMEOUT)
|
||||
|
||||
QUEUE_ALL = environ.get('QUEUE_ALL', '')
|
||||
QUEUE_ALL = '' if len(QUEUE_ALL) == 0 else int(QUEUE_ALL)
|
||||
|
||||
QUEUE_DOWNLOAD = environ.get('QUEUE_DOWNLOAD', '')
|
||||
QUEUE_DOWNLOAD = '' if len(QUEUE_DOWNLOAD) == 0 else int(QUEUE_DOWNLOAD)
|
||||
|
||||
QUEUE_UPLOAD = environ.get('QUEUE_UPLOAD', '')
|
||||
QUEUE_UPLOAD = '' if len(QUEUE_UPLOAD) == 0 else int(QUEUE_UPLOAD)
|
||||
|
||||
INCOMPLETE_TASK_NOTIFIER = environ.get('INCOMPLETE_TASK_NOTIFIER', '')
|
||||
INCOMPLETE_TASK_NOTIFIER = INCOMPLETE_TASK_NOTIFIER.lower() == 'true'
|
||||
|
||||
@ -303,12 +322,15 @@ config_dict = {'AS_DOCUMENT': AS_DOCUMENT,
|
||||
'INCOMPLETE_TASK_NOTIFIER': INCOMPLETE_TASK_NOTIFIER,
|
||||
'INDEX_URL': INDEX_URL,
|
||||
'IS_TEAM_DRIVE': IS_TEAM_DRIVE,
|
||||
'LEECH_FILENAME_PERFIX': LEECH_FILENAME_PERFIX,
|
||||
'LEECH_FILENAME_PREFIX': LEECH_FILENAME_PREFIX,
|
||||
'LEECH_SPLIT_SIZE': LEECH_SPLIT_SIZE,
|
||||
'MEGA_API_KEY': MEGA_API_KEY,
|
||||
'MEGA_EMAIL_ID': MEGA_EMAIL_ID,
|
||||
'MEGA_PASSWORD': MEGA_PASSWORD,
|
||||
'OWNER_ID': OWNER_ID,
|
||||
'QUEUE_ALL': QUEUE_ALL,
|
||||
'QUEUE_DOWNLOAD': QUEUE_DOWNLOAD,
|
||||
'QUEUE_UPLOAD': QUEUE_UPLOAD,
|
||||
'RSS_USER_SESSION_STRING': RSS_USER_SESSION_STRING,
|
||||
'RSS_CHAT_ID': RSS_CHAT_ID,
|
||||
'RSS_COMMAND': RSS_COMMAND,
|
||||
|
@ -85,7 +85,7 @@ def log(update, context):
|
||||
sendLogFile(context.bot, update.message)
|
||||
|
||||
help_string = f'''
|
||||
NOTE: Try each command without any perfix to see more detalis.
|
||||
NOTE: Try each command without any argument to see more detalis.
|
||||
/{BotCommands.MirrorCommand[0]} or /{BotCommands.MirrorCommand[1]}: Start mirroring to Google Drive.
|
||||
/{BotCommands.ZipMirrorCommand[0]} or /{BotCommands.ZipMirrorCommand[1]}: Start mirroring and upload the file/folder compressed with zip extension.
|
||||
/{BotCommands.UnzipMirrorCommand[0]} or /{BotCommands.UnzipMirrorCommand[1]}: Start mirroring and upload the file/folder extracted from any archive extension.
|
||||
|
@ -24,7 +24,8 @@ class MirrorStatus:
|
||||
STATUS_UPLOADING = "Upload"
|
||||
STATUS_DOWNLOADING = "Download"
|
||||
STATUS_CLONING = "Clone"
|
||||
STATUS_WAITING = "Queue"
|
||||
STATUS_QUEUEDL = "QueueDl"
|
||||
STATUS_QUEUEUP = "QueueUp"
|
||||
STATUS_PAUSED = "Pause"
|
||||
STATUS_ARCHIVING = "Archive"
|
||||
STATUS_EXTRACTING = "Extract"
|
||||
|
79
bot/helper/ext_utils/queued_starter.py
Normal file
79
bot/helper/ext_utils/queued_starter.py
Normal file
@ -0,0 +1,79 @@
|
||||
from threading import Thread
|
||||
|
||||
from bot import config_dict, queued_dl, queued_up, non_queued_up, non_queued_dl, queue_dict_lock
|
||||
from bot.helper.mirror_utils.download_utils.gd_downloader import add_gd_download
|
||||
from bot.helper.mirror_utils.download_utils.mega_downloader import add_mega_download
|
||||
from bot.helper.mirror_utils.download_utils.telegram_downloader import TelegramDownloadHelper
|
||||
from bot.helper.mirror_utils.download_utils.yt_dlp_download_helper import YoutubeDLHelper
|
||||
|
||||
def start_dl_from_queued(uid):
|
||||
dl = queued_dl[uid]
|
||||
if dl[0] == 'gd':
|
||||
Thread(target=add_gd_download, args=(dl[1], dl[2], dl[3], dl[4], True)).start()
|
||||
elif dl[0] == 'mega':
|
||||
Thread(target=add_mega_download, args=(dl[1], dl[2], dl[3], dl[4], True)).start()
|
||||
elif dl[0] == 'yt':
|
||||
ydl = YoutubeDLHelper(dl[7])
|
||||
Thread(target=ydl.add_download, args=(dl[1], dl[2], dl[3], dl[4], dl[5], dl[6], True)).start()
|
||||
elif dl[0] == 'tg':
|
||||
tg = TelegramDownloadHelper(dl[4])
|
||||
Thread(target=tg.add_download, args=(dl[1], dl[2], dl[3], True)).start()
|
||||
del queued_dl[uid]
|
||||
|
||||
def start_up_from_queued(uid):
|
||||
up = queued_up[uid]
|
||||
up[0].queuedUp = False
|
||||
del queued_up[uid]
|
||||
|
||||
def start_from_queued():
|
||||
if all_limit := config_dict['QUEUE_ALL']:
|
||||
dl_limit = config_dict['QUEUE_DOWNLOAD']
|
||||
with queue_dict_lock:
|
||||
dl = len(non_queued_dl)
|
||||
up = len(non_queued_up)
|
||||
all_ = dl + up
|
||||
if all_ < all_limit:
|
||||
f_tasks = all_limit - all_
|
||||
if queued_up and (not up_limit or up < up_limit):
|
||||
for index, uid in enumerate(list(queued_up.keys()), start=1):
|
||||
f_tasks = all_limit - all_
|
||||
start_up_from_queued(uid)
|
||||
f_tasks -= 1
|
||||
if f_tasks == 0 or (up_limit and index >= up_limit - up):
|
||||
break
|
||||
if queued_dl and (not dl_limit or dl < dl_limit) and f_tasks != 0:
|
||||
for index, uid in enumerate(list(queued_dl.keys()), start=1):
|
||||
start_dl_from_queued(uid)
|
||||
if (dl_limit and index >= dl_limit - dl) or index == f_tasks:
|
||||
break
|
||||
return
|
||||
|
||||
if up_limit := config_dict['QUEUE_UPLOAD']:
|
||||
with queue_dict_lock:
|
||||
up = len(non_queued_up)
|
||||
if queued_up and up < up_limit:
|
||||
f_tasks = up_limit - up
|
||||
for index, uid in enumerate(list(queued_up.keys()), start=1):
|
||||
start_up_from_queued(uid)
|
||||
if index == f_tasks:
|
||||
break
|
||||
else:
|
||||
with queue_dict_lock:
|
||||
if queued_up:
|
||||
for uid in list(queued_up.keys()):
|
||||
start_up_from_queued(uid)
|
||||
|
||||
if dl_limit := config_dict['QUEUE_ALL']:
|
||||
with queue_dict_lock:
|
||||
dl = len(non_queued_dl)
|
||||
if queued_dl and dl < dl_limit:
|
||||
f_tasks = dl_limit - dl
|
||||
for index, uid in enumerate(list(queued_dl.keys()), start=1):
|
||||
start_dl_from_queued(uid)
|
||||
if index == f_tasks:
|
||||
break
|
||||
else:
|
||||
with queue_dict_lock:
|
||||
if queued_dl:
|
||||
for uid in list(queued_dl.keys()):
|
||||
start_dl_from_queued(uid)
|
@ -1,14 +1,15 @@
|
||||
from random import SystemRandom
|
||||
from string import ascii_letters, digits
|
||||
|
||||
from bot import download_dict, download_dict_lock, LOGGER, config_dict
|
||||
from bot import download_dict, download_dict_lock, LOGGER, config_dict, non_queued_dl, non_queued_up, queued_dl, queue_dict_lock
|
||||
from bot.helper.mirror_utils.upload_utils.gdriveTools import GoogleDriveHelper
|
||||
from bot.helper.mirror_utils.status_utils.gd_download_status import GdDownloadStatus
|
||||
from bot.helper.mirror_utils.status_utils.queue_status import QueueStatus
|
||||
from bot.helper.telegram_helper.message_utils import sendMessage, sendStatusMessage
|
||||
from bot.helper.ext_utils.fs_utils import get_base_name
|
||||
|
||||
|
||||
def add_gd_download(link, path, listener, newname):
|
||||
def add_gd_download(link, path, listener, newname, from_queue=False):
|
||||
res, size, name, files = GoogleDriveHelper().helper(link)
|
||||
if res != "":
|
||||
return sendMessage(res, listener.bot, listener.message)
|
||||
@ -28,12 +29,33 @@ def add_gd_download(link, path, listener, newname):
|
||||
if gmsg:
|
||||
msg = "File/Folder is already available in Drive.\nHere are the search results:"
|
||||
return sendMessage(msg, listener.bot, listener.message, button)
|
||||
LOGGER.info(f"Download Name: {name}")
|
||||
drive = GoogleDriveHelper(name, path, size, listener)
|
||||
gid = ''.join(SystemRandom().choices(ascii_letters + digits, k=12))
|
||||
download_status = GdDownloadStatus(drive, size, listener, gid)
|
||||
all_limit = config_dict['QUEUE_ALL']
|
||||
dl_limit = config_dict['QUEUE_DOWNLOAD']
|
||||
if all_limit or dl_limit:
|
||||
added_to_queue = False
|
||||
with queue_dict_lock:
|
||||
dl = len(non_queued_dl)
|
||||
up = len(non_queued_up)
|
||||
if (all_limit and dl + up >= all_limit and (not dl_limit or dl >= dl_limit)) or (dl_limit and dl >= dl_limit):
|
||||
added_to_queue = True
|
||||
queued_dl[listener.uid] = ['gd', link, path, listener, newname]
|
||||
if added_to_queue:
|
||||
LOGGER.info(f"Added to Queue/Download: {name}")
|
||||
with download_dict_lock:
|
||||
download_dict[listener.uid] = QueueStatus(name, size, gid, listener, 'Dl')
|
||||
listener.onDownloadStart()
|
||||
sendStatusMessage(listener.message, listener.bot)
|
||||
return
|
||||
drive = GoogleDriveHelper(name, path, size, listener)
|
||||
with download_dict_lock:
|
||||
download_dict[listener.uid] = download_status
|
||||
listener.onDownloadStart()
|
||||
sendStatusMessage(listener.message, listener.bot)
|
||||
download_dict[listener.uid] = GdDownloadStatus(drive, size, listener, gid)
|
||||
with queue_dict_lock:
|
||||
non_queued_dl.add(listener.uid)
|
||||
if not from_queue:
|
||||
LOGGER.info(f"Download from GDrive: {name}")
|
||||
listener.onDownloadStart()
|
||||
sendStatusMessage(listener.message, listener.bot)
|
||||
else:
|
||||
LOGGER.info(f'Start Queued Download from GDrive: {name}')
|
||||
drive.download(link)
|
||||
|
@ -4,10 +4,11 @@ from os import makedirs
|
||||
from threading import Event
|
||||
from mega import (MegaApi, MegaListener, MegaRequest, MegaTransfer, MegaError)
|
||||
|
||||
from bot import LOGGER, config_dict, download_dict_lock, download_dict
|
||||
from bot import LOGGER, config_dict, download_dict_lock, download_dict, non_queued_dl, non_queued_up, queued_dl, queue_dict_lock
|
||||
from bot.helper.telegram_helper.message_utils import sendMessage, sendStatusMessage
|
||||
from bot.helper.ext_utils.bot_utils import get_mega_link_type
|
||||
from bot.helper.mirror_utils.status_utils.mega_download_status import MegaDownloadStatus
|
||||
from bot.helper.mirror_utils.status_utils.queue_status import QueueStatus
|
||||
from bot.helper.mirror_utils.upload_utils.gdriveTools import GoogleDriveHelper
|
||||
from bot.helper.ext_utils.fs_utils import get_base_name
|
||||
|
||||
@ -130,7 +131,7 @@ class AsyncExecutor:
|
||||
self.continue_event.wait()
|
||||
|
||||
|
||||
def add_mega_download(mega_link: str, path: str, listener, name: str):
|
||||
def add_mega_download(mega_link, path, listener, name, from_queue=False):
|
||||
MEGA_API_KEY = config_dict['MEGA_API_KEY']
|
||||
MEGA_EMAIL_ID = config_dict['MEGA_EMAIL_ID']
|
||||
MEGA_PASSWORD = config_dict['MEGA_PASSWORD']
|
||||
@ -174,14 +175,41 @@ def add_mega_download(mega_link: str, path: str, listener, name: str):
|
||||
if folder_api is not None:
|
||||
folder_api.removeListener(mega_listener)
|
||||
return
|
||||
with download_dict_lock:
|
||||
download_dict[listener.uid] = MegaDownloadStatus(mega_listener, listener)
|
||||
listener.onDownloadStart()
|
||||
makedirs(path)
|
||||
gid = ''.join(SystemRandom().choices(ascii_letters + digits, k=8))
|
||||
mname = name or node.getName()
|
||||
mega_listener.setValues(mname, api.getSize(node), gid)
|
||||
sendStatusMessage(listener.message, listener.bot)
|
||||
size = api.getSize(node)
|
||||
all_limit = config_dict['QUEUE_ALL']
|
||||
dl_limit = config_dict['QUEUE_DOWNLOAD']
|
||||
if all_limit or dl_limit:
|
||||
added_to_queue = False
|
||||
with queue_dict_lock:
|
||||
dl = len(non_queued_dl)
|
||||
up = len(non_queued_up)
|
||||
if (all_limit and dl + up >= all_limit and (not dl_limit or dl >= dl_limit)) or (dl_limit and dl >= dl_limit):
|
||||
added_to_queue = True
|
||||
queued_dl[listener.uid] = ['mega', mega_link, path, listener, name]
|
||||
if added_to_queue:
|
||||
LOGGER.info(f"Added to Queue/Download: {mname}")
|
||||
with download_dict_lock:
|
||||
download_dict[listener.uid] = QueueStatus(mname, size, gid, listener, 'Dl')
|
||||
listener.onDownloadStart()
|
||||
sendStatusMessage(listener.message, listener.bot)
|
||||
api.removeListener(mega_listener)
|
||||
if folder_api is not None:
|
||||
folder_api.removeListener(mega_listener)
|
||||
return
|
||||
with download_dict_lock:
|
||||
download_dict[listener.uid] = MegaDownloadStatus(mega_listener, listener)
|
||||
with queue_dict_lock:
|
||||
non_queued_dl.add(listener.uid)
|
||||
makedirs(path)
|
||||
mega_listener.setValues(mname, size, gid)
|
||||
if not from_queue:
|
||||
listener.onDownloadStart()
|
||||
sendStatusMessage(listener.message, listener.bot)
|
||||
LOGGER.info(f"Download from Mega: {mname}")
|
||||
else:
|
||||
LOGGER.info(f'Start Queued Download from Mega: {mname}')
|
||||
executor.do(api.startDownload, (node, path, name, None, False, None))
|
||||
api.removeListener(mega_listener)
|
||||
if folder_api is not None:
|
||||
|
@ -2,8 +2,9 @@ from logging import getLogger, WARNING
|
||||
from time import time
|
||||
from threading import RLock, Lock
|
||||
|
||||
from bot import LOGGER, download_dict, download_dict_lock, app, config_dict
|
||||
from bot import LOGGER, download_dict, download_dict_lock, app, config_dict, non_queued_dl, non_queued_up, queued_dl, queue_dict_lock
|
||||
from ..status_utils.telegram_download_status import TelegramDownloadStatus
|
||||
from bot.helper.mirror_utils.status_utils.queue_status import QueueStatus
|
||||
from bot.helper.telegram_helper.message_utils import sendStatusMessage, sendMessage
|
||||
from bot.helper.mirror_utils.upload_utils.gdriveTools import GoogleDriveHelper
|
||||
|
||||
@ -30,7 +31,7 @@ class TelegramDownloadHelper:
|
||||
with self.__resource_lock:
|
||||
return self.downloaded_bytes / (time() - self.__start_time)
|
||||
|
||||
def __onDownloadStart(self, name, size, file_id):
|
||||
def __onDownloadStart(self, name, size, file_id, from_queue):
|
||||
with global_lock:
|
||||
GLOBAL_GID.add(file_id)
|
||||
with self.__resource_lock:
|
||||
@ -39,8 +40,14 @@ class TelegramDownloadHelper:
|
||||
self.__id = file_id
|
||||
with download_dict_lock:
|
||||
download_dict[self.__listener.uid] = TelegramDownloadStatus(self, self.__listener, self.__id)
|
||||
self.__listener.onDownloadStart()
|
||||
sendStatusMessage(self.__listener.message, self.__listener.bot)
|
||||
with queue_dict_lock:
|
||||
non_queued_dl.add(self.__listener.uid)
|
||||
if not from_queue:
|
||||
self.__listener.onDownloadStart()
|
||||
sendStatusMessage(self.__listener.message, self.__listener.bot)
|
||||
LOGGER.info(f'Download from Telegram: {name}')
|
||||
else:
|
||||
LOGGER.info(f'Start Queued Download from Telegram: {name}')
|
||||
|
||||
def __onDownloadProgress(self, current, total):
|
||||
if self.__is_cancelled:
|
||||
@ -80,7 +87,7 @@ class TelegramDownloadHelper:
|
||||
elif not self.__is_cancelled:
|
||||
self.__onDownloadError('Internal error occurred')
|
||||
|
||||
def add_download(self, message, path, filename):
|
||||
def add_download(self, message, path, filename, from_queue=False):
|
||||
_dmsg = app.get_messages(message.chat.id, reply_to_message_ids=message.message_id)
|
||||
media = _dmsg.document or _dmsg.video or _dmsg.audio or None
|
||||
if media is not None:
|
||||
@ -93,16 +100,36 @@ class TelegramDownloadHelper:
|
||||
name = filename
|
||||
path = path + name
|
||||
|
||||
if download:
|
||||
if from_queue or download:
|
||||
size = media.file_size
|
||||
gid = media.file_unique_id
|
||||
if config_dict['STOP_DUPLICATE'] and not self.__listener.isLeech:
|
||||
LOGGER.info('Checking File/Folder if already in Drive...')
|
||||
smsg, button = GoogleDriveHelper().drive_list(name, True, True)
|
||||
if smsg:
|
||||
msg = "File/Folder is already available in Drive.\nHere are the search results:"
|
||||
return sendMessage(msg, self.__listener.bot, self.__listener.message, button)
|
||||
self.__onDownloadStart(name, size, media.file_unique_id)
|
||||
LOGGER.info(f'Downloading Telegram file with id: {media.file_unique_id}')
|
||||
sendMessage(msg, self.__listener.bot, self.__listener.message, button)
|
||||
return
|
||||
all_limit = config_dict['QUEUE_ALL']
|
||||
dl_limit = config_dict['QUEUE_DOWNLOAD']
|
||||
if all_limit or dl_limit:
|
||||
added_to_queue = False
|
||||
with queue_dict_lock:
|
||||
dl = len(non_queued_dl)
|
||||
up = len(non_queued_up)
|
||||
if (all_limit and dl + up >= all_limit and (not dl_limit or dl >= dl_limit)) or (dl_limit and dl >= dl_limit):
|
||||
added_to_queue = True
|
||||
queued_dl[self.__listener.uid] = ['tg', message, path, filename, self.__listener]
|
||||
if added_to_queue:
|
||||
LOGGER.info(f"Added to Queue/Download: {name}")
|
||||
with download_dict_lock:
|
||||
download_dict[self.__listener.uid] = QueueStatus(name, size, gid, self.__listener, 'Dl')
|
||||
self.__listener.onDownloadStart()
|
||||
sendStatusMessage(self.__listener.message, self.__listener.bot)
|
||||
with global_lock:
|
||||
GLOBAL_GID.add(gid)
|
||||
return
|
||||
self.__onDownloadStart(name, size, gid, from_queue)
|
||||
self.__download(_dmsg, path)
|
||||
else:
|
||||
self.__onDownloadError('File already being downloaded!')
|
||||
|
@ -7,9 +7,10 @@ from threading import RLock
|
||||
from re import search as re_search
|
||||
from json import loads as jsonloads
|
||||
|
||||
from bot import download_dict_lock, download_dict
|
||||
from bot import download_dict_lock, download_dict, config_dict, non_queued_dl, non_queued_up, queued_dl, queue_dict_lock
|
||||
from bot.helper.telegram_helper.message_utils import sendStatusMessage
|
||||
from ..status_utils.yt_dlp_download_status import YtDlpDownloadStatus
|
||||
from bot.helper.mirror_utils.status_utils.queue_status import QueueStatus
|
||||
|
||||
LOGGER = getLogger(__name__)
|
||||
|
||||
@ -116,11 +117,15 @@ class YoutubeDLHelper:
|
||||
except:
|
||||
pass
|
||||
|
||||
def __onDownloadStart(self):
|
||||
def __onDownloadStart(self, from_queue):
|
||||
with download_dict_lock:
|
||||
download_dict[self.__listener.uid] = YtDlpDownloadStatus(self, self.__listener, self.__gid)
|
||||
self.__listener.onDownloadStart()
|
||||
sendStatusMessage(self.__listener.message, self.__listener.bot)
|
||||
if not from_queue:
|
||||
self.__listener.onDownloadStart()
|
||||
sendStatusMessage(self.__listener.message, self.__listener.bot)
|
||||
LOGGER.info(f'Download with YT_DLP: {self.name}')
|
||||
else:
|
||||
LOGGER.info(f'Start Queued Download with YT_DLP: {self.name}')
|
||||
|
||||
def __onDownloadComplete(self):
|
||||
self.__listener.onDownloadComplete()
|
||||
@ -187,19 +192,18 @@ class YoutubeDLHelper:
|
||||
except ValueError:
|
||||
self.__onDownloadError("Download Stopped by User!")
|
||||
|
||||
def add_download(self, link, path, name, qual, playlist, args):
|
||||
def add_download(self, link, path, name, qual, playlist, args, from_queue=False):
|
||||
if playlist:
|
||||
self.opts['ignoreerrors'] = True
|
||||
self.is_playlist = True
|
||||
self.__gid = ''.join(SystemRandom().choices(ascii_letters + digits, k=10))
|
||||
self.__onDownloadStart()
|
||||
self.__onDownloadStart(from_queue)
|
||||
if qual.startswith('ba/b-'):
|
||||
mp3_info = qual.split('-')
|
||||
qual = mp3_info[0]
|
||||
rate = mp3_info[1]
|
||||
self.opts['postprocessors'] = [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': rate}]
|
||||
self.opts['format'] = qual
|
||||
LOGGER.info(f"Downloading with YT-DLP: {link}")
|
||||
self.extractMetaData(link, name, args)
|
||||
if self.__is_cancelled:
|
||||
return
|
||||
@ -211,6 +215,25 @@ class YoutubeDLHelper:
|
||||
folder_name = self.name.rsplit('.', 1)[0]
|
||||
self.opts['outtmpl'] = f"{path}/{folder_name}/{self.name}"
|
||||
self.name = folder_name
|
||||
all_limit = config_dict['QUEUE_ALL']
|
||||
dl_limit = config_dict['QUEUE_DOWNLOAD']
|
||||
if all_limit or dl_limit:
|
||||
added_to_queue = False
|
||||
with queue_dict_lock:
|
||||
dl = len(non_queued_dl)
|
||||
up = len(non_queued_up)
|
||||
if (all_limit and dl + up >= all_limit and (not dl_limit or dl >= dl_limit)) or (dl_limit and dl >= dl_limit):
|
||||
added_to_queue = True
|
||||
queued_dl[self.__listener.uid] = ['yt', link, path, name, qual, playlist, args, self.__listener]
|
||||
if added_to_queue:
|
||||
LOGGER.info(f"Added to Queue/Download: {self.name}")
|
||||
with download_dict_lock:
|
||||
download_dict[self.__listener.uid] = QueueStatus(self.name, self.__size, self.__gid, self.__listener, 'Dl')
|
||||
self.__listener.onDownloadStart()
|
||||
sendStatusMessage(self.__listener.message, self.__listener.bot)
|
||||
return
|
||||
with queue_dict_lock:
|
||||
non_queued_dl.add(self.__listener.uid)
|
||||
self.__download(link, path)
|
||||
|
||||
def cancel_download(self):
|
||||
|
@ -63,7 +63,7 @@ class AriaDownloadStatus:
|
||||
self.__update()
|
||||
download = self.__download
|
||||
if download.is_waiting:
|
||||
return MirrorStatus.STATUS_WAITING
|
||||
return MirrorStatus.STATUS_QUEUEDL
|
||||
elif download.is_paused:
|
||||
return MirrorStatus.STATUS_PAUSED
|
||||
elif download.seeder and self.seeding:
|
||||
|
@ -62,7 +62,7 @@ class QbDownloadStatus:
|
||||
self.__update()
|
||||
download = self.__info.state
|
||||
if download in ["queuedDL", "queuedUP"]:
|
||||
return MirrorStatus.STATUS_WAITING
|
||||
return MirrorStatus.STATUS_QUEUEDL
|
||||
elif download in ["pausedDL", "pausedUP"]:
|
||||
return MirrorStatus.STATUS_PAUSED
|
||||
elif download in ["checkingUP", "checkingDL"]:
|
||||
|
52
bot/helper/mirror_utils/status_utils/queue_status.py
Normal file
52
bot/helper/mirror_utils/status_utils/queue_status.py
Normal file
@ -0,0 +1,52 @@
|
||||
from bot import LOGGER
|
||||
from bot.helper.ext_utils.bot_utils import get_readable_file_size, MirrorStatus
|
||||
|
||||
|
||||
class QueueStatus:
|
||||
def __init__(self, name, size, gid, listener, state):
|
||||
self.__name = name
|
||||
self.__size = size
|
||||
self.__gid = gid
|
||||
self.__listener = listener
|
||||
self.__state = state
|
||||
self.message = listener.message
|
||||
|
||||
def gid(self):
|
||||
return self.__gid
|
||||
|
||||
def name(self):
|
||||
return self.__name
|
||||
|
||||
def size_raw(self):
|
||||
return self.__size
|
||||
|
||||
def size(self):
|
||||
return get_readable_file_size(self.__size)
|
||||
|
||||
def status(self):
|
||||
if self.__state == 'Dl':
|
||||
return MirrorStatus.STATUS_QUEUEDL
|
||||
else:
|
||||
return MirrorStatus.STATUS_QUEUEUP
|
||||
|
||||
def processed_bytes(self):
|
||||
return 0
|
||||
|
||||
def progress(self):
|
||||
return '0%'
|
||||
|
||||
def speed(self):
|
||||
return '0B/s'
|
||||
|
||||
def eta(self):
|
||||
return '-'
|
||||
|
||||
def download(self):
|
||||
return self
|
||||
|
||||
def cancel_download(self):
|
||||
LOGGER.info(f'Cancelling Queue{self.__state}: {self.__name}')
|
||||
if self.__state == 'Dl':
|
||||
self.__listener.onDownloadError('task have been removed from queue/download')
|
||||
else:
|
||||
self.__listener.onUploadError('task have been removed from queue/upload')
|
@ -319,9 +319,15 @@ class GoogleDriveHelper:
|
||||
]:
|
||||
raise err
|
||||
if config_dict['USE_SERVICE_ACCOUNTS']:
|
||||
self.__switchServiceAccount()
|
||||
LOGGER.info(f"Got: {reason}, Trying Again.")
|
||||
return self.__upload_file(file_path, file_name, mime_type, dest_id)
|
||||
if self.__sa_count >= SERVICE_ACCOUNTS_NUMBER:
|
||||
LOGGER.info(f"Reached maximum number of service accounts switching, which is {self.__sa_count}")
|
||||
raise err
|
||||
else:
|
||||
if self.__is_cancelled:
|
||||
return
|
||||
self.__switchServiceAccount()
|
||||
LOGGER.info(f"Got: {reason}, Trying Again.")
|
||||
return self.__upload_file(file_path, file_name, mime_type, dest_id)
|
||||
else:
|
||||
LOGGER.error(f"Got: {reason}")
|
||||
raise err
|
||||
@ -439,10 +445,12 @@ class GoogleDriveHelper:
|
||||
reason = jsnloads(err.content).get('error').get('errors')[0].get('reason')
|
||||
if reason in ['userRateLimitExceeded', 'dailyLimitExceeded']:
|
||||
if config_dict['USE_SERVICE_ACCOUNTS']:
|
||||
if self.__sa_count == SERVICE_ACCOUNTS_NUMBER:
|
||||
if self.__sa_count >= SERVICE_ACCOUNTS_NUMBER:
|
||||
LOGGER.info(f"Reached maximum number of service accounts switching, which is {self.__sa_count}")
|
||||
raise err
|
||||
else:
|
||||
if self.__is_cancelled:
|
||||
return
|
||||
self.__switchServiceAccount()
|
||||
return self.__copyFile(file_id, dest_id)
|
||||
else:
|
||||
@ -796,10 +804,12 @@ class GoogleDriveHelper:
|
||||
]:
|
||||
raise err
|
||||
if config_dict['USE_SERVICE_ACCOUNTS']:
|
||||
if self.__sa_count == SERVICE_ACCOUNTS_NUMBER:
|
||||
if self.__sa_count >= SERVICE_ACCOUNTS_NUMBER:
|
||||
LOGGER.info(f"Reached maximum number of service accounts switching, which is {self.__sa_count}")
|
||||
raise err
|
||||
else:
|
||||
if self.__is_cancelled:
|
||||
return
|
||||
self.__switchServiceAccount()
|
||||
LOGGER.info(f"Got: {reason}, Trying Again...")
|
||||
return self.__download_file(file_id, path, filename, mime_type)
|
||||
|
@ -63,6 +63,9 @@ class TgUploader:
|
||||
sleep(1)
|
||||
if self.__listener.seed and not self.__listener.newDir:
|
||||
clean_unwanted(self.__path)
|
||||
if self.__total_files == 0:
|
||||
self.__listener.onUploadError('No files to upload. Make sure if you filled USER_SESSION_STRING then you should use supergroup. In case you filled EXTENSION_FILTER then check if all file have this extension')
|
||||
return
|
||||
if self.__total_files <= self.__corrupted:
|
||||
self.__listener.onUploadError('Files Corrupted. Check logs!')
|
||||
return
|
||||
@ -71,9 +74,9 @@ class TgUploader:
|
||||
self.__listener.onUploadComplete(None, size, self.__msgs_dict, self.__total_files, self.__corrupted, self.name)
|
||||
|
||||
def __upload_file(self, up_path, file_, dirpath):
|
||||
if LEECH_FILENAME_PERFIX := config_dict['LEECH_FILENAME_PERFIX']:
|
||||
cap_mono = f"{LEECH_FILENAME_PERFIX} <code>{file_}</code>"
|
||||
file_ = f"{LEECH_FILENAME_PERFIX} {file_}"
|
||||
if LEECH_FILENAME_PREFIX := config_dict['LEECH_FILENAME_PREFIX']:
|
||||
cap_mono = f"{LEECH_FILENAME_PREFIX} <code>{file_}</code>"
|
||||
file_ = f"{LEECH_FILENAME_PREFIX} {file_}"
|
||||
new_path = ospath.join(dirpath, file_)
|
||||
osrename(up_path, new_path)
|
||||
up_path = new_path
|
||||
|
@ -4,7 +4,6 @@ from time import time, sleep
|
||||
from os import remove, rename, path as ospath, environ
|
||||
from subprocess import run as srun, Popen
|
||||
from dotenv import load_dotenv
|
||||
from shutil import rmtree
|
||||
|
||||
from bot import config_dict, dispatcher, user_data, DATABASE_URL, MAX_SPLIT_SIZE, DRIVES_IDS, DRIVES_NAMES, INDEX_URLS, aria2, GLOBAL_EXTENSION_FILTER, status_reply_dict_lock, Interval, aria2_options, aria2c_global, IS_PREMIUM_USER, download_dict, qbit_options, get_client, LOGGER
|
||||
from bot.helper.telegram_helper.message_utils import sendMessage, sendFile, editMessage, update_all_messages
|
||||
@ -13,6 +12,7 @@ from bot.helper.telegram_helper.bot_commands import BotCommands
|
||||
from bot.helper.telegram_helper.button_build import ButtonMaker
|
||||
from bot.helper.ext_utils.bot_utils import new_thread, setInterval
|
||||
from bot.helper.ext_utils.db_handler import DbManger
|
||||
from bot.helper.ext_utils.queued_starter import start_from_queued
|
||||
from bot.modules.search import initiate_search_tools
|
||||
|
||||
START = 0
|
||||
@ -109,9 +109,9 @@ def load_config():
|
||||
if len(RSS_COMMAND) == 0:
|
||||
RSS_COMMAND = ''
|
||||
|
||||
LEECH_FILENAME_PERFIX = environ.get('LEECH_FILENAME_PERFIX', '')
|
||||
if len(LEECH_FILENAME_PERFIX) == 0:
|
||||
LEECH_FILENAME_PERFIX = ''
|
||||
LEECH_FILENAME_PREFIX = environ.get('LEECH_FILENAME_PREFIX', '')
|
||||
if len(LEECH_FILENAME_PREFIX) == 0:
|
||||
LEECH_FILENAME_PREFIX = ''
|
||||
|
||||
SEARCH_PLUGINS = environ.get('SEARCH_PLUGINS', '')
|
||||
if len(SEARCH_PLUGINS) == 0:
|
||||
@ -193,6 +193,15 @@ def load_config():
|
||||
DbManger().update_aria2('bt-stop-timeout', TORRENT_TIMEOUT)
|
||||
TORRENT_TIMEOUT = int(TORRENT_TIMEOUT)
|
||||
|
||||
QUEUE_ALL = environ.get('QUEUE_ALL', '')
|
||||
QUEUE_ALL = '' if len(QUEUE_ALL) == 0 else int(QUEUE_ALL)
|
||||
|
||||
QUEUE_DOWNLOAD = environ.get('QUEUE_DOWNLOAD', '')
|
||||
QUEUE_DOWNLOAD = '' if len(QUEUE_DOWNLOAD) == 0 else int(QUEUE_DOWNLOAD)
|
||||
|
||||
QUEUE_UPLOAD = environ.get('QUEUE_UPLOAD', '')
|
||||
QUEUE_UPLOAD = '' if len(QUEUE_UPLOAD) == 0 else int(QUEUE_UPLOAD)
|
||||
|
||||
INCOMPLETE_TASK_NOTIFIER = environ.get('INCOMPLETE_TASK_NOTIFIER', '')
|
||||
INCOMPLETE_TASK_NOTIFIER = INCOMPLETE_TASK_NOTIFIER.lower() == 'true'
|
||||
if not INCOMPLETE_TASK_NOTIFIER and DATABASE_URL:
|
||||
@ -279,12 +288,15 @@ def load_config():
|
||||
'INCOMPLETE_TASK_NOTIFIER': INCOMPLETE_TASK_NOTIFIER,
|
||||
'INDEX_URL': INDEX_URL,
|
||||
'IS_TEAM_DRIVE': IS_TEAM_DRIVE,
|
||||
'LEECH_FILENAME_PERFIX': LEECH_FILENAME_PERFIX,
|
||||
'LEECH_FILENAME_PREFIX': LEECH_FILENAME_PREFIX,
|
||||
'LEECH_SPLIT_SIZE': LEECH_SPLIT_SIZE,
|
||||
'MEGA_API_KEY': MEGA_API_KEY,
|
||||
'MEGA_EMAIL_ID': MEGA_EMAIL_ID,
|
||||
'MEGA_PASSWORD': MEGA_PASSWORD,
|
||||
'OWNER_ID': OWNER_ID,
|
||||
'QUEUE_ALL': QUEUE_ALL,
|
||||
'QUEUE_DOWNLOAD': QUEUE_DOWNLOAD,
|
||||
'QUEUE_UPLOAD': QUEUE_UPLOAD,
|
||||
'RSS_USER_SESSION_STRING': RSS_USER_SESSION_STRING,
|
||||
'RSS_CHAT_ID': RSS_CHAT_ID,
|
||||
'RSS_COMMAND': RSS_COMMAND,
|
||||
@ -311,6 +323,7 @@ def load_config():
|
||||
|
||||
if DATABASE_URL:
|
||||
DbManger().update_config(config_dict)
|
||||
start_from_queued()
|
||||
|
||||
def get_buttons(key=None, edit_type=None):
|
||||
buttons = ButtonMaker()
|
||||
@ -332,7 +345,7 @@ def get_buttons(key=None, edit_type=None):
|
||||
buttons.sbutton('Close', "botset close")
|
||||
for x in range(0, len(config_dict)-1, 10):
|
||||
buttons.sbutton(int(x/10), f"botset start var {x}", position='footer')
|
||||
msg = f'Bot Variables. Page: {int(START/10)}. State: {STATE}'
|
||||
msg = f'Config Variables | Page: {int(START/10)} | State: {STATE}'
|
||||
elif key == 'private':
|
||||
buttons.sbutton('Back', "botset back")
|
||||
buttons.sbutton('Close', "botset close")
|
||||
@ -350,7 +363,7 @@ def get_buttons(key=None, edit_type=None):
|
||||
buttons.sbutton('Close', "botset close")
|
||||
for x in range(0, len(aria2_options)-1, 10):
|
||||
buttons.sbutton(int(x/10), f"botset start aria {x}", position='footer')
|
||||
msg = f'Aria2c Options. Page: {int(START/10)}. State: {STATE}'
|
||||
msg = f'Aria2c Options | Page: {int(START/10)} | State: {STATE}'
|
||||
elif key == 'qbit':
|
||||
for k in list(qbit_options.keys())[START:10+START]:
|
||||
buttons.sbutton(k, f"botset editqbit {k}")
|
||||
@ -362,7 +375,7 @@ def get_buttons(key=None, edit_type=None):
|
||||
buttons.sbutton('Close', "botset close")
|
||||
for x in range(0, len(qbit_options)-1, 10):
|
||||
buttons.sbutton(int(x/10), f"botset start qbit {x}", position='footer')
|
||||
msg = f'Qbittorrent Options. Page: {int(START/10)}. State: {STATE}'
|
||||
msg = f'Qbittorrent Options | Page: {int(START/10)} | State: {STATE}'
|
||||
elif edit_type == 'editvar':
|
||||
buttons.sbutton('Back', "botset back var")
|
||||
if key not in ['TELEGRAM_HASH', 'TELEGRAM_API', 'OWNER_ID', 'BOT_TOKEN']:
|
||||
@ -455,6 +468,8 @@ def edit_variable(update, context, omsg, key):
|
||||
update.message.delete()
|
||||
if DATABASE_URL:
|
||||
DbManger().update_config({key: value})
|
||||
if key in ['QUEUE_ALL', 'QUEUE_DOWNLOAD', 'QUEUE_UPLOAD']:
|
||||
start_from_queued()
|
||||
|
||||
def edit_aria(update, context, omsg, key):
|
||||
handler_dict[omsg.chat.id] = False
|
||||
@ -506,15 +521,15 @@ def update_private_file(update, context, omsg):
|
||||
if not message.document and message.text:
|
||||
file_name = message.text
|
||||
fn = file_name.rsplit('.zip', 1)[0]
|
||||
if ospath.exists(fn):
|
||||
if fn == 'accounts':
|
||||
rmtree(fn)
|
||||
config_dict['USE_SERVICE_ACCOUNTS'] = False
|
||||
if DATABASE_URL:
|
||||
DbManger().update_config({'USE_SERVICE_ACCOUNTS': False})
|
||||
else:
|
||||
remove(fn)
|
||||
if file_name in ['.netrc', 'netrc']:
|
||||
if ospath.isfile(fn):
|
||||
remove(fn)
|
||||
if fn == 'accounts':
|
||||
if ospath.exists('accounts'):
|
||||
srun(["rm", "-rf", "accounts"])
|
||||
config_dict['USE_SERVICE_ACCOUNTS'] = False
|
||||
if DATABASE_URL:
|
||||
DbManger().update_config({'USE_SERVICE_ACCOUNTS': False})
|
||||
elif file_name in ['.netrc', 'netrc']:
|
||||
srun(["touch", ".netrc"])
|
||||
srun(["cp", ".netrc", "/root/.netrc"])
|
||||
srun(["chmod", "600", ".netrc"])
|
||||
@ -587,6 +602,8 @@ def edit_bot_settings(update, context):
|
||||
query.answer()
|
||||
handler_dict[message.chat.id] = False
|
||||
key = data[2] if len(data) == 3 else None
|
||||
if key is None:
|
||||
globals()['START'] = 0
|
||||
update_buttons(message, key)
|
||||
elif data[1] in ['var', 'aria', 'qbit']:
|
||||
query.answer()
|
||||
@ -637,6 +654,8 @@ def edit_bot_settings(update, context):
|
||||
update_buttons(message, 'var')
|
||||
if DATABASE_URL:
|
||||
DbManger().update_config({data[2]: value})
|
||||
if data[2] in ['QUEUE_ALL', 'QUEUE_DOWNLOAD', 'QUEUE_UPLOAD']:
|
||||
start_from_queued()
|
||||
elif data[1] == 'resetaria':
|
||||
handler_dict[message.chat.id] = False
|
||||
aria2_defaults = aria2.client.get_global_option()
|
||||
@ -817,6 +836,7 @@ def edit_bot_settings(update, context):
|
||||
|
||||
def bot_settings(update, context):
|
||||
msg, button = get_buttons()
|
||||
globals()['START'] = 0
|
||||
sendMessage(msg, context.bot, update.message, button)
|
||||
|
||||
|
||||
|
@ -62,7 +62,8 @@ def cancell_all_buttons(update, context):
|
||||
buttons.sbutton("Cloning", f"canall {MirrorStatus.STATUS_CLONING}")
|
||||
buttons.sbutton("Extracting", f"canall {MirrorStatus.STATUS_EXTRACTING}")
|
||||
buttons.sbutton("Archiving", f"canall {MirrorStatus.STATUS_ARCHIVING}")
|
||||
buttons.sbutton("Queued", f"canall {MirrorStatus.STATUS_WAITING}")
|
||||
buttons.sbutton("QueuedDl", f"canall {MirrorStatus.STATUS_QUEUEDL}")
|
||||
buttons.sbutton("QueuedUp", f"canall {MirrorStatus.STATUS_QUEUEUP}")
|
||||
buttons.sbutton("Paused", f"canall {MirrorStatus.STATUS_PAUSED}")
|
||||
buttons.sbutton("All", "canall all")
|
||||
buttons.sbutton("Close", "canall close")
|
||||
|
@ -85,7 +85,7 @@ def _clone(message, bot):
|
||||
sendMessage(result + cc, bot, message, button)
|
||||
LOGGER.info(f'Cloning Done: {name}')
|
||||
else:
|
||||
sendMessage("Send Gdrive link along with command or by replying to the link by command\n\n<b>Multi links only by replying to first link/file:</b>\n<code>/cmd</code> 10(number of links/files)", bot, message)
|
||||
sendMessage("Send Gdrive link along with command or by replying to the link by command\n\n<b>Multi links only by replying to first link:</b>\n<code>/cmd</code> 10(number of links)", bot, message)
|
||||
|
||||
@new_thread
|
||||
def cloneNode(update, context):
|
||||
|
@ -5,14 +5,16 @@ from os import path as ospath, remove as osremove, listdir, walk
|
||||
from subprocess import Popen
|
||||
from html import escape
|
||||
|
||||
from bot import Interval, aria2, DOWNLOAD_DIR, download_dict, download_dict_lock, LOGGER, DATABASE_URL, MAX_SPLIT_SIZE, config_dict, status_reply_dict_lock, user_data
|
||||
from bot import Interval, aria2, DOWNLOAD_DIR, download_dict, download_dict_lock, LOGGER, DATABASE_URL, MAX_SPLIT_SIZE, config_dict, status_reply_dict_lock, user_data, non_queued_up, non_queued_dl, queued_up, queued_dl, queue_dict_lock
|
||||
from bot.helper.ext_utils.fs_utils import get_base_name, get_path_size, split_file, clean_download, clean_target
|
||||
from bot.helper.ext_utils.exceptions import NotSupportedExtractionArchive
|
||||
from bot.helper.ext_utils.queued_starter import start_from_queued
|
||||
from bot.helper.mirror_utils.status_utils.extract_status import ExtractStatus
|
||||
from bot.helper.mirror_utils.status_utils.zip_status import ZipStatus
|
||||
from bot.helper.mirror_utils.status_utils.split_status import SplitStatus
|
||||
from bot.helper.mirror_utils.status_utils.upload_status import UploadStatus
|
||||
from bot.helper.mirror_utils.status_utils.tg_upload_status import TgUploadStatus
|
||||
from bot.helper.mirror_utils.status_utils.queue_status import QueueStatus
|
||||
from bot.helper.mirror_utils.upload_utils.gdriveTools import GoogleDriveHelper
|
||||
from bot.helper.mirror_utils.upload_utils.pyrogramEngine import TgUploader
|
||||
from bot.helper.telegram_helper.message_utils import sendMessage, delete_all_messages, update_all_messages
|
||||
@ -37,6 +39,7 @@ class MirrorLeechListener:
|
||||
self.select = select
|
||||
self.isPrivate = message.chat.type in ['private', 'group']
|
||||
self.suproc = None
|
||||
self.queuedUp = False
|
||||
|
||||
def clean(self):
|
||||
try:
|
||||
@ -53,7 +56,6 @@ class MirrorLeechListener:
|
||||
DbManger().add_incomplete_task(self.message.chat.id, self.message.link, self.tag)
|
||||
|
||||
def onDownloadComplete(self):
|
||||
user_dict = user_data.get(self.message.from_user.id, False)
|
||||
with download_dict_lock:
|
||||
download = download_dict[self.uid]
|
||||
name = str(download.name()).replace('/', '')
|
||||
@ -63,6 +65,11 @@ class MirrorLeechListener:
|
||||
name = listdir(self.dir)[-1]
|
||||
m_path = f'{self.dir}/{name}'
|
||||
size = get_path_size(m_path)
|
||||
with queue_dict_lock:
|
||||
if self.uid in non_queued_dl:
|
||||
non_queued_dl.remove(self.uid)
|
||||
start_from_queued()
|
||||
user_dict = user_data.get(self.message.from_user.id, False)
|
||||
if self.isZip:
|
||||
if self.seed and self.isLeech:
|
||||
self.newDir = f"{self.dir}10000"
|
||||
@ -193,6 +200,31 @@ class MirrorLeechListener:
|
||||
m_size.append(f_size)
|
||||
o_files.append(file_)
|
||||
|
||||
up_limit = config_dict['QUEUE_UPLOAD']
|
||||
all_limit = config_dict['QUEUE_ALL']
|
||||
added_to_queue = True
|
||||
with queue_dict_lock:
|
||||
dl = len(non_queued_dl)
|
||||
up = len(non_queued_up)
|
||||
added_to_queue = False
|
||||
if (all_limit and dl + up >= all_limit and (not up_limit or up >= up_limit)) or (up_limit and up >= up_limit):
|
||||
LOGGER.info(f"Added to Queue/Upload: {name}")
|
||||
queued_up[self.uid] = [self]
|
||||
if added_to_queue:
|
||||
with download_dict_lock:
|
||||
download_dict[self.uid] = QueueStatus(name, size, gid, self, 'Up')
|
||||
self.queuedUp = True
|
||||
while self.queuedUp:
|
||||
sleep(1)
|
||||
continue
|
||||
with download_dict_lock:
|
||||
if self.uid not in download_dict.keys():
|
||||
return
|
||||
LOGGER.info(f'Start from Queued/Upload: {name}')
|
||||
with queue_dict_lock:
|
||||
non_queued_up.add(self.uid)
|
||||
|
||||
if self.isLeech:
|
||||
size = get_path_size(up_dir)
|
||||
for s in m_size:
|
||||
size = size - s
|
||||
@ -238,6 +270,9 @@ class MirrorLeechListener:
|
||||
if self.seed:
|
||||
if self.newDir:
|
||||
clean_target(self.newDir)
|
||||
with queue_dict_lock:
|
||||
if self.uid in non_queued_up:
|
||||
non_queued_up.remove(self.uid)
|
||||
return
|
||||
else:
|
||||
msg += f'\n\n<b>Type: </b>{typ}'
|
||||
@ -265,29 +300,34 @@ class MirrorLeechListener:
|
||||
clean_target(f"{self.dir}/{name}")
|
||||
elif self.newDir:
|
||||
clean_target(self.newDir)
|
||||
with queue_dict_lock:
|
||||
if self.uid in non_queued_up:
|
||||
non_queued_up.remove(self.uid)
|
||||
return
|
||||
clean_download(self.dir)
|
||||
with download_dict_lock:
|
||||
try:
|
||||
if self.uid in download_dict.keys():
|
||||
del download_dict[self.uid]
|
||||
except Exception as e:
|
||||
LOGGER.error(str(e))
|
||||
count = len(download_dict)
|
||||
if count == 0:
|
||||
self.clean()
|
||||
else:
|
||||
update_all_messages()
|
||||
|
||||
with queue_dict_lock:
|
||||
if self.uid in non_queued_up:
|
||||
non_queued_up.remove(self.uid)
|
||||
|
||||
start_from_queued()
|
||||
|
||||
def onDownloadError(self, error):
|
||||
error = error.replace('<', ' ').replace('>', ' ')
|
||||
clean_download(self.dir)
|
||||
if self.newDir:
|
||||
clean_download(self.newDir)
|
||||
with download_dict_lock:
|
||||
try:
|
||||
if self.uid in download_dict.keys():
|
||||
del download_dict[self.uid]
|
||||
except Exception as e:
|
||||
LOGGER.error(str(e))
|
||||
count = len(download_dict)
|
||||
msg = f"{self.tag} your download has been stopped due to: {error}"
|
||||
sendMessage(msg, self.bot, self.message)
|
||||
@ -299,16 +339,27 @@ class MirrorLeechListener:
|
||||
if not self.isPrivate and config_dict['INCOMPLETE_TASK_NOTIFIER'] and DATABASE_URL:
|
||||
DbManger().rm_complete_task(self.message.link)
|
||||
|
||||
with queue_dict_lock:
|
||||
if self.uid in queued_dl:
|
||||
del queued_dl[self.uid]
|
||||
if self.uid in non_queued_dl:
|
||||
non_queued_dl.remove(self.uid)
|
||||
if self.uid in queued_up:
|
||||
del queued_up[self.uid]
|
||||
if self.uid in non_queued_up:
|
||||
non_queued_up.remove(self.uid)
|
||||
|
||||
self.queuedUp = False
|
||||
start_from_queued()
|
||||
|
||||
def onUploadError(self, error):
|
||||
e_str = error.replace('<', '').replace('>', '')
|
||||
clean_download(self.dir)
|
||||
if self.newDir:
|
||||
clean_download(self.newDir)
|
||||
with download_dict_lock:
|
||||
try:
|
||||
if self.uid in download_dict.keys():
|
||||
del download_dict[self.uid]
|
||||
except Exception as e:
|
||||
LOGGER.error(str(e))
|
||||
count = len(download_dict)
|
||||
sendMessage(f"{self.tag} {e_str}", self.bot, self.message)
|
||||
if count == 0:
|
||||
@ -318,3 +369,12 @@ class MirrorLeechListener:
|
||||
|
||||
if not self.isPrivate and config_dict['INCOMPLETE_TASK_NOTIFIER'] and DATABASE_URL:
|
||||
DbManger().rm_complete_task(self.message.link)
|
||||
|
||||
with queue_dict_lock:
|
||||
if self.uid in queued_up:
|
||||
del queued_up[self.uid]
|
||||
if self.uid in non_queued_up:
|
||||
non_queued_up.remove(self.uid)
|
||||
|
||||
self.queuedUp = False
|
||||
start_from_queued()
|
||||
|
@ -49,6 +49,7 @@ def get_user_settings(from_user):
|
||||
|
||||
buttons.sbutton("Thumbnail", f"userset {user_id} sthumb")
|
||||
thumbmsg = "Exists" if ospath.exists(thumbpath) else "Not Exists"
|
||||
|
||||
buttons.sbutton("Close", f"userset {user_id} close")
|
||||
text = f"<u>Settings for <a href='tg://user?id={user_id}'>{name}</a></u>\n"\
|
||||
f"Leech Type is <b>{ltype}</b>\n"\
|
||||
|
@ -3,7 +3,7 @@ from telegram.ext import CommandHandler, CallbackQueryHandler
|
||||
from time import sleep
|
||||
from re import split as re_split
|
||||
|
||||
from bot import DOWNLOAD_DIR, dispatcher, config_dict, user_data
|
||||
from bot import DOWNLOAD_DIR, dispatcher, config_dict, user_data, LOGGER
|
||||
from bot.helper.telegram_helper.message_utils import sendMessage, editMessage
|
||||
from bot.helper.telegram_helper.button_build import ButtonMaker
|
||||
from bot.helper.ext_utils.bot_utils import get_readable_file_size, is_url
|
||||
@ -128,6 +128,7 @@ Check all yt-dlp api options from this <a href='https://github.com/yt-dlp/yt-dlp
|
||||
qual = config_dict['YT_DLP_QUALITY']
|
||||
if qual:
|
||||
playlist = 'entries' in result
|
||||
LOGGER.info(f"Downloading with YT-DLP: {link}")
|
||||
Thread(target=ydl.add_download, args=(link, f'{DOWNLOAD_DIR}{msg_id}', name, qual, playlist, opt)).start()
|
||||
else:
|
||||
buttons = ButtonMaker()
|
||||
@ -136,11 +137,11 @@ Check all yt-dlp api options from this <a href='https://github.com/yt-dlp/yt-dlp
|
||||
formats_dict = {}
|
||||
if 'entries' in result:
|
||||
for i in ['144', '240', '360', '480', '720', '1080', '1440', '2160']:
|
||||
video_format = f"bv*[height<={i}][ext=mp4]+ba[ext=m4a]/b[height<={i}]"
|
||||
video_format = f"bv*[height<=?{i}][ext=mp4]+ba[ext=m4a]/b[height<=?{i}]"
|
||||
b_data = f"{i}|mp4"
|
||||
formats_dict[b_data] = video_format
|
||||
buttons.sbutton(f"{i}-mp4", f"qu {msg_id} {b_data} t")
|
||||
video_format = f"bv*[height<={i}][ext=webm]+ba/b[height<={i}]"
|
||||
video_format = f"bv*[height<=?{i}][ext=webm]+ba/b[height<=?{i}]"
|
||||
b_data = f"{i}|webm"
|
||||
formats_dict[b_data] = video_format
|
||||
buttons.sbutton(f"{i}-webm", f"qu {msg_id} {b_data} t")
|
||||
@ -153,6 +154,7 @@ Check all yt-dlp api options from this <a href='https://github.com/yt-dlp/yt-dlp
|
||||
bmsg = sendMessage('Choose Playlist Videos Quality:', bot, message, YTBUTTONS)
|
||||
else:
|
||||
formats = result.get('formats')
|
||||
is_m4a = False
|
||||
if formats is not None:
|
||||
for frmt in formats:
|
||||
if frmt.get('tbr'):
|
||||
@ -166,18 +168,21 @@ Check all yt-dlp api options from this <a href='https://github.com/yt-dlp/yt-dlp
|
||||
else:
|
||||
size = 0
|
||||
|
||||
if frmt.get('height'):
|
||||
if frmt.get('video_ext') == 'none' and frmt.get('acodec') != 'none':
|
||||
if frmt.get('audio_ext') == 'm4a':
|
||||
is_m4a = True
|
||||
b_name = f"{frmt['acodec']}-{frmt['ext']}"
|
||||
v_format = f"ba[format_id={format_id}]"
|
||||
elif frmt.get('height'):
|
||||
height = frmt['height']
|
||||
ext = frmt['ext']
|
||||
fps = frmt['fps'] if frmt.get('fps') else ''
|
||||
b_name = f"{height}p{fps}-{ext}"
|
||||
if ext == 'mp4':
|
||||
v_format = f"bv*[format_id={format_id}]+ba[ext=m4a]/b[height={height}]"
|
||||
ba_ext = '[ext=m4a]' if is_m4a else ''
|
||||
v_format = f"bv*[format_id={format_id}]+ba{ba_ext}/b[height=?{height}]"
|
||||
else:
|
||||
v_format = f"bv*[format_id={format_id}]+ba/b[height={height}]"
|
||||
elif frmt.get('video_ext') == 'none' and frmt.get('acodec') != 'none':
|
||||
b_name = f"{frmt['acodec']}-{frmt['ext']}"
|
||||
v_format = f"ba[format_id={format_id}]"
|
||||
v_format = f"bv*[format_id={format_id}]+ba/b[height=?{height}]"
|
||||
else:
|
||||
continue
|
||||
|
||||
@ -289,6 +294,7 @@ def select_format(update, context):
|
||||
b_name, tbr = qual.split('|')
|
||||
qual = task_info[6][b_name][tbr][1]
|
||||
ydl = YoutubeDLHelper(listener)
|
||||
LOGGER.info(f"Downloading with YT-DLP: {link}")
|
||||
Thread(target=ydl.add_download, args=(link, f'{DOWNLOAD_DIR}{task_id}', name, qual, playlist, opt)).start()
|
||||
query.message.delete()
|
||||
del listener_dict[task_id]
|
||||
|
Loading…
Reference in New Issue
Block a user