diff --git a/telegram_downloader_bot/utils.py b/telegram_downloader_bot/utils.py index 3d7c693..fac09c1 100644 --- a/telegram_downloader_bot/utils.py +++ b/telegram_downloader_bot/utils.py @@ -1,4 +1,3 @@ -import integv import os import re @@ -7,43 +6,45 @@ from pyrogram import Client from pyrogram.types import Message from tiktok_downloader import snaptik -from telegram_downloader_bot.logger import log +def sanitize_name(input: str) -> str: + """Sanize string by removing non aplhanumeric characters and spaces.""" + output = re.sub("[^a-zA-Z0-9- ]", "", input) + output = output.replace(" ", "_") + return output -async def get_user_folder(storage_path: os.path, message: Message) -> os.path: +def get_user_folder(storage_path: os.path, message: Message) -> os.path: """ Determine folder name used to save the media to. Depending on which type of message (forwarded, direct) detect that person's or group's name.""" + # Message forwarded from someone if message.forward_from: user = message.forward_from - # User's first and last name for folder name, - # fallback to user ID if not available - user_folder_name = ( - f"{user.first_name}_{user.last_name}".strip() - if user.first_name and user.last_name - else str(user.id) - ) + if user.first_name and user.last_name: + # User's first and last name for folder name + user_folder_name = f"{user.first_name} {user.last_name}" + else: + # fallback to user ID if not available + user_folder_name = str(user.id) + + # Message forwarded from chat elif message.forward_from_chat: user = message.forward_from_chat - # Use chat title for groups and channels - user_folder_name = "".join( - c for c in user.title if c.isalnum() or c in (" ", "_") - ).rstrip() + user_folder_name = user.title + + # Direct message from user else: user = message.from_user - # User's first and last name for folder name, - # fallback to user ID if not available - user_folder_name = ( - f"{user.first_name}_{user.last_name}".strip() - if user.first_name and user.last_name - else str(user.id) - ) + if user.first_name and user.last_name: + # User's first and last name for folder name + user_folder_name = f"{user.first_name} {user.last_name}" + else: + # fallback to user ID if not available + user_folder_name = str(user.id) # Sanitize the folder name - user_folder_name = "".join( - c for c in user_folder_name if c.isalnum() or c in (" ", "_") - ).rstrip() + user_folder_name = sanitize_name(user_folder_name) user_folder = os.path.join(storage_path, "telegram", user_folder_name) os.makedirs(user_folder, exist_ok=True) @@ -92,41 +93,22 @@ async def handle_media_message_contents(storage_path: os.path, await message.reply_text("Unknown media type!") -def download_tt_video(storage_path: str, url: str) -> bool: +def download_tt_video(storage_path: str, url: str) -> None: """Downloads tiktok video from a given URL. Makes sure the video integrity is correct.""" videos = snaptik(url) now = datetime.datetime.now() - max_tries = 5 - - log.debug(f"Downloading video from {url}...") for video in videos: video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") - video_filepath: os.path = os.path.join( - storage_path, "tiktok", video_filename) - - for i in range(max_tries): - video_content = video.download().getbuffer() - is_valid_mp4 = integv.verify( - video_content.tobytes(), file_type="mp4") - log.debug( - f"Attempt {i+1}/{max_tries} to " - "download video, video valid: {is_valid_mp4}" - ) - if is_valid_mp4: - break - - if not is_valid_mp4: - log.error("Downloaded video is not a valid mp4 file") - return False + video_filepath: os.path = os.path.join(storage_path, + "tiktok", + video_filename) + video_content = video.download().getbuffer() with open(video_filepath, "wb") as f: f.write(video_content) - log.debug("Video saved successfully") - - return True def make_fs(storaga_path: str) -> None: