Split some functions to make unittests easier

This commit is contained in:
Roman Krček
2024-10-13 18:14:59 +02:00
parent b74616ab82
commit af6282e26d

View File

@@ -1,4 +1,3 @@
import integv
import os import os
import re import re
@@ -7,43 +6,45 @@ from pyrogram import Client
from pyrogram.types import Message from pyrogram.types import Message
from tiktok_downloader import snaptik from tiktok_downloader import snaptik
from telegram_downloader_bot.logger import log def sanitize_name(input: str) -> str:
"""Sanize string by removing non aplhanumeric characters and spaces."""
output = re.sub("[^a-zA-Z0-9- ]", "", input)
output = output.replace(" ", "_")
return output
async def get_user_folder(storage_path: os.path, message: Message) -> os.path: def get_user_folder(storage_path: os.path, message: Message) -> os.path:
""" Determine folder name used to save the media to. Depending on """ Determine folder name used to save the media to. Depending on
which type of message (forwarded, direct) detect that person's which type of message (forwarded, direct) detect that person's
or group's name.""" or group's name."""
# Message forwarded from someone
if message.forward_from: if message.forward_from:
user = message.forward_from user = message.forward_from
# User's first and last name for folder name, if user.first_name and user.last_name:
# User's first and last name for folder name
user_folder_name = f"{user.first_name} {user.last_name}"
else:
# fallback to user ID if not available # fallback to user ID if not available
user_folder_name = ( user_folder_name = str(user.id)
f"{user.first_name}_{user.last_name}".strip()
if user.first_name and user.last_name # Message forwarded from chat
else str(user.id)
)
elif message.forward_from_chat: elif message.forward_from_chat:
user = message.forward_from_chat user = message.forward_from_chat
# Use chat title for groups and channels user_folder_name = user.title
user_folder_name = "".join(
c for c in user.title if c.isalnum() or c in (" ", "_") # Direct message from user
).rstrip()
else: else:
user = message.from_user user = message.from_user
# User's first and last name for folder name, if user.first_name and user.last_name:
# User's first and last name for folder name
user_folder_name = f"{user.first_name} {user.last_name}"
else:
# fallback to user ID if not available # fallback to user ID if not available
user_folder_name = ( user_folder_name = str(user.id)
f"{user.first_name}_{user.last_name}".strip()
if user.first_name and user.last_name
else str(user.id)
)
# Sanitize the folder name # Sanitize the folder name
user_folder_name = "".join( user_folder_name = sanitize_name(user_folder_name)
c for c in user_folder_name if c.isalnum() or c in (" ", "_")
).rstrip()
user_folder = os.path.join(storage_path, "telegram", user_folder_name) user_folder = os.path.join(storage_path, "telegram", user_folder_name)
os.makedirs(user_folder, exist_ok=True) os.makedirs(user_folder, exist_ok=True)
@@ -92,41 +93,22 @@ async def handle_media_message_contents(storage_path: os.path,
await message.reply_text("Unknown media type!") await message.reply_text("Unknown media type!")
def download_tt_video(storage_path: str, url: str) -> bool: def download_tt_video(storage_path: str, url: str) -> None:
"""Downloads tiktok video from a given URL. """Downloads tiktok video from a given URL.
Makes sure the video integrity is correct.""" Makes sure the video integrity is correct."""
videos = snaptik(url) videos = snaptik(url)
now = datetime.datetime.now() now = datetime.datetime.now()
max_tries = 5
log.debug(f"Downloading video from {url}...")
for video in videos: for video in videos:
video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath: os.path = os.path.join( video_filepath: os.path = os.path.join(storage_path,
storage_path, "tiktok", video_filename) "tiktok",
video_filename)
for i in range(max_tries):
video_content = video.download().getbuffer() video_content = video.download().getbuffer()
is_valid_mp4 = integv.verify(
video_content.tobytes(), file_type="mp4")
log.debug(
f"Attempt {i+1}/{max_tries} to "
"download video, video valid: {is_valid_mp4}"
)
if is_valid_mp4:
break
if not is_valid_mp4:
log.error("Downloaded video is not a valid mp4 file")
return False
with open(video_filepath, "wb") as f: with open(video_filepath, "wb") as f:
f.write(video_content) f.write(video_content)
log.debug("Video saved successfully")
return True
def make_fs(storaga_path: str) -> None: def make_fs(storaga_path: str) -> None: