diff --git a/telegram_downloader_bot/main.py b/telegram_downloader_bot/main.py index 4329d25..7eec879 100644 --- a/telegram_downloader_bot/main.py +++ b/telegram_downloader_bot/main.py @@ -5,6 +5,7 @@ import re import sentry_sdk import uvloop +from functools import wraps from pyrogram import Client, filters from pyrogram.types import Message from tiktok_downloader import snaptik @@ -15,11 +16,15 @@ API_ID = os.getenv("API_ID") # Your API ID from my.telegram.org API_HASH = os.getenv("API_HASH") # Your API Hash from my.telegram.org BOT_TOKEN = os.getenv("BOT_TOKEN") # Your bot token from BotFather STORAGE = os.getenv("STORAGE") # Storage directory for downloads -MY_MSG_ID = int(os.getenv("MY_MSG_ID")) # Your message ID for authorization LOG_LEVEL = os.getenv("LOG_LEVEL") # Log level +# Your message ID for authorization separated by commas +msg_ids = os.getenv("MSG_IDS") +ALLOWED_IDS = set([ int(i) for i in msg_ids.split(",") ]) # Convert to set + log = logger.configure_logger(LOG_LEVEL) + uvloop.install() sentry_sdk.init( @@ -29,84 +34,13 @@ sentry_sdk.init( enable_tracing=True ) -app = Client("downloader_bot", api_id=API_ID, - api_hash=API_HASH, bot_token=BOT_TOKEN) +app = Client("downloader_bot", + api_id=API_ID, + api_hash=API_HASH, + bot_token=BOT_TOKEN) -def download_tt_video(url: str) -> bool: - videos = snaptik(url) - now = datetime.datetime.now() - max_tries = 5 - - log.debug(f"Downloading video from {url}...") - - for video in videos: - video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") - video_filepath: os.path = os.path.join( - STORAGE, "tiktok", video_filename) - - for i in range(max_tries): - video_content = video.download().getbuffer() - is_valid_mp4 = integv.verify( - video_content.tobytes(), file_type="mp4") - log.debug( - f"Attempt {i+1}/{max_tries} to " - "download video, video valid: {is_valid_mp4}" - ) - if is_valid_mp4: - break - - if not is_valid_mp4: - log.error("Downloaded video is not a valid mp4 file") - return False - - with open(video_filepath, "wb") as f: - f.write(video_content) - log.debug("Video saved successfully") - - return True - - -@app.on_message(filters.command("start")) -async def start_handler(client, message: Message): - await message.reply_text( - "This bot downloads TikTok videos " "to my personal server" - ) - - -@app.on_message(filters.command("help")) -async def help_handler(client, message: Message): - await message.reply_text("I won't help you!") - - -@app.on_message(filters.text) -async def message_handler(client, message: Message): - if message.chat.id != MY_MSG_ID: - return await message.reply_text("Nope, not talking to you!") - - urls = re.findall(r"\bhttps?://[^\s]+", message.text) - if not urls: - return await message.reply_text( - "No links found in the message. " "Nothing to download!" - ) - - success_count = 0 - for i, url in enumerate(urls): - msg = f"Downloading video {i+1}/{len(urls)}..." - log.info(msg) - await message.reply_text(msg) - outcome = download_tt_video(url) - success_count += 1 if outcome else 0 - - await message.reply_text(f"{success_count}/{len(urls)} " - "video(s) downloaded") - - -@app.on_message(filters.media) -async def media_handler(client, message: Message): - if message.chat.id != MY_MSG_ID: - return await message.reply_text("Nope, not talking to you!") - +async def get_user_folder(message: Message) -> os.path: # Determine folder name based on whether the message was forwarded # and who it was forwarded from if message.forward_from: @@ -134,15 +68,19 @@ async def media_handler(client, message: Message): else str(user.id) ) + # Sanitize the folder name user_folder_name = "".join( c for c in user_folder_name if c.isalnum() or c in (" ", "_") - ).rstrip() # Sanitize the folder name + ).rstrip() user_folder = os.path.join(STORAGE, "telegram", user_folder_name) os.makedirs(user_folder, exist_ok=True) + return user_folder - # Reply to user that the download is starting - await message.reply_text("Downloading media...") + +async def handle_media_message_contents(client: Client, message: Message): + + user_folder = get_user_folder(message) # Handle documents if message.document: @@ -177,6 +115,81 @@ async def media_handler(client, message: Message): await message.reply_text("Unknown media type!") +def download_tt_video(url: str) -> bool: + videos = snaptik(url) + now = datetime.datetime.now() + max_tries = 5 + + log.debug(f"Downloading video from {url}...") + + for video in videos: + video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") + video_filepath: os.path = os.path.join( + STORAGE, "tiktok", video_filename) + + for i in range(max_tries): + video_content = video.download() + is_valid_mp4 = integv.verify( + video_content.tobytes(), file_type="mp4") + log.debug( + f"Attempt {i+1}/{max_tries} to " + "download video, video valid: {is_valid_mp4}" + ) + if is_valid_mp4: + break + + if not is_valid_mp4: + log.error("Downloaded video is not a valid mp4 file") + return False + + with open(video_filepath, "wb") as f: + f.write(video_content) + log.debug("Video saved successfully") + + return True + + +@app.on_message(filters.command("start")) +async def start_handler(_, message: Message): + await message.reply_text( + "This bot downloads TikTok videos to my personal server" + ) + + +@app.on_message(filters.command("help")) +async def help_handler(_, message: Message): + await message.reply_text("I won't help you!") + + +@app.on_message(filters.text) +async def message_handler(_, message: Message): + + urls = re.findall(r"\bhttps?://[^\s]+", message.text) + if not urls: + return await message.reply_text( + "No links found in the message. Nothing to download!" + ) + + success_count = 0 + for i, url in enumerate(urls): + msg = f"Downloading video {i+1}/{len(urls)}..." + log.info(msg) + await message.reply_text(msg) + outcome = download_tt_video(url) + success_count += 1 if outcome else 0 + + await message.reply_text(f"{success_count}/{len(urls)} " + "video(s) downloaded") + + +@app.on_message(filters.media) +async def media_handler(client, message: Message): + + await message.reply_text("Downloading media...") + + handle_media_message_contents(client, message) + + if __name__ == "__main__": os.makedirs(os.path.join(STORAGE, "tiktok"), exist_ok=True) os.makedirs(os.path.join(STORAGE, "telegram"), exist_ok=True)