"""Unit tests for the helpers imported from ``telegram_downloader_bot.utils``."""

import asyncio
import os
import re
import shutil
import tempfile
import unittest
from datetime import datetime
from unittest.mock import AsyncMock, Mock, patch

from pyrogram.types import Chat, Message, User

from telegram_downloader_bot.utils import (
    download_tt_video,
    extract_urls,
    filter_tt_urls,
    get_user_folder,
    handle_media_message_contents,
    make_fs,
)


class _TempDirMixin:
    """Provide every test with a private scratch directory in ``self.tmp_path``."""

    def setUp(self):
        super().setUp()
        # Create a temporary directory for each test
        self.tmp_path = tempfile.mkdtemp()
        # Registered immediately so the directory is removed after the test
        # regardless of how the test (or a later setUp step) ends.
        self.addCleanup(shutil.rmtree, self.tmp_path)


def _make_user(first_name, last_name, user_id):
    """Return a mock user exposing ``first_name``, ``last_name`` and ``id``."""
    user = Mock()
    user.first_name = first_name
    user.last_name = last_name
    user.id = user_id
    return user


def _make_message(forward_from=None, forward_from_chat=None, from_user=None):
    """Return a mock message with the three sender-related attributes set."""
    message = Mock()
    message.forward_from = forward_from
    message.forward_from_chat = forward_from_chat
    message.from_user = from_user
    return message


def _make_media_message(document=None, photo=None):
    """Return a mock message with the media attributes and an awaitable reply.

    ``video`` and ``animation`` are always ``None``; ``document`` and ``photo``
    default to ``None`` unless supplied.
    """
    message = Mock()
    message.document = document
    message.photo = photo
    message.video = None
    message.animation = None
    message.reply_text = AsyncMock()
    return message


class TestGetUserFolder(_TempDirMixin, unittest.IsolatedAsyncioTestCase):
    """Checks the folder path returned (and created) by ``get_user_folder``."""

    async def _assert_folder(self, message, expected_name):
        """Call ``get_user_folder`` and verify the returned path exists.

        The expected path is ``<tmp_path>/telegram/<expected_name>``.
        """
        result = await get_user_folder(self.tmp_path, message)
        expected_folder = os.path.join(self.tmp_path, "telegram", expected_name)
        self.assertEqual(result, expected_folder)
        self.assertTrue(os.path.exists(expected_folder))

    async def test_forward_from_full_name(self):
        """Forwarded user with first and last name -> ``First_Last`` folder."""
        message = _make_message(forward_from=_make_user("John", "Doe", 12345))
        await self._assert_folder(message, "John_Doe")

    async def test_forward_from_first_name_only(self):
        """Forwarded user without a last name -> folder named after the id."""
        message = _make_message(forward_from=_make_user("John", None, 12345))
        await self._assert_folder(message, "12345")

    async def test_forward_from_no_name(self):
        """Forwarded user without any name -> folder named after the id."""
        message = _make_message(forward_from=_make_user(None, None, 12345))
        await self._assert_folder(message, "12345")

    async def test_forward_from_chat_special_chars(self):
        """Forwarded chat title is expected to be sanitised into the folder name."""
        chat = Mock()
        chat.title = "My *Awesome* Group/Chat!"
        message = _make_message(forward_from_chat=chat)
        await self._assert_folder(message, "My_Awesome_GroupChat")

    async def test_from_user_full_name(self):
        """Direct sender with first and last name -> ``First_Last`` folder."""
        message = _make_message(from_user=_make_user("Jane", "Doe", 54321))
        await self._assert_folder(message, "Jane_Doe")

    async def test_from_user_first_name_only(self):
        """Direct sender without a last name -> folder named after the id."""
        message = _make_message(from_user=_make_user("Jane", None, 54321))
        await self._assert_folder(message, "54321")

    async def test_special_characters_in_name(self):
        """Special characters in the sender's names are expected to be dropped."""
        message = _make_message(from_user=_make_user("Ja*ne", "Do/e", 54321))
        await self._assert_folder(message, "Jane_Doe")


class TestHandleMediaMessageContents(_TempDirMixin, unittest.IsolatedAsyncioTestCase):
    """Checks download and reply calls made by ``handle_media_message_contents``.

    ``get_user_folder`` is patched in every test so the target folder is known.
    """

    def _prepare(self, mock_get_user_folder):
        """Create the user folder, wire it into the patch, and build a client.

        Returns:
            A ``(user_folder, client)`` tuple, where ``client.download_media``
            is an ``AsyncMock``.
        """
        user_folder = os.path.join(self.tmp_path, "user_folder")
        mock_get_user_folder.return_value = user_folder
        os.makedirs(user_folder, exist_ok=True)

        client = Mock()
        client.download_media = AsyncMock()
        return user_folder, client

    @patch("telegram_downloader_bot.utils.get_user_folder")
    async def test_document(self, mock_get_user_folder):
        """A document is downloaded under its own file name and acknowledged."""
        user_folder, client = self._prepare(mock_get_user_folder)
        message = _make_media_message(document=Mock(file_name="test_document.pdf"))

        await handle_media_message_contents(self.tmp_path, client, message)

        client.download_media.assert_awaited_once_with(
            message, os.path.join(user_folder, "test_document.pdf")
        )
        message.reply_text.assert_awaited_once_with(f"Document saved to {user_folder}")

    @patch("telegram_downloader_bot.utils.get_user_folder")
    async def test_photo(self, mock_get_user_folder):
        """A photo is downloaded as ``photo_<file_id>.jpg`` and acknowledged."""
        user_folder, client = self._prepare(mock_get_user_folder)
        message = _make_media_message(photo=Mock(file_id="1234567890"))

        await handle_media_message_contents(self.tmp_path, client, message)

        expected_file = os.path.join(user_folder, f"photo_{message.photo.file_id}.jpg")
        client.download_media.assert_awaited_once_with(message.photo, expected_file)
        message.reply_text.assert_awaited_once_with(f"Photo saved to {user_folder}")

    @patch("telegram_downloader_bot.utils.get_user_folder")
    async def test_unknown_media(self, mock_get_user_folder):
        """With no media attribute set, nothing is downloaded and a notice is sent."""
        _, client = self._prepare(mock_get_user_folder)
        message = _make_media_message()

        await handle_media_message_contents(self.tmp_path, client, message)

        client.download_media.assert_not_awaited()
        message.reply_text.assert_awaited_once_with("Unknown media type!")


class TestDownloadTTVideo(_TempDirMixin, unittest.TestCase):
    """Checks ``download_tt_video`` with snaptik, integv and the clock mocked."""

    # Fixed timestamp returned by the patched ``datetime.now``; the expected
    # file name in both tests is derived from it.
    FROZEN_NOW = datetime(2023, 1, 1, 12, 0, 0)

    def _download(self, mock_datetime, mock_verify, mock_snaptik, *, verified):
        """Configure the mocks, run the download, and compute the expected path.

        Args:
            mock_datetime: patched ``utils.datetime``.
            mock_verify: patched ``utils.integv.verify``.
            mock_snaptik: patched ``utils.snaptik``.
            verified: value the patched ``integv.verify`` returns.

        Returns:
            A ``(result, video_filepath)`` tuple: the return value of
            ``download_tt_video`` and the path where the tests look for the file.
        """
        mock_video = Mock()
        mock_video.download.return_value.getbuffer.return_value = b"video_content"
        mock_snaptik.return_value = [mock_video]
        mock_verify.return_value = verified
        mock_datetime.now.return_value = self.FROZEN_NOW

        result = download_tt_video(self.tmp_path, "http://tiktok.com/video123")

        video_filename = self.FROZEN_NOW.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
        video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
        return result, video_filepath

    # Decorators apply bottom-up, so the mocks arrive as
    # (datetime, verify, snaptik).
    @patch("telegram_downloader_bot.utils.snaptik")
    @patch("telegram_downloader_bot.utils.integv.verify")
    @patch("telegram_downloader_bot.utils.datetime")
    def test_success(self, mock_datetime, mock_verify, mock_snaptik):
        """When verification passes, the call is truthy and the file exists."""
        result, video_filepath = self._download(
            mock_datetime, mock_verify, mock_snaptik, verified=True
        )
        self.assertTrue(result)
        self.assertTrue(os.path.exists(video_filepath))

    @patch("telegram_downloader_bot.utils.snaptik")
    @patch("telegram_downloader_bot.utils.integv.verify")
    @patch("telegram_downloader_bot.utils.datetime")
    def test_failure(self, mock_datetime, mock_verify, mock_snaptik):
        """When verification fails, the call is falsy and no file is left."""
        result, video_filepath = self._download(
            mock_datetime, mock_verify, mock_snaptik, verified=False
        )
        self.assertFalse(result)
        self.assertFalse(os.path.exists(video_filepath))


class TestMakeFS(_TempDirMixin, unittest.TestCase):
    """Checks the directories created by ``make_fs``."""

    def test_make_fs(self):
        """Both the ``tiktok`` and ``telegram`` sub-directories must exist."""
        make_fs(self.tmp_path)
        for subdir in ("tiktok", "telegram"):
            self.assertTrue(os.path.exists(os.path.join(self.tmp_path, subdir)))


class TestExtractURLs(unittest.TestCase):
    """Checks the list of URLs returned by ``extract_urls``."""

    def test_no_urls(self):
        """Text without URLs yields an empty list."""
        self.assertEqual(extract_urls("This is some text without any URLs."), [])

    def test_one_url(self):
        """A single URL in the text is returned."""
        found = extract_urls("Check out this link: http://example.com")
        self.assertEqual(found, ["http://example.com"])

    def test_multiple_urls(self):
        """Several URLs are returned in order of appearance."""
        found = extract_urls(
            "Here are some links: http://example.com and https://test.com/page"
        )
        self.assertEqual(found, ["http://example.com", "https://test.com/page"])

    def test_malformed_url(self):
        """A string with a misspelled scheme is not reported as a URL."""
        self.assertEqual(extract_urls("This is not a URL: htt://badurl.com"), [])

    def test_url_at_text_boundaries(self):
        """URLs at the very start and very end of the text are both found."""
        found = extract_urls("http://start.com text in the middle https://end.com")
        self.assertEqual(found, ["http://start.com", "https://end.com"])


class TestFilterTTURLs(unittest.TestCase):
    """Checks which URLs ``filter_tt_urls`` keeps."""

    def test_empty_list(self):
        """An empty input yields an empty output."""
        self.assertEqual(filter_tt_urls([]), [])

    def test_no_tiktok_urls(self):
        """URLs that never mention tiktok are all dropped."""
        urls = ["http://example.com", "https://test.com/page"]
        self.assertEqual(filter_tt_urls(urls), [])

    def test_only_tiktok_urls(self):
        """A list made only of tiktok URLs is returned unchanged."""
        urls = ["http://tiktok.com/video1", "https://www.tiktok.com/@user/video/123"]
        self.assertEqual(filter_tt_urls(urls), urls)

    def test_mixed_urls(self):
        """Only the tiktok URL survives from a mixed list."""
        urls = ["http://example.com", "https://www.tiktok.com/@user/video/123"]
        self.assertEqual(
            filter_tt_urls(urls), ["https://www.tiktok.com/@user/video/123"]
        )

    def test_tiktok_in_query(self):
        """A URL mentioning tiktok only in its query string is still kept."""
        urls = ["http://example.com?param=tiktok", "https://www.other.com/path"]
        self.assertEqual(filter_tt_urls(urls), ["http://example.com?param=tiktok"])