Added Aria2c and qbittorrent tasks to bot queue system

- VIEW_LINK var removed it's always enabled now for media only

Signed-off-by: anasty17 <e.anastayyar@gmail.com>
This commit is contained in:
anasty17 2023-04-23 20:55:44 +03:00
parent 2723b9740d
commit 770c15990d
13 changed files with 134 additions and 60 deletions

View File

@ -120,7 +120,7 @@ In each single file there is a major change from base code, it's almost totaly d
- Custom name for all links except torrents. For files you should add extension except yt-dlp links
- Extensions Filter for the files to be uploaded/cloned
- View Link button. Extra button to open index link in broswer instead of direct download for file
- Queueing System
- Queueing System for all tasks
- Ability to zip/unzip multi links in same directory. Mostly helpful in unziping tg file parts
- Almost all repository functions have been improved and many other details can't mention all of them
- Many bugs have been fixed
@ -276,14 +276,10 @@ Fill up rest of the fields. Meaning of each field is discussed below. **NOTE**:
### Queue System
- `QUEUE_ALL`: Number of parallel tasks of downloads from (mega, telegram, yt-dlp, gdrive) + all uploads. For example if 20 task added and `QUEUE_ALL` is `8`, then the summation of uploading and downloading tasks are 8 and the rest in queue. `Int`. **NOTE**: if you want to fill `QUEUE_DOWNLOAD` or `QUEUE_UPLOAD`, then `QUEUE_ALL` value must be greater than or equal to the greatest one and less than or equal to summation of `QUEUE_UPLOAD` and `QUEUE_DOWNLOAD`.
- `QUEUE_DOWNLOAD`: Number of parallel downloading tasks from mega, telegram, yt-dlp and gdrive. `Int`
- `QUEUE_ALL`: Number of parallel tasks of downloads and uploads. For example if 20 task added and `QUEUE_ALL` is `8`, then the summation of uploading and downloading tasks are 8 and the rest in queue. `Int`. **NOTE**: if you want to fill `QUEUE_DOWNLOAD` or `QUEUE_UPLOAD`, then `QUEUE_ALL` value must be greater than or equal to the greatest one and less than or equal to summation of `QUEUE_UPLOAD` and `QUEUE_DOWNLOAD`.
- `QUEUE_DOWNLOAD`: Number of all parallel downloading tasks. `Int`
- `QUEUE_UPLOAD`: Number of all parallel uploading tasks. `Int`
### Buttons
- `VIEW_LINK`: View Link button to open file Index Link in browser instead of direct download link, you can figure out if it's compatible with your Index code or not, open any video from you Index and check if its URL ends with `?a=view`. Compatible with [BhadooIndex](https://gitlab.com/ParveenBhadooOfficial/Google-Drive-Index) Code. Default is `False`. `Bool`
### Torrent Search
- `SEARCH_API_LINK`: Search api app link. Get your api from deploying this [repository](https://github.com/Ryuk-me/Torrent-Api-py). `Str`

View File

@ -264,9 +264,6 @@ INCOMPLETE_TASK_NOTIFIER = INCOMPLETE_TASK_NOTIFIER.lower() == 'true'
STOP_DUPLICATE = environ.get('STOP_DUPLICATE', '')
STOP_DUPLICATE = STOP_DUPLICATE.lower() == 'true'
VIEW_LINK = environ.get('VIEW_LINK', '')
VIEW_LINK = VIEW_LINK.lower() == 'true'
IS_TEAM_DRIVE = environ.get('IS_TEAM_DRIVE', '')
IS_TEAM_DRIVE = IS_TEAM_DRIVE.lower() == 'true'
@ -366,7 +363,6 @@ config_dict = {'AS_DOCUMENT': AS_DOCUMENT,
'UPTOBOX_TOKEN': UPTOBOX_TOKEN,
'USER_SESSION_STRING': USER_SESSION_STRING,
'USE_SERVICE_ACCOUNTS': USE_SERVICE_ACCOUNTS,
'VIEW_LINK': VIEW_LINK,
'WEB_PINCODE': WEB_PINCODE,
'YT_DLP_OPTIONS': YT_DLP_OPTIONS}

View File

@ -75,7 +75,8 @@ async def __onDownloadComplete(api, gid):
if dl := await getDownloadByGid(new_gid):
listener = dl.listener()
if config_dict['BASE_URL'] and listener.select:
await sync_to_async(api.client.force_pause, new_gid)
if not dl.queued:
await sync_to_async(api.client.force_pause, new_gid)
SBUTTONS = bt_selection_buttons(new_gid)
msg = "Your download paused. Choose files then press Done Selecting button to start downloading."
await sendMessage(listener.message, msg, SBUTTONS)

View File

@ -354,7 +354,7 @@ class MirrorLeechListener:
buttons.ubutton("⚡ Index Link", share_url)
else:
buttons.ubutton("⚡ Index Link", share_url)
if config_dict['VIEW_LINK']:
if mime_type.startswith(('image', 'video', 'audio')):
share_urls = f'{INDEX_URL}/{url_path}?a=view'
buttons.ubutton("🌐 View Link", share_urls)
button = buttons.build_menu(2)

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python3
from aiofiles.os import remove as aioremove, path as aiopath
from bot import aria2, download_dict_lock, download_dict, LOGGER, config_dict, aria2_options, aria2c_global
from bot import aria2, download_dict_lock, download_dict, LOGGER, config_dict, aria2_options, aria2c_global, non_queued_dl, queue_dict_lock
from bot.helper.ext_utils.bot_utils import bt_selection_buttons, sync_to_async
from bot.helper.mirror_utils.status_utils.aria2_status import Aria2Status
from bot.helper.telegram_helper.message_utils import sendStatusMessage, sendMessage
from bot.helper.ext_utils.task_manager import is_queued
async def add_aria2c_download(link, path, listener, filename, auth, ratio, seed_time):
@ -21,6 +22,12 @@ async def add_aria2c_download(link, path, listener, filename, auth, ratio, seed_
a2c_opt['seed-time'] = seed_time
if TORRENT_TIMEOUT := config_dict['TORRENT_TIMEOUT']:
a2c_opt['bt-stop-timeout'] = f'{TORRENT_TIMEOUT}'
added_to_queue, event = await is_queued(listener.uid)
if added_to_queue:
if link.startswith('magnet:'):
a2c_opt['pause-metadata'] = 'true'
else:
a2c_opt['pause'] = 'true'
try:
download = (await sync_to_async(aria2.add, link, a2c_opt))[0]
except Exception as e:
@ -34,15 +41,44 @@ async def add_aria2c_download(link, path, listener, filename, auth, ratio, seed_
LOGGER.info(f"Aria2c Download Error: {error}")
await sendMessage(listener.message, error)
return
gid = download.gid
name = download.name
async with download_dict_lock:
download_dict[listener.uid] = Aria2Status(gid, listener)
LOGGER.info(f"Aria2Download started: {gid}")
download_dict[listener.uid] = Aria2Status(
gid, listener, queued=added_to_queue)
if added_to_queue:
LOGGER.info(f"Added to Queue/Download: {name}. Gid: {gid}")
if not listener.select or not download.is_torrent:
await sendStatusMessage(listener.message)
else:
async with queue_dict_lock:
non_queued_dl.add(listener.uid)
LOGGER.info(f"Aria2Download started: {name}. Gid: {gid}")
await listener.onDownloadStart()
if not listener.select or not config_dict['BASE_URL']:
if not added_to_queue and (not listener.select or not config_dict['BASE_URL']):
await sendStatusMessage(listener.message)
elif download.is_torrent and not download.is_metadata:
await sync_to_async(aria2.client.force_pause, gid)
elif listener.select and download.is_torrent and not download.is_metadata:
if not added_to_queue:
await sync_to_async(aria2.client.force_pause, gid)
SBUTTONS = bt_selection_buttons(gid)
msg = "Your download paused. Choose files then press Done Selecting button to start downloading."
await sendMessage(listener.message, msg, SBUTTONS)
if added_to_queue:
await event.wait()
async with download_dict_lock:
if listener.uid not in download_dict:
return
download = download_dict[listener.uid]
download.queued = False
new_gid = download.gid()
await sync_to_async(aria2.client.unpause, new_gid)
LOGGER.info(f'Start Queued Download from Aria2c: {name}. Gid: {gid}')
async with queue_dict_lock:
non_queued_dl.add(listener.uid)

View File

@ -2,11 +2,12 @@
from time import time
from aiofiles.os import remove as aioremove, path as aiopath
from bot import download_dict, download_dict_lock, get_client, LOGGER, config_dict
from bot import download_dict, download_dict_lock, get_client, LOGGER, config_dict, non_queued_dl, queue_dict_lock
from bot.helper.mirror_utils.status_utils.qbit_status import QbittorrentStatus
from bot.helper.telegram_helper.message_utils import sendMessage, deleteMessage, sendStatusMessage
from bot.helper.ext_utils.bot_utils import bt_selection_buttons, sync_to_async
from bot.helper.listeners.qbit_listener import onDownloadStart
from bot.helper.ext_utils.task_manager import is_queued
"""
@ -38,8 +39,9 @@ async def add_qb_torrent(link, path, listener, ratio, seed_time):
if await aiopath.exists(link):
url = None
tpath = link
op = await sync_to_async(client.torrents_add, url, tpath, path, tags=f'{listener.uid}', ratio_limit=ratio,
seeding_time_limit=seed_time, headers={'user-agent': 'Wget/1.12'})
added_to_queue, event = await is_queued(listener.uid)
op = await sync_to_async(client.torrents_add, url, tpath, path, is_paused=added_to_queue, tags=f'{listener.uid}',
ratio_limit=ratio, seeding_time_limit=seed_time, headers={'user-agent': 'Wget/1.12'})
if op.lower() == "ok.":
tor_info = await sync_to_async(client.torrents_info, tag=f'{listener.uid}')
if len(tor_info) == 0:
@ -56,12 +58,23 @@ async def add_qb_torrent(link, path, listener, ratio, seed_time):
else:
await sendMessage(listener.message, "This Torrent already added or unsupported/invalid link/file.")
return
async with download_dict_lock:
download_dict[listener.uid] = QbittorrentStatus(listener)
download_dict[listener.uid] = QbittorrentStatus(
listener, queued=added_to_queue)
await onDownloadStart(f'{listener.uid}')
if added_to_queue:
LOGGER.info(
f"Added to Queue/Download: {tor_info.name} - Hash: {ext_hash}")
else:
async with queue_dict_lock:
non_queued_dl.add(listener.uid)
LOGGER.info(
f"QbitDownload started: {tor_info.name} - Hash: {ext_hash}")
await listener.onDownloadStart()
LOGGER.info(
f"QbitDownload started: {tor_info.name} - Hash: {ext_hash}")
if config_dict['BASE_URL'] and listener.select:
if link.startswith('magnet:'):
metamsg = "Downloading Metadata, wait then you can select files. Use torrent file to avoid this wait."
@ -79,13 +92,30 @@ async def add_qb_torrent(link, path, listener, ratio, seed_time):
except:
await deleteMessage(meta)
return
ext_hash = tor_info.hash
await sync_to_async(client.torrents_pause, torrent_hashes=ext_hash)
if not added_to_queue:
await sync_to_async(client.torrents_pause, torrent_hashes=ext_hash)
SBUTTONS = bt_selection_buttons(ext_hash)
msg = "Your download paused. Choose files then press Done Selecting button to start downloading."
await sendMessage(listener.message, msg, SBUTTONS)
else:
await sendStatusMessage(listener.message)
if added_to_queue:
await event.wait()
async with download_dict_lock:
if listener.uid not in download_dict:
return
download_dict[listener.uid].queued = False
await sync_to_async(client.torrents_resume, torrent_hashes=ext_hash)
LOGGER.info(
f'Start Queued Download from Qbittorrent: {tor_info.name} - Hash: {ext_hash}')
async with queue_dict_lock:
non_queued_dl.add(listener.uid)
except Exception as e:
await sendMessage(listener.message, str(e))
finally:

View File

@ -247,7 +247,7 @@ class YoutubeDLHelper:
async with download_dict_lock:
if self.__listener.uid not in download_dict:
return
LOGGER.info(f'Start Queued Download with YT_DLP: {self.name}')
LOGGER.info(f'Start Queued Download from YT_DLP: {self.name}')
await self.__onDownloadStart(True)
else:
LOGGER.info(f'Download with YT_DLP: {self.name}')

View File

@ -15,10 +15,11 @@ def get_download(gid):
class Aria2Status:
def __init__(self, gid, listener, seeding=False):
def __init__(self, gid, listener, seeding=False, queued=False):
self.__gid = gid
self.__download = get_download(gid)
self.__listener = listener
self.queued = queued
self.start_time = 0
self.seeding = seeding
self.message = listener.message
@ -52,7 +53,7 @@ class Aria2Status:
def status(self):
self.__update()
if self.__download.is_waiting:
if self.__download.is_waiting or self.queued:
if self.seeding:
return MirrorStatus.STATUS_QUEUEUP
else:
@ -106,6 +107,11 @@ class Aria2Status:
downloads.append(self.__download)
await sync_to_async(aria2.remove, downloads, force=True, files=True)
else:
LOGGER.info(f"Cancelling Download: {self.name()}")
await self.__listener.onDownloadError('Download stopped by user!')
if self.queued:
LOGGER.info(f'Cancelling QueueDl: {self.name()}')
msg = 'task have been removed from queue/download'
else:
LOGGER.info(f"Cancelling Download: {self.name()}")
msg = 'Download stopped by user!'
await self.__listener.onDownloadError(msg)
await sync_to_async(aria2.remove, [self.__download], force=True, files=True)

View File

@ -16,10 +16,11 @@ def get_download(client, tag):
class QbittorrentStatus:
def __init__(self, listener, seeding=False):
def __init__(self, listener, seeding=False, queued=False):
self.__client = get_client()
self.__listener = listener
self.__info = get_download(self.__client, f'{self.__listener.uid}')
self.queued = queued
self.seeding = seeding
self.message = listener.message
@ -52,7 +53,7 @@ class QbittorrentStatus:
def status(self):
self.__update()
state = self.__info.state
if state == "queuedDL":
if state == "queuedDL" or self.queued:
return MirrorStatus.STATUS_QUEUEDL
elif state == "queuedUP":
return MirrorStatus.STATUS_QUEUEUP
@ -103,9 +104,14 @@ class QbittorrentStatus:
self.__update()
await sync_to_async(self.__client.torrents_pause, torrent_hashes=self.__info.hash)
if self.status() != MirrorStatus.STATUS_SEEDING:
LOGGER.info(f"Cancelling Download: {self.__info.name}")
if self.queued:
LOGGER.info(f'Cancelling QueueDL: {self.name()}')
msg = 'task have been removed from queue/download'
else:
LOGGER.info(f"Cancelling Download: {self.__info.name}")
msg = 'Download stopped by user!'
await sleep(0.3)
await self.__listener.onDownloadError('Download stopped by user!')
await self.__listener.onDownloadError(msg)
await sync_to_async(self.__client.torrents_delete, torrent_hashes=self.__info.hash, delete_files=True)
await sync_to_async(self.__client.torrents_delete_tags, tags=self.__info.tags)
async with qb_listener_lock:

View File

@ -83,7 +83,7 @@ class GoogleDriveHelper:
else:
LOGGER.error('token.pickle not found!')
return build('drive', 'v3', credentials=credentials, cache_discovery=False)
def __alt_authorize(self):
if not self.__alt_auth:
self.__alt_auth = True
@ -225,7 +225,7 @@ class GoogleDriveHelper:
elif self.__is_errored:
return
async_to_sync(self.__listener.onUploadComplete, link, size, self.__total_files,
self.__total_folders, mime_type, file_name)
self.__total_folders, mime_type, file_name)
def __upload_dir(self, input_directory, dest_id):
list_dirs = listdir(input_directory)
@ -400,7 +400,8 @@ class GoogleDriveHelper:
if not self.__alt_auth:
token_service = self.__alt_authorize()
if token_service is not None:
LOGGER.error('File not found. Trying with token.pickle...')
LOGGER.error(
'File not found. Trying with token.pickle...')
self.__service = token_service
return self.clone(link)
msg = "File not found."
@ -574,7 +575,7 @@ class GoogleDriveHelper:
elif mime_type == 'application/vnd.google-apps.shortcut':
furl = f"https://drive.google.com/drive/folders/{file.get('id')}"
msg += f"⁍<a href='https://drive.google.com/drive/folders/{file.get('id')}'>{file.get('name')}" \
f"</a> (shortcut)"
f"</a> (shortcut)"
else:
furl = f"https://drive.google.com/uc?id={file.get('id')}&export=download"
msg += f"📄 <code>{file.get('name')}<br>({get_readable_file_size(int(file.get('size', 0)))})</code><br>"
@ -587,7 +588,7 @@ class GoogleDriveHelper:
url_path = rquote(f'{file.get("name")}')
url = f'{index_url}/{url_path}'
msg += f' <b>| <a href="{url}">Index Link</a></b>'
if config_dict['VIEW_LINK']:
if mime_type.startswith(('image', 'video', 'audio')):
urlv = f'{index_url}/{url_path}?a=view'
msg += f' <b>| <a href="{urlv}">View Link</a></b>'
msg += '<br><br>'
@ -621,7 +622,8 @@ class GoogleDriveHelper:
if not self.__alt_auth:
token_service = self.__alt_authorize()
if token_service is not None:
LOGGER.error('File not found. Trying with token.pickle...')
LOGGER.error(
'File not found. Trying with token.pickle...')
self.__service = token_service
return self.count(link)
msg = "File not found."
@ -691,7 +693,8 @@ class GoogleDriveHelper:
if not self.__alt_auth:
token_service = self.__alt_authorize()
if token_service is not None:
LOGGER.error('File not found. Trying with token.pickle...')
LOGGER.error(
'File not found. Trying with token.pickle...')
self.__service = token_service
self.__updater.cancel()
return self.download(link)

View File

@ -221,9 +221,6 @@ async def load_config():
STOP_DUPLICATE = environ.get('STOP_DUPLICATE', '')
STOP_DUPLICATE = STOP_DUPLICATE.lower() == 'true'
VIEW_LINK = environ.get('VIEW_LINK', '')
VIEW_LINK = VIEW_LINK.lower() == 'true'
IS_TEAM_DRIVE = environ.get('IS_TEAM_DRIVE', '')
IS_TEAM_DRIVE = IS_TEAM_DRIVE.lower() == 'true'
@ -346,7 +343,6 @@ async def load_config():
'UPTOBOX_TOKEN': UPTOBOX_TOKEN,
'USER_SESSION_STRING': USER_SESSION_STRING,
'USE_SERVICE_ACCOUNTS': USE_SERVICE_ACCOUNTS,
'VIEW_LINK': VIEW_LINK,
'WEB_PINCODE': WEB_PINCODE,
'YT_DLP_OPTIONS': YT_DLP_OPTIONS})

View File

@ -48,14 +48,16 @@ async def select(client, message):
if listener.isQbit:
id_ = dl.hash()
client = dl.client()
await sync_to_async(client.torrents_pause, torrent_hashes=id_)
if not dl.queued:
await sync_to_async(client.torrents_pause, torrent_hashes=id_)
else:
id_ = dl.gid()
try:
await sync_to_async(aria2.client.force_pause, id_)
except Exception as e:
LOGGER.error(
f"{e} Error in pause, this mostly happens after abuse aria2")
if not dl.queued:
try:
await sync_to_async(aria2.client.force_pause, id_)
except Exception as e:
LOGGER.error(
f"{e} Error in pause, this mostly happens after abuse aria2")
listener.select = True
except:
await sendMessage(message, "This is not a bittorrent task!")
@ -101,7 +103,8 @@ async def get_confirm(client, query):
await aioremove(f_path)
except:
pass
await sync_to_async(client.torrents_resume, torrent_hashes=id_)
if not dl.queued:
await sync_to_async(client.torrents_resume, torrent_hashes=id_)
else:
res = await sync_to_async(aria2.client.get_files, id_)
for f in res:
@ -110,11 +113,12 @@ async def get_confirm(client, query):
await aioremove(f['path'])
except:
pass
try:
await sync_to_async(aria2.client.unpause, id_)
except Exception as e:
LOGGER.error(
f"{e} Error in resume, this mostly happens after abuse aria2. Try to use select cmd again!")
if not dl.queued:
try:
await sync_to_async(aria2.client.unpause, id_)
except Exception as e:
LOGGER.error(
f"{e} Error in resume, this mostly happens after abuse aria2. Try to use select cmd again!")
await sendStatusMessage(message)
await message.delete()

View File

@ -394,8 +394,8 @@ async def _ytdl(client, message, isZip=False, isLeech=False, sameDir={}):
user_dict = user_data.get(user_id, {})
if 'format:' in options:
qual = options['format']
elif user_dict.get('yt_ql'):
qual = user_dict['yt_ql']
elif user_dict.get('yt_opt'):
qual = user_dict['yt_opt']
if not qual:
qual = await YtSelection(client, message).get_quality(result)