Fix linter issues and unittests
This commit is contained in:
@@ -12,7 +12,8 @@ def protected(func):
|
|||||||
f"User with ID {message.from_user.id} attempted"
|
f"User with ID {message.from_user.id} attempted"
|
||||||
"to text this bot!")
|
"to text this bot!")
|
||||||
log.info(
|
log.info(
|
||||||
f"Only users allowed are: {' '.join(settings.allowed_ids_list)}")
|
"Only users allowed are:"
|
||||||
|
f"{' '.join(settings.allowed_ids_list)}")
|
||||||
return await message.reply_text("You are not on the list!")
|
return await message.reply_text("You are not on the list!")
|
||||||
return await func(client, message)
|
return await func(client, message)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
import pickle
|
import pickle # nosec
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@@ -102,7 +102,7 @@ def get_tt_hashes() -> set:
|
|||||||
return set()
|
return set()
|
||||||
|
|
||||||
with open(settings.tt_hash_file, "rb+") as f:
|
with open(settings.tt_hash_file, "rb+") as f:
|
||||||
all_tt_hashes: set = pickle.load(f)
|
all_tt_hashes: set = pickle.load(f) # nosec
|
||||||
print(all_tt_hashes)
|
print(all_tt_hashes)
|
||||||
return all_tt_hashes
|
return all_tt_hashes
|
||||||
|
|
||||||
|
|||||||
@@ -49,8 +49,11 @@ class TestGetUserFolder(unittest.TestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
# Create a temporary directory for each test
|
# Create a temporary directory for each test
|
||||||
self.tmp_path = tempfile.mkdtemp()
|
self.tmp_path = tempfile.mkdtemp()
|
||||||
|
self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
|
||||||
|
self.settings_patcher.start()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
self.settings_patcher.stop()
|
||||||
# Remove the directory after the test
|
# Remove the directory after the test
|
||||||
shutil.rmtree(self.tmp_path)
|
shutil.rmtree(self.tmp_path)
|
||||||
|
|
||||||
@@ -65,7 +68,7 @@ class TestGetUserFolder(unittest.TestCase):
|
|||||||
message.forward_from_chat = None
|
message.forward_from_chat = None
|
||||||
message.from_user = None
|
message.from_user = None
|
||||||
|
|
||||||
result = get_user_folder(self.tmp_path, message)
|
result = get_user_folder(message)
|
||||||
expected_folder = os.path.join(self.tmp_path, "telegram", "John_Doe")
|
expected_folder = os.path.join(self.tmp_path, "telegram", "John_Doe")
|
||||||
self.assertEqual(result, expected_folder)
|
self.assertEqual(result, expected_folder)
|
||||||
self.assertTrue(os.path.exists(expected_folder))
|
self.assertTrue(os.path.exists(expected_folder))
|
||||||
@@ -81,7 +84,7 @@ class TestGetUserFolder(unittest.TestCase):
|
|||||||
message.forward_from_chat = None
|
message.forward_from_chat = None
|
||||||
message.from_user = None
|
message.from_user = None
|
||||||
|
|
||||||
result = get_user_folder(self.tmp_path, message)
|
result = get_user_folder(message)
|
||||||
expected_folder = os.path.join(self.tmp_path, "telegram", "12345")
|
expected_folder = os.path.join(self.tmp_path, "telegram", "12345")
|
||||||
self.assertEqual(result, expected_folder)
|
self.assertEqual(result, expected_folder)
|
||||||
self.assertTrue(os.path.exists(expected_folder))
|
self.assertTrue(os.path.exists(expected_folder))
|
||||||
@@ -95,7 +98,7 @@ class TestGetUserFolder(unittest.TestCase):
|
|||||||
message.forward_from_chat = chat
|
message.forward_from_chat = chat
|
||||||
message.from_user = None
|
message.from_user = None
|
||||||
|
|
||||||
result = get_user_folder(self.tmp_path, message)
|
result = get_user_folder(message)
|
||||||
expected_folder = os.path.join(
|
expected_folder = os.path.join(
|
||||||
self.tmp_path, "telegram", "My_Awesome_GroupChat"
|
self.tmp_path, "telegram", "My_Awesome_GroupChat"
|
||||||
)
|
)
|
||||||
@@ -113,7 +116,7 @@ class TestGetUserFolder(unittest.TestCase):
|
|||||||
message.forward_from_chat = None
|
message.forward_from_chat = None
|
||||||
message.from_user = user
|
message.from_user = user
|
||||||
|
|
||||||
result = get_user_folder(self.tmp_path, message)
|
result = get_user_folder(message)
|
||||||
expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe")
|
expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe")
|
||||||
self.assertEqual(result, expected_folder)
|
self.assertEqual(result, expected_folder)
|
||||||
self.assertTrue(os.path.exists(expected_folder))
|
self.assertTrue(os.path.exists(expected_folder))
|
||||||
@@ -129,7 +132,7 @@ class TestGetUserFolder(unittest.TestCase):
|
|||||||
message.forward_from_chat = None
|
message.forward_from_chat = None
|
||||||
message.from_user = user
|
message.from_user = user
|
||||||
|
|
||||||
result = get_user_folder(self.tmp_path, message)
|
result = get_user_folder(message)
|
||||||
expected_folder = os.path.join(self.tmp_path, "telegram", "54321")
|
expected_folder = os.path.join(self.tmp_path, "telegram", "54321")
|
||||||
self.assertEqual(result, expected_folder)
|
self.assertEqual(result, expected_folder)
|
||||||
self.assertTrue(os.path.exists(expected_folder))
|
self.assertTrue(os.path.exists(expected_folder))
|
||||||
@@ -139,8 +142,12 @@ class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
# Create a temporary directory for each test
|
# Create a temporary directory for each test
|
||||||
self.tmp_path = tempfile.mkdtemp()
|
self.tmp_path = tempfile.mkdtemp()
|
||||||
|
self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
|
||||||
|
self.settings_patcher.start()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
# Stop patching settings.storage
|
||||||
|
self.settings_patcher.stop()
|
||||||
# Remove the directory after the test
|
# Remove the directory after the test
|
||||||
shutil.rmtree(self.tmp_path)
|
shutil.rmtree(self.tmp_path)
|
||||||
|
|
||||||
@@ -161,7 +168,7 @@ class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
|
|||||||
message.animation = None
|
message.animation = None
|
||||||
message.reply_text = AsyncMock()
|
message.reply_text = AsyncMock()
|
||||||
|
|
||||||
await handle_media_message_contents(self.tmp_path, client, message)
|
await handle_media_message_contents(client, message)
|
||||||
|
|
||||||
expected_file_name = f"video_{message.video.file_id}.mp4"
|
expected_file_name = f"video_{message.video.file_id}.mp4"
|
||||||
expected_file_path = os.path.join(user_folder, expected_file_name)
|
expected_file_path = os.path.join(user_folder, expected_file_name)
|
||||||
@@ -187,7 +194,7 @@ class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
|
|||||||
message.animation.file_id = "animation_file_id"
|
message.animation.file_id = "animation_file_id"
|
||||||
message.reply_text = AsyncMock()
|
message.reply_text = AsyncMock()
|
||||||
|
|
||||||
await handle_media_message_contents(self.tmp_path, client, message)
|
await handle_media_message_contents(client, message)
|
||||||
|
|
||||||
expected_file_name = f"gif_{message.animation.file_id}.gif"
|
expected_file_name = f"gif_{message.animation.file_id}.gif"
|
||||||
expected_file_path = os.path.join(user_folder, expected_file_name)
|
expected_file_path = os.path.join(user_folder, expected_file_name)
|
||||||
@@ -213,7 +220,7 @@ class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
|
|||||||
message.animation = None
|
message.animation = None
|
||||||
message.reply_text = AsyncMock()
|
message.reply_text = AsyncMock()
|
||||||
|
|
||||||
await handle_media_message_contents(self.tmp_path, client, message)
|
await handle_media_message_contents(client, message)
|
||||||
|
|
||||||
expected_file_path = os.path.join(user_folder, "test_document.pdf")
|
expected_file_path = os.path.join(user_folder, "test_document.pdf")
|
||||||
client.download_media.assert_awaited_once_with(
|
client.download_media.assert_awaited_once_with(
|
||||||
@@ -238,7 +245,7 @@ class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
|
|||||||
message.animation = None
|
message.animation = None
|
||||||
message.reply_text = AsyncMock()
|
message.reply_text = AsyncMock()
|
||||||
|
|
||||||
await handle_media_message_contents(self.tmp_path, client, message)
|
await handle_media_message_contents(client, message)
|
||||||
|
|
||||||
expected_file_name = f"photo_{message.photo.file_id}.jpg"
|
expected_file_name = f"photo_{message.photo.file_id}.jpg"
|
||||||
expected_file_path = os.path.join(user_folder, expected_file_name)
|
expected_file_path = os.path.join(user_folder, expected_file_name)
|
||||||
@@ -263,7 +270,7 @@ class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
|
|||||||
message.animation = None
|
message.animation = None
|
||||||
message.reply_text = AsyncMock()
|
message.reply_text = AsyncMock()
|
||||||
|
|
||||||
await handle_media_message_contents(self.tmp_path, client, message)
|
await handle_media_message_contents(client, message)
|
||||||
|
|
||||||
client.download_media.assert_not_called()
|
client.download_media.assert_not_called()
|
||||||
message.reply_text.assert_awaited_once_with("Unknown media type!")
|
message.reply_text.assert_awaited_once_with("Unknown media type!")
|
||||||
@@ -274,6 +281,8 @@ class TestDownloadTTVideo(unittest.TestCase):
|
|||||||
# Create a temporary directory for each test
|
# Create a temporary directory for each test
|
||||||
self.tmp_path = tempfile.mkdtemp()
|
self.tmp_path = tempfile.mkdtemp()
|
||||||
os.makedirs(os.path.join(self.tmp_path, "tiktok"), exist_ok=True)
|
os.makedirs(os.path.join(self.tmp_path, "tiktok"), exist_ok=True)
|
||||||
|
self.settings_patcher = patch("telegram_downloader_bot.settings.settings.storage", self.tmp_path)
|
||||||
|
self.settings_patcher.start()
|
||||||
|
|
||||||
# Paths to the valid and invalid video files
|
# Paths to the valid and invalid video files
|
||||||
self.valid_video_path = os.path.join(self.tmp_path, "valid.mp4")
|
self.valid_video_path = os.path.join(self.tmp_path, "valid.mp4")
|
||||||
@@ -285,6 +294,7 @@ class TestDownloadTTVideo(unittest.TestCase):
|
|||||||
f.write(b'invalid mp4 content')
|
f.write(b'invalid mp4 content')
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
self.settings_patcher.stop()
|
||||||
# Remove the directory after the test
|
# Remove the directory after the test
|
||||||
shutil.rmtree(self.tmp_path)
|
shutil.rmtree(self.tmp_path)
|
||||||
|
|
||||||
@@ -305,7 +315,7 @@ class TestDownloadTTVideo(unittest.TestCase):
|
|||||||
mock_snaptik.return_value = [mock_video]
|
mock_snaptik.return_value = [mock_video]
|
||||||
|
|
||||||
# Call the function
|
# Call the function
|
||||||
download_tt_video(self.tmp_path, "http://tiktok.com/video123")
|
download_tt_video("http://tiktok.com/video123")
|
||||||
|
|
||||||
# Verify that the file was saved correctly
|
# Verify that the file was saved correctly
|
||||||
video_filename = mock_now.strftime(
|
video_filename = mock_now.strftime(
|
||||||
@@ -334,7 +344,7 @@ class TestDownloadTTVideo(unittest.TestCase):
|
|||||||
mock_snaptik.return_value = [mock_video]
|
mock_snaptik.return_value = [mock_video]
|
||||||
|
|
||||||
# Call the function
|
# Call the function
|
||||||
download_tt_video(self.tmp_path, "http://tiktok.com/video123")
|
download_tt_video("http://tiktok.com/video123")
|
||||||
|
|
||||||
# Verify that the file was saved
|
# Verify that the file was saved
|
||||||
video_filename = mock_now.strftime(
|
video_filename = mock_now.strftime(
|
||||||
@@ -357,7 +367,7 @@ class TestDownloadTTVideo(unittest.TestCase):
|
|||||||
mock_snaptik.return_value = []
|
mock_snaptik.return_value = []
|
||||||
|
|
||||||
# Call the function
|
# Call the function
|
||||||
download_tt_video(self.tmp_path, "http://tiktok.com/video123")
|
download_tt_video("http://tiktok.com/video123")
|
||||||
|
|
||||||
# Verify that no files were created
|
# Verify that no files were created
|
||||||
tiktok_folder = os.path.join(self.tmp_path, "tiktok")
|
tiktok_folder = os.path.join(self.tmp_path, "tiktok")
|
||||||
|
|||||||
Reference in New Issue
Block a user