import logging
import os
import pickle  # nosec
import re
from datetime import datetime
from hashlib import sha256

from pyrogram import Client
from pyrogram.types import Message
from tiktok_downloader import snaptik

from telegram_downloader_bot.settings import settings

logger = logging.getLogger(__name__)


def sanitize_name(input: str) -> str:
    """Make a string safe to use as a folder name.

    Every character other than ASCII letters, digits, hyphens and spaces is
    removed; the remaining spaces are then replaced with underscores.

    The parameter keeps its original name (which shadows the ``input``
    builtin) so that existing keyword callers continue to work.
    """
    output = re.sub("[^a-zA-Z0-9- ]", "", input)
    return output.replace(" ", "_")


def _person_label(user) -> str:
    """Return "first last" for a user, or the user ID when a name is missing."""
    if user.first_name and user.last_name:
        return f"{user.first_name} {user.last_name}"
    return str(user.id)


def get_user_folder(message: Message) -> str:
    """Determine (and create) the folder the message's media is saved to.

    The folder name is derived from, in order of preference:

    * the original sender of a forwarded message (``forward_from``),
    * the chat a message was forwarded from (``forward_from_chat``),
    * the direct sender (``from_user``).

    The name is passed through :func:`sanitize_name`. If nothing usable is
    left after sanitizing (for example a name made only of non-ASCII
    characters), the numeric ID is used instead so that such senders do not
    all end up sharing the ``telegram`` root folder.

    Returns:
        Path of the existing directory ``<storage>/telegram/<name>``.
    """
    if message.forward_from:
        # Message forwarded from someone
        origin = message.forward_from
        label = _person_label(origin)
        fallback = str(origin.id)
    elif message.forward_from_chat:
        # Message forwarded from a chat; guard against a missing title
        origin = message.forward_from_chat
        fallback = str(origin.id)
        label = origin.title or fallback
    elif message.from_user:
        # Direct message from user
        origin = message.from_user
        label = _person_label(origin)
        fallback = str(origin.id)
    else:
        # Pyrogram documents from_user as optional; avoid an AttributeError
        label = fallback = "unknown"

    user_folder_name = sanitize_name(label) or sanitize_name(fallback)

    user_folder = os.path.join(settings.storage, "telegram", user_folder_name)
    os.makedirs(user_folder, exist_ok=True)
    return user_folder


def _safe_document_name(document) -> str:
    """Return a file name for a document that cannot escape its folder.

    The sender controls ``file_name``, so only its final path component is
    kept. When that is empty or a directory reference, a name built from the
    document's ``file_id`` is used instead.
    """
    candidate = os.path.basename(document.file_name or "")
    if candidate in ("", ".", ".."):
        return f"document_{document.file_id}"
    return candidate


async def handle_media_message_contents(client: Client, message: Message):
    """Save the media attached to a message into the sender's folder.

    Supported kinds are documents, photos, videos and animations (GIFs).
    The file name and extension are chosen per kind, the media is downloaded
    through ``client.download_media`` and a confirmation is sent as a reply.
    Any other message gets an "Unknown media type!" reply.
    """
    user_folder = get_user_folder(message)

    # Handle documents
    if message.document:
        # file_name is untrusted input: strip any directory components
        file_name = _safe_document_name(message.document)
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message, file_path)
        await message.reply_text(f"Document saved to {user_folder}")

    # Handle single or multiple photos
    elif message.photo:
        file_name = f"photo_{message.photo.file_id}.jpg"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message.photo, file_path)
        await message.reply_text(f"Photo saved to {user_folder}")

    # Handle videos
    elif message.video:
        file_name = f"video_{message.video.file_id}.mp4"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message, file_path)
        await message.reply_text(f"Video saved to {user_folder}")

    # Handle GIFs
    elif message.animation:
        file_name = f"gif_{message.animation.file_id}.gif"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message.animation, file_path)
        await message.reply_text(f"GIF saved to {user_folder}")

    # Handle unknown data types
    else:
        await message.reply_text("Unknown media type!")


def get_tt_hashes() -> set:
    """Load the set of hashes of already downloaded TikTok videos.

    Returns an empty set when the hash file does not exist yet.
    """
    try:
        with open(settings.tt_hash_file, "rb") as f:
            # NOTE(security): unpickling runs arbitrary code if the file has
            # been tampered with. This is only acceptable as long as the hash
            # file is written exclusively by save_tt_hashes().
            all_tt_hashes: set = pickle.load(f)  # nosec
    except FileNotFoundError:
        return set()
    return all_tt_hashes


def add_to_hashes(new_hash: str) -> None:
    """Add one hash to the stored set and write the set back to disk."""
    all_tt_hashes = get_tt_hashes()
    all_tt_hashes.add(new_hash)
    save_tt_hashes(all_tt_hashes)


def save_tt_hashes(hashes: set) -> None:
    """Overwrite the hash file with the pickled ``hashes`` set."""
    with open(settings.tt_hash_file, "wb") as f:
        pickle.dump(hashes, f, protocol=pickle.HIGHEST_PROTOCOL)


def check_if_tt_downloaded(tt_hash: str) -> bool:
    """Return True when ``tt_hash`` is already present in the hash file."""
    return tt_hash in get_tt_hashes()


def download_tt_video(url: str) -> str:
    """Download a TikTok video from ``url`` unless it was saved before.

    The content's SHA-256 digest is compared against the stored hashes; a
    known digest means the video is skipped. Otherwise the content is written
    to ``<storage>/tiktok/`` under a timestamped name and its digest is
    recorded.

    Only the first entry returned by ``snaptik`` is handled: the file name is
    derived from a single timestamp, so further entries could only overwrite
    the same file. (Presumably the entries are alternative links to the same
    clip - TODO confirm against tiktok_downloader.)

    Returns:
        "Already downloaded", "Downloaded ok", or "No video found" when
        ``snaptik`` yields nothing.
    """
    videos = snaptik(url)
    now = datetime.now()

    for video in videos:
        video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
        video_filepath = os.path.join(
            settings.storage, "tiktok", video_filename
        )
        video_content = video.download().getbuffer()
        video_hash = sha256(video_content).hexdigest()
        logger.debug("TikTok video hash: %s", video_hash)

        if check_if_tt_downloaded(video_hash):
            return "Already downloaded"

        with open(video_filepath, "wb") as f:
            f.write(video_content)

        add_to_hashes(video_hash)
        return "Downloaded ok"

    # Nothing to iterate over: report it instead of implicitly returning None
    return "No video found"


def make_fs(storaga_path: str) -> None:
    """Create the ``tiktok`` and ``telegram`` subfolders of the storage path.

    Existing folders are left untouched.
    """
    for subfolder in ("tiktok", "telegram"):
        os.makedirs(os.path.join(storaga_path, subfolder), exist_ok=True)


def extract_urls(text: str) -> list:
    """Return every http(s) URL found in ``text``, in order of appearance.

    A URL runs from ``http://`` or ``https://`` up to the next whitespace.
    """
    return re.findall(r"\bhttps?://[^\s]+", text)


def filter_tt_urls(urls: list) -> list:
    """Keep only the URLs that contain the substring ``tiktok``."""
    return [x for x in urls if "tiktok" in x]