"""Telegram bot that saves TikTok videos and received media to local storage."""
import datetime
import os
import re

import integv
import sentry_sdk
import uvloop
from pyrogram import Client, filters
from pyrogram.types import Message
from tiktok_downloader import snaptik

from telegram_downloader_bot import logger

API_ID = os.getenv("API_ID")  # Your API ID from my.telegram.org
API_HASH = os.getenv("API_HASH")  # Your API Hash from my.telegram.org
BOT_TOKEN = os.getenv("BOT_TOKEN")  # Your bot token from BotFather
STORAGE = os.getenv("STORAGE")  # Storage directory for downloads
MY_MSG_ID = int(os.getenv("MY_MSG_ID"))  # Your message ID for authorization
LOG_LEVEL = os.getenv("LOG_LEVEL")  # Log level

log = logger.configure_logger(LOG_LEVEL)

uvloop.install()

sentry_sdk.init(
    dsn="https://12d7a075d483fc133cde0ed82e72ac45@o4508071875313664.ingest.de.sentry.io/4508075566694480",
    traces_sample_rate=1.0,
    profiles_sample_rate=1.0,
    enable_tracing=True
)

app = Client("downloader_bot",
             api_id=API_ID,
             api_hash=API_HASH,
             bot_token=BOT_TOKEN)


def download_tt_video(url: str) -> bool:
    """Download the video entries that ``snaptik`` returns for *url*.

    Each entry is downloaded up to ``max_tries`` times until ``integv``
    accepts the content as an MP4, then written below ``STORAGE/tiktok``.

    Args:
        url: Link passed unchanged to ``snaptik``.

    Returns:
        True when at least one entry was found and every entry was saved;
        False when there were no entries or an entry never validated.
    """
    videos = snaptik(url)
    now = datetime.datetime.now()
    max_tries = 5
    saved_count = 0

    log.debug(f"Downloading video from {url}...")

    # The name depends only on the call time, so it is computed once.
    # NOTE(review): every entry is therefore written to the same path and
    # later entries overwrite earlier ones -- confirm whether the entries
    # are mirrors of one video or distinct videos.
    video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
    video_filepath: str = os.path.join(STORAGE, "tiktok", video_filename)

    for video in videos:
        is_valid_mp4 = False
        for i in range(max_tries):
            video_content = video.download().getbuffer()
            is_valid_mp4 = integv.verify(
                video_content.tobytes(), file_type="mp4")
            log.debug(
                f"Attempt {i+1}/{max_tries} to "
                f"download video, video valid: {is_valid_mp4}"
            )
            if is_valid_mp4:
                break

        if not is_valid_mp4:
            log.error("Downloaded video is not a valid mp4 file")
            return False

        with open(video_filepath, "wb") as f:
            f.write(video_content)
        log.debug("Video saved successfully")
        saved_count += 1

    if not saved_count:
        # Nothing was written, so this must not be reported as a success.
        log.error(f"No downloadable video found for {url}")
        return False
    return True


def _sanitize_name(name: str) -> str:
    """Keep only alphanumerics, spaces and underscores; strip the tail."""
    return "".join(
        c for c in name if c.isalnum() or c in (" ", "_")
    ).rstrip()


def _person_folder_name(user) -> str:
    """Return ``First_Last`` for *user*, or the user ID if a name is missing."""
    if user.first_name and user.last_name:
        return f"{user.first_name}_{user.last_name}".strip()
    return str(user.id)


def _folder_name_for(message: Message) -> str:
    """Pick the storage folder name for *message* based on its origin.

    The forwarded user is preferred, then the forwarded chat, then the
    sender. If the sanitized name is empty, the ID of the chosen origin is
    used instead so files never land directly in the shared parent folder.
    """
    if message.forward_from:
        origin = message.forward_from
        raw_name = _person_folder_name(origin)
    elif message.forward_from_chat:
        origin = message.forward_from_chat
        # Use chat title for groups and channels
        raw_name = origin.title or ""
    elif message.from_user:
        origin = message.from_user
        raw_name = _person_folder_name(origin)
    else:
        # No sender user is attached to the message; fall back to the chat.
        origin = message.chat
        raw_name = ""
    return _sanitize_name(raw_name) or _sanitize_name(str(origin.id))


def _safe_file_name(file_name, fallback: str) -> str:
    """Return *file_name* without directory parts, or *fallback* if unusable."""
    base_name = os.path.basename(file_name or "")
    if base_name in ("", ".", ".."):
        return fallback
    return base_name


@app.on_message(filters.command("start"))
async def start_handler(client, message: Message):
    """Reply to /start with a short description of the bot."""
    await message.reply_text(
        "This bot downloads TikTok videos "
        "to my personal server"
    )


@app.on_message(filters.command("help"))
async def help_handler(client, message: Message):
    """Reply to /help."""
    await message.reply_text("I won't help you!")


@app.on_message(filters.text)
async def message_handler(client, message: Message):
    """Download every link found in a text message from the authorized chat.

    Messages from any other chat are refused. A progress reply is sent per
    link, followed by a summary of how many downloads succeeded.
    """
    if message.chat.id != MY_MSG_ID:
        return await message.reply_text("Nope, not talking to you!")

    urls = re.findall(r"\bhttps?://[^\s]+", message.text)
    if not urls:
        return await message.reply_text(
            "No links found in the message. "
            "Nothing to download!"
        )

    success_count = 0
    for i, url in enumerate(urls, start=1):
        msg = f"Downloading video {i}/{len(urls)}..."
        log.info(msg)
        await message.reply_text(msg)

        # NOTE(review): download_tt_video is synchronous and is called
        # directly from this coroutine, so the handler does not yield
        # while a download runs.
        try:
            outcome = download_tt_video(url)
        except Exception as err:
            # One failing link must not abort the remaining links or the
            # final summary; record the failure and carry on.
            log.error(f"Failed to download {url}: {err}")
            sentry_sdk.capture_exception(err)
            outcome = False

        if outcome:
            success_count += 1

    await message.reply_text(f"{success_count}/{len(urls)} "
                             "video(s) downloaded")


@app.on_message(filters.media)
async def media_handler(client, message: Message):
    """Save a document, photo, video or GIF from the authorized chat.

    Files are stored under ``STORAGE/telegram/<folder>``, where the folder
    is named after the forwarded user, the forwarded chat, or the sender.
    """
    if message.chat.id != MY_MSG_ID:
        return await message.reply_text("Nope, not talking to you!")

    # Determine folder name based on whether the message was forwarded
    # and who it was forwarded from
    user_folder_name = _folder_name_for(message)
    user_folder = os.path.join(STORAGE, "telegram", user_folder_name)
    os.makedirs(user_folder, exist_ok=True)

    # Reply to user that the download is starting
    await message.reply_text("Downloading media...")

    # Handle documents
    if message.document:
        # The name comes from the sender: drop directory components and
        # fall back to the file ID when it is missing.
        file_name = _safe_file_name(
            message.document.file_name,
            f"document_{message.document.file_id}",
        )
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message, file_path)
        await message.reply_text(f"Document saved to {user_folder}")

    # Handle single or multiple photos
    elif message.photo:
        file_name = f"photo_{message.photo.file_id}.jpg"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message.photo, file_path)
        await message.reply_text(f"Photo saved to {user_folder}")

    # Handle videos
    elif message.video:
        file_name = f"video_{message.video.file_id}.mp4"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message, file_path)
        await message.reply_text(f"Video saved to {user_folder}")

    # Handle GIFs
    elif message.animation:
        file_name = f"gif_{message.animation.file_id}.gif"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message.animation, file_path)
        await message.reply_text(f"GIF saved to {user_folder}")

    # Handle unknown data types
    else:
        await message.reply_text("Unknown media type!")


if __name__ == "__main__":
    os.makedirs(os.path.join(STORAGE, "tiktok"), exist_ok=True)
    os.makedirs(os.path.join(STORAGE, "telegram"), exist_ok=True)
    app.run()