diff --git a/telegram_downloader_bot/logger.py b/telegram_downloader_bot/logger.py index 7defa75..441298c 100644 --- a/telegram_downloader_bot/logger.py +++ b/telegram_downloader_bot/logger.py @@ -1,4 +1,7 @@ import logging +import os + +LOG_LEVEL = os.getenv("LOG_LEVEL") def configure_logger(log_level: str) -> logging.Logger: @@ -13,3 +16,6 @@ def configure_logger(log_level: str) -> logging.Logger: ) return logging.getLogger() + + +log = configure_logger(LOG_LEVEL) diff --git a/telegram_downloader_bot/main.py b/telegram_downloader_bot/main.py index cfba02f..15f4273 100644 --- a/telegram_downloader_bot/main.py +++ b/telegram_downloader_bot/main.py @@ -1,165 +1,30 @@ -import datetime -import integv import os -import re -import sentry_sdk import uvloop -from functools import wraps from pyrogram import Client, filters from pyrogram.types import Message -from tiktok_downloader import snaptik -from telegram_downloader_bot import logger +from telegram_downloader_bot.logger import log +from telegram_downloader_bot.telemetry import init_telemetry +from telegram_downloader_bot import utils, security API_ID = os.getenv("API_ID") # Your API ID from my.telegram.org API_HASH = os.getenv("API_HASH") # Your API Hash from my.telegram.org BOT_TOKEN = os.getenv("BOT_TOKEN") # Your bot token from BotFather -STORAGE = os.getenv("STORAGE") # Storage directory for downloads -LOG_LEVEL = os.getenv("LOG_LEVEL") # Log level - -# Your message ID for authorization separated by commas -msg_ids = os.getenv("MSG_IDS") -ALLOWED_IDS = set([int(i) for i in msg_ids.split(",")]) # Convert to set - -log = logger.configure_logger(LOG_LEVEL) - +STORAGE = os.getenv("STORAGE") # Your bot token from BotFather uvloop.install() - -sentry_sdk.init( - dsn="https://12d7a075d483fc133cde0ed82e72ac45@o4508071875313664.ingest.de.sentry.io/4508075566694480", # noqa: E501 - traces_sample_rate=1.0, - profiles_sample_rate=1.0, - enable_tracing=True -) +init_telemetry() app = Client("downloader_bot", api_id=API_ID, api_hash=API_HASH, - bot_token=BOT_TOKEN) - - -def protected(func): - @wraps(func) - async def wrapper(client, message): - if message.from_user.id not in ALLOWED_IDS: - return await message.reply_text("You are not on the list!") - return await func(client, message) - return wrapper - - -async def get_user_folder(message: Message) -> os.path: - # Determine folder name based on whether the message was forwarded - # and who it was forwarded from - if message.forward_from: - user = message.forward_from - # User's first and last name for folder name, - # fallback to user ID if not available - user_folder_name = ( - f"{user.first_name}_{user.last_name}".strip() - if user.first_name and user.last_name - else str(user.id) - ) - elif message.forward_from_chat: - user = message.forward_from_chat - # Use chat title for groups and channels - user_folder_name = "".join( - c for c in user.title if c.isalnum() or c in (" ", "_") - ).rstrip() - else: - user = message.from_user - # User's first and last name for folder name, - # fallback to user ID if not available - user_folder_name = ( - f"{user.first_name}_{user.last_name}".strip() - if user.first_name and user.last_name - else str(user.id) - ) - - # Sanitize the folder name - user_folder_name = "".join( - c for c in user_folder_name if c.isalnum() or c in (" ", "_") - ).rstrip() - - user_folder = os.path.join(STORAGE, "telegram", user_folder_name) - os.makedirs(user_folder, exist_ok=True) - return user_folder - - -async def handle_media_message_contents(client: Client, message: Message): - - user_folder = get_user_folder(message) - - # Handle documents - if message.document: - file_name = message.document.file_name - file_path = os.path.join(user_folder, file_name) - await client.download_media(message, file_path) - await message.reply_text(f"Document saved to {user_folder}") - - # Handle single or multiple photos - elif message.photo: - file_name = f"photo_{message.photo.file_id}.jpg" - file_path = os.path.join(user_folder, file_name) - await client.download_media(message.photo, file_path) - await message.reply_text(f"Photo saved to {user_folder}") - - # Handle videos - elif message.video: - file_name = f"video_{message.video.file_id}.mp4" - file_path = os.path.join(user_folder, file_name) - await client.download_media(message, file_path) - await message.reply_text(f"Video saved to {user_folder}") - - # Handle GIFs - elif message.animation: - file_name = f"gif_{message.animation.file_id}.gif" - file_path = os.path.join(user_folder, file_name) - await client.download_media(message.animation, file_path) - await message.reply_text(f"GIF saved to {user_folder}") - - # Handle unknown data types - else: - await message.reply_text("Unknown media type!") - - -def download_tt_video(url: str) -> bool: - videos = snaptik(url) - now = datetime.datetime.now() - max_tries = 5 - - log.debug(f"Downloading video from {url}...") - - for video in videos: - video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") - video_filepath: os.path = os.path.join( - STORAGE, "tiktok", video_filename) - - for i in range(max_tries): - video_content = video.download() - is_valid_mp4 = integv.verify( - video_content.tobytes(), file_type="mp4") - log.debug( - f"Attempt {i+1}/{max_tries} to " - "download video, video valid: {is_valid_mp4}" - ) - if is_valid_mp4: - break - - if not is_valid_mp4: - log.error("Downloaded video is not a valid mp4 file") - return False - - with open(video_filepath, "wb") as f: - f.write(video_content) - log.debug("Video saved successfully") - - return True + bot_token=BOT_TOKEN, + workers=1) @app.on_message(filters.command("start")) -@protected +@security.protected async def start_handler(_, message: Message): await message.reply_text( "This bot downloads TikTok videos to my personal server" @@ -167,27 +32,35 @@ async def start_handler(_, message: Message): @app.on_message(filters.command("help")) -@protected +@security.protected async def help_handler(_, message: Message): await message.reply_text("I won't help you!") @app.on_message(filters.text) -@protected +@security.protected async def message_handler(_, message: Message): - urls = re.findall(r"\bhttps?://[^\s]+", message.text) + urls = utils.extract_urls(message.text) + if not urls: return await message.reply_text( "No links found in the message. Nothing to download!" ) + tt_urls = utils.filter_tt_urls(urls) + + if not tt_urls: + return await message.reply_text( + "No TikTok URLs found! Nothing to download!" + ) + success_count = 0 for i, url in enumerate(urls): msg = f"Downloading video {i+1}/{len(urls)}..." log.info(msg) await message.reply_text(msg) - outcome = download_tt_video(url) + outcome = utils.download_tt_video(STORAGE, url) success_count += 1 if outcome else 0 await message.reply_text(f"{success_count}/{len(urls)} " @@ -195,15 +68,14 @@ async def message_handler(_, message: Message): @app.on_message(filters.media) -@protected +@security.protected async def media_handler(client, message: Message): await message.reply_text("Downloading media...") - handle_media_message_contents(client, message) + utils.handle_media_message_contents(STORAGE, client, message) if __name__ == "__main__": - os.makedirs(os.path.join(STORAGE, "tiktok"), exist_ok=True) - os.makedirs(os.path.join(STORAGE, "telegram"), exist_ok=True) + utils.make_fs(STORAGE) app.run() diff --git a/telegram_downloader_bot/security.py b/telegram_downloader_bot/security.py new file mode 100644 index 0000000..6494046 --- /dev/null +++ b/telegram_downloader_bot/security.py @@ -0,0 +1,18 @@ +import os +from functools import wraps + +# Comma separated list of Telegram IDs that this bot will respond to +allowed_ids_raw = os.getenv("ALLOWED_IDS", "") +allowed_ids = allowed_ids_raw.split(",") + +print(allowed_ids_raw) +print(allowed_ids) + + +def protected(func): + @wraps(func) + async def wrapper(client, message): + if message.from_user.id not in allowed_ids: + return await message.reply_text("You are not on the list!") + return await func(client, message) + return wrapper diff --git a/telegram_downloader_bot/telemetry.py b/telegram_downloader_bot/telemetry.py new file mode 100644 index 0000000..8868bc5 --- /dev/null +++ b/telegram_downloader_bot/telemetry.py @@ -0,0 +1,10 @@ +import sentry_sdk + + +def init_telemetry() -> None: + sentry_sdk.init( + dsn="https://12d7a075d483fc133cde0ed82e72ac45@o4508071875313664.ingest.de.sentry.io/4508075566694480", # noqa: E501 + traces_sample_rate=1.0, + profiles_sample_rate=1.0, + enable_tracing=True + ) diff --git a/telegram_downloader_bot/utils.py b/telegram_downloader_bot/utils.py new file mode 100644 index 0000000..3d7c693 --- /dev/null +++ b/telegram_downloader_bot/utils.py @@ -0,0 +1,142 @@ +import integv +import os +import re + +from datetime import datetime +from pyrogram import Client +from pyrogram.types import Message +from tiktok_downloader import snaptik + +from telegram_downloader_bot.logger import log + + +async def get_user_folder(storage_path: os.path, message: Message) -> os.path: + """ Determine folder name used to save the media to. Depending on + which type of message (forwarded, direct) detect that person's + or group's name.""" + + if message.forward_from: + user = message.forward_from + # User's first and last name for folder name, + # fallback to user ID if not available + user_folder_name = ( + f"{user.first_name}_{user.last_name}".strip() + if user.first_name and user.last_name + else str(user.id) + ) + elif message.forward_from_chat: + user = message.forward_from_chat + # Use chat title for groups and channels + user_folder_name = "".join( + c for c in user.title if c.isalnum() or c in (" ", "_") + ).rstrip() + else: + user = message.from_user + # User's first and last name for folder name, + # fallback to user ID if not available + user_folder_name = ( + f"{user.first_name}_{user.last_name}".strip() + if user.first_name and user.last_name + else str(user.id) + ) + + # Sanitize the folder name + user_folder_name = "".join( + c for c in user_folder_name if c.isalnum() or c in (" ", "_") + ).rstrip() + + user_folder = os.path.join(storage_path, "telegram", user_folder_name) + os.makedirs(user_folder, exist_ok=True) + return user_folder + + +async def handle_media_message_contents(storage_path: os.path, + client: Client, + message: Message): + """Detect what kind of media is being sent over from the user. + Based on that, determine the correct file extension and save + that media.""" + + user_folder = get_user_folder(storage_path, message) + + # Handle documents + if message.document: + file_name = message.document.file_name + file_path = os.path.join(user_folder, file_name) + await client.download_media(message, file_path) + await message.reply_text(f"Document saved to {user_folder}") + + # Handle single or multiple photos + elif message.photo: + file_name = f"photo_{message.photo.file_id}.jpg" + file_path = os.path.join(user_folder, file_name) + await client.download_media(message.photo, file_path) + await message.reply_text(f"Photo saved to {user_folder}") + + # Handle videos + elif message.video: + file_name = f"video_{message.video.file_id}.mp4" + file_path = os.path.join(user_folder, file_name) + await client.download_media(message, file_path) + await message.reply_text(f"Video saved to {user_folder}") + + # Handle GIFs + elif message.animation: + file_name = f"gif_{message.animation.file_id}.gif" + file_path = os.path.join(user_folder, file_name) + await client.download_media(message.animation, file_path) + await message.reply_text(f"GIF saved to {user_folder}") + + # Handle unknown data types + else: + await message.reply_text("Unknown media type!") + + +def download_tt_video(storage_path: str, url: str) -> bool: + """Downloads tiktok video from a given URL. + Makes sure the video integrity is correct.""" + + videos = snaptik(url) + now = datetime.datetime.now() + max_tries = 5 + + log.debug(f"Downloading video from {url}...") + + for video in videos: + video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") + video_filepath: os.path = os.path.join( + storage_path, "tiktok", video_filename) + + for i in range(max_tries): + video_content = video.download().getbuffer() + is_valid_mp4 = integv.verify( + video_content.tobytes(), file_type="mp4") + log.debug( + f"Attempt {i+1}/{max_tries} to " + "download video, video valid: {is_valid_mp4}" + ) + if is_valid_mp4: + break + + if not is_valid_mp4: + log.error("Downloaded video is not a valid mp4 file") + return False + + with open(video_filepath, "wb") as f: + f.write(video_content) + log.debug("Video saved successfully") + + return True + + +def make_fs(storaga_path: str) -> None: + os.makedirs(os.path.join(storaga_path, "tiktok"), exist_ok=True) + os.makedirs(os.path.join(storaga_path, "telegram"), exist_ok=True) + + +def extract_urls(text: str) -> list: + return re.findall(r"\bhttps?://[^\s]+", text) + + +def filter_tt_urls(urls: list) -> list: + return [x for x in urls if "tiktok" in x]