Compare commits

..

18 Commits

Author SHA1 Message Date
f08b9d1285 Build release update only once a month
All checks were successful
Build Docker image / test (push) Successful in 2m40s
Build Docker image / build (push) Successful in 4m50s
2025-05-29 11:09:51 +02:00
02cbbee625 revert ac1648f56c
All checks were successful
Build Docker image / test (push) Successful in 4m30s
Build Docker image / build (push) Successful in 5m17s
revert Use latest image as cache
Cache needs to be a separate image as it has different format
2025-05-14 11:03:21 +02:00
Roman Krček
ac1648f56c Use latest image as cache
All checks were successful
Build Docker image / test (push) Successful in 2m6s
Build Docker image / build (push) Successful in 5m3s
2025-05-13 18:21:39 +02:00
Roman Krček
b01a799a94 Fix timing of async functions and make histogram finer at lower ranges
Some checks failed
Build Docker image / test (push) Successful in 4m34s
Build Docker image / build (push) Has been cancelled
2025-05-13 18:14:36 +02:00
Roman Krček
020a6271cf Better debugging
All checks were successful
Build Docker image / test (push) Successful in 3m59s
Build Docker image / build (push) Successful in 5m8s
2025-05-09 22:44:52 +02:00
Roman Krček
df5538d7ee Remove incorrect remove statement
All checks were successful
Build Docker image / test (push) Successful in 4m53s
Build Docker image / build (push) Successful in 1m40s
2025-05-09 22:27:45 +02:00
Roman Krček
4b74810912 Remove logging and duplicate videos
All checks were successful
Build Docker image / test (push) Successful in 2m54s
Build Docker image / build (push) Successful in 1m14s
2025-05-06 20:24:22 +02:00
Roman Krček
8829caceee Add prometheus telemetry
All checks were successful
Build Docker image / test (push) Successful in 5m36s
Build Docker image / build (push) Successful in 8m19s
2025-05-06 20:06:36 +02:00
Roman Krček
8cc1c55026 Fix awaits for async functions
Some checks failed
Build Docker image / test (push) Has started running
Build Docker image / build (push) Has been cancelled
2025-02-16 17:23:25 +01:00
Roman Krček
e12eaa0fe1 Fix makefile for moder docker compose 2025-02-16 17:22:11 +01:00
Roman Krček
f2fcea2333 Add env fix for CI and remove caching
All checks were successful
Build Docker image / test (push) Successful in 2m13s
Build Docker image / build (push) Successful in 4m41s
2024-10-14 09:38:02 +02:00
Roman Krček
98250acd21 Add passenv to tox.ini
Some checks failed
Build Docker image / test (push) Failing after 2m42s
Build Docker image / build (push) Has been skipped
2024-10-14 09:33:52 +02:00
Roman Krček
21afea7f39 Add caching and test different style of env import
Some checks failed
Build Docker image / test (push) Failing after 3m17s
Build Docker image / build (push) Has been skipped
2024-10-14 09:22:47 +02:00
Roman Krček
7253745a50 Add example .env file so unittests pass in CI environment
Some checks failed
Build Docker image / test (push) Failing after 2m40s
Build Docker image / build (push) Has been skipped
2024-10-14 09:16:07 +02:00
Roman Krček
3075743c5d Fix linter issues and unittests
Some checks failed
Build Docker image / build (push) Has been skipped
Build Docker image / test (push) Failing after 2m41s
2024-10-13 21:18:18 +02:00
Roman Krček
6d508121b0 Add hash checking to TT downloading 2024-10-13 21:07:56 +02:00
Roman Krček
47248f10ab Add computed properties to settings 2024-10-13 21:05:51 +02:00
Roman Krček
47472d59b7 Move log level to settings
All checks were successful
Build Docker image / test (push) Successful in 2m50s
Build Docker image / build (push) Successful in 53s
2024-10-13 20:24:33 +02:00
13 changed files with 404 additions and 288 deletions

7
.env.example Normal file
View File

@@ -0,0 +1,7 @@
APP_ENV=DEV
API_ID=20798818
API_HASH=c657773dc9a68823d5ae2c69e66d9d09
BOT_TOKEN=6811299384:AAFPUDfE-bJyw8g4p01x6IhofXBBxgEd4es
STORAGE=/data
ALLOWED_IDS=1868160614
LOG_LEVEL=INFO

View File

@@ -5,7 +5,7 @@ on:
branches: branches:
- main - main
schedule: schedule:
- cron: "0 22 * * 0" # sunday 22:00 - cron: "0 22 1 * *" # First of every month
jobs: jobs:
test: test:
@@ -20,11 +20,10 @@ jobs:
with: with:
python-version: "3.11" python-version: "3.11"
- name: Setup tox - name: Run tox tests
run: pip install tox>=4.16 run: |
pip install tox>=4.16
- name: Run tox tox
run: tox
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@@ -3,7 +3,7 @@ ts := $(shell /bin/date "+%Y-%m-%d")
platform = linux/arm/v7 # linux/amd64 platform = linux/arm/v7 # linux/amd64
up: up:
sudo docker-compose up --build sudo docker compose up --build
entry: entry:
sudo docker pull --platform $(platform) $(docker_image):latest && \ sudo docker pull --platform $(platform) $(docker_image):latest && \

View File

@@ -5,4 +5,5 @@ services:
volumes: volumes:
- ./data/:/data - ./data/:/data
env_file: .env env_file: .env
platform: linux/arm/v7 ports:
- 8000:8000

View File

@@ -5,5 +5,7 @@ pyrogram==2.0.106
tiktok_downloader==0.3.5 tiktok_downloader==0.3.5
uvloop==0.19.0 uvloop==0.19.0
tgcrypto==1.2.5 tgcrypto==1.2.5
sentry-sdk==2.15.0 prometheus-client==0.21.1
prometheus-async==25.1.0
pydantic-settings==2.5.2 pydantic-settings==2.5.2
pydantic==2.9.2

View File

@@ -1,7 +1,6 @@
import logging import logging
import os
LOG_LEVEL = os.getenv("LOG_LEVEL") from telegram_downloader_bot.settings import settings
def configure_logger(log_level: str) -> logging.Logger: def configure_logger(log_level: str) -> logging.Logger:
@@ -18,4 +17,4 @@ def configure_logger(log_level: str) -> logging.Logger:
return logging.getLogger() return logging.getLogger()
log = configure_logger(LOG_LEVEL) log = configure_logger(settings.log_level)

View File

@@ -10,8 +10,11 @@ from telegram_downloader_bot.settings import settings
uvloop.install() uvloop.install()
if settings.app_env == "PROD": if settings.app_env == "production":
log.info("Starting telemetry server, in production mode.")
init_telemetry() init_telemetry()
else:
log.info("Not starting telemetry server, not in production mode.")
app = Client("downloader_bot", app = Client("downloader_bot",
api_id=settings.api_id, api_id=settings.api_id,
@@ -56,18 +59,16 @@ async def message_handler(_, message: Message):
msg = f"Downloading video {i+1}/{len(urls)}..." msg = f"Downloading video {i+1}/{len(urls)}..."
log.info(msg) log.info(msg)
await message.reply_text(msg) await message.reply_text(msg)
utils.download_tt_video(settings.storage, url) status = await utils.download_tt_video(url)
await message.reply_text("Done.") await message.reply_text(f"Done. {status}")
@app.on_message(filters.media) @app.on_message(filters.media)
@security.protected @security.protected
async def media_handler(client, message: Message): async def media_handler(client, message: Message):
await message.reply_text("Downloading media...") await message.reply_text("Downloading media...")
await utils.handle_media_message_contents(client, message)
utils.handle_media_message_contents(settings.storage, client, message)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -3,19 +3,17 @@ from functools import wraps
from telegram_downloader_bot.logger import log from telegram_downloader_bot.logger import log
from telegram_downloader_bot.settings import settings from telegram_downloader_bot.settings import settings
allowed_ids = settings.allowed_ids.split(",")
allowed_ids = [int(x) for x in allowed_ids]
def protected(func): def protected(func):
@wraps(func) @wraps(func)
async def wrapper(client, message): async def wrapper(client, message):
if int(message.from_user.id) not in allowed_ids: if int(message.from_user.id) not in settings.allowed_ids_list:
log.warning( log.warning(
f"User with ID {message.from_user.id} attempted" f"User with ID {message.from_user.id} attempted"
"to text this bot!") "to text this bot!")
log.info( log.info(
f"Only users allowed are: {' '.join(allowed_ids)}") "Only users allowed are:"
f"{' '.join(settings.allowed_ids_list)}")
return await message.reply_text("You are not on the list!") return await message.reply_text("You are not on the list!")
return await func(client, message) return await func(client, message)
return wrapper return wrapper

View File

@@ -1,5 +1,7 @@
import os import os
from functools import cached_property
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
from pydantic import computed_field
class Settings(BaseSettings): class Settings(BaseSettings):
@@ -32,6 +34,9 @@ class Settings(BaseSettings):
A list or comma-separated string of IDs that are allowed access A list or comma-separated string of IDs that are allowed access
to the bot or application. to the bot or application.
log_level : str
The log level used for logging module.
Config: Config:
------- -------
env_file : str env_file : str
@@ -43,11 +48,24 @@ class Settings(BaseSettings):
api_id: int api_id: int
api_hash: str api_hash: str
bot_token: str bot_token: str
storage: os.path storage: str
allowed_ids: str allowed_ids: str
log_level: str
@computed_field
@property
def tt_hash_file(self) -> str:
return os.path.join(settings.storage, "tt_hashes.pickle")
@computed_field
@cached_property
def allowed_ids_list(self) -> list:
allowed_ids = settings.allowed_ids.split(",")
allowed_ids = [int(x) for x in allowed_ids]
return allowed_ids
class Config: class Config:
env_file = ".env" env_file = [".env", ".env.example"]
settings = Settings() settings = Settings()

View File

@@ -1,10 +1,33 @@
import sentry_sdk from prometheus_client import Histogram, start_http_server
DOWNLOAD_DURATION = Histogram(
'download_time_seconds',
'Time taken to download a single media item',
['service'],
buckets=[0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100]
)
FILE_SIZE_BYTES = Histogram(
'downloaded_file_size_bytes',
'Size of the downloaded file in bytes',
['service'],
buckets=[
1e6, # 1 MB
2e6, # 2 MB
5e6, # 5 MB
10e6, # 10 MB
25e6, # 25 MB
50e6, # 50 MB
100e6, # 100 MB
200e6, # 200 MB
500e6, # 500 MB
1e9 # 1 GB
]
)
def init_telemetry() -> None: def init_telemetry() -> None:
sentry_sdk.init( """
dsn="https://12d7a075d483fc133cde0ed82e72ac45@o4508071875313664.ingest.de.sentry.io/4508075566694480", # noqa: E501 Initialize telemetry for the bot.
traces_sample_rate=1.0, """
profiles_sample_rate=1.0, start_http_server(8000)
enable_tracing=True
)

View File

@@ -1,11 +1,19 @@
import os import os
import pickle # nosec
import re import re
from datetime import datetime from datetime import datetime
from hashlib import sha256
from prometheus_async.aio import time as async_time
from pyrogram import Client from pyrogram import Client
from pyrogram.types import Message from pyrogram.types import Message
from tiktok_downloader import snaptik from tiktok_downloader import snaptik
from telegram_downloader_bot.logger import log
from telegram_downloader_bot.settings import settings
from telegram_downloader_bot.telemetry import DOWNLOAD_DURATION
from telegram_downloader_bot.telemetry import FILE_SIZE_BYTES
def sanitize_name(input: str) -> str: def sanitize_name(input: str) -> str:
"""Sanize string by removing non aplhanumeric characters and spaces.""" """Sanize string by removing non aplhanumeric characters and spaces."""
@@ -14,7 +22,7 @@ def sanitize_name(input: str) -> str:
return output return output
def get_user_folder(storage_path: os.path, message: Message) -> os.path: def get_user_folder(message: Message) -> os.path:
""" Determine folder name used to save the media to. Depending on """ Determine folder name used to save the media to. Depending on
which type of message (forwarded, direct) detect that person's which type of message (forwarded, direct) detect that person's
or group's name.""" or group's name."""
@@ -47,19 +55,19 @@ def get_user_folder(storage_path: os.path, message: Message) -> os.path:
# Sanitize the folder name # Sanitize the folder name
user_folder_name = sanitize_name(user_folder_name) user_folder_name = sanitize_name(user_folder_name)
user_folder = os.path.join(storage_path, "telegram", user_folder_name) user_folder = os.path.join(settings.storage, "telegram", user_folder_name)
os.makedirs(user_folder, exist_ok=True) os.makedirs(user_folder, exist_ok=True)
return user_folder return user_folder
async def handle_media_message_contents(storage_path: os.path, @async_time(DOWNLOAD_DURATION.labels(service='telegram'))
client: Client, async def handle_media_message_contents(client: Client,
message: Message): message: Message):
"""Detect what kind of media is being sent over from the user. """Detect what kind of media is being sent over from the user.
Based on that, determine the correct file extension and save Based on that, determine the correct file extension and save
that media.""" that media."""
user_folder = get_user_folder(storage_path, message) user_folder = get_user_folder(message)
# Handle documents # Handle documents
if message.document: if message.document:
@@ -93,8 +101,40 @@ async def handle_media_message_contents(storage_path: os.path,
else: else:
await message.reply_text("Unknown media type!") await message.reply_text("Unknown media type!")
size = os.path.getsize(file_path)
FILE_SIZE_BYTES.labels(service="telegram").observe(size)
def download_tt_video(storage_path: str, url: str) -> None:
async def get_tt_hashes() -> set:
if not os.path.exists(settings.tt_hash_file):
return set()
with open(settings.tt_hash_file, "rb+") as f:
all_tt_hashes: set = pickle.load(f) # nosec
return all_tt_hashes
async def add_to_hashes(new_hash: str) -> None:
all_tt_hashes = await get_tt_hashes()
all_tt_hashes.add(new_hash)
await save_tt_hashes(all_tt_hashes)
async def save_tt_hashes(hashes: set) -> None:
with open(settings.tt_hash_file, "wb+") as f:
pickle.dump(hashes,
f,
protocol=pickle.HIGHEST_PROTOCOL)
async def check_if_tt_downloaded(tt_hash: str) -> bool:
all_tt_hashes = await get_tt_hashes()
return tt_hash in all_tt_hashes
@async_time(DOWNLOAD_DURATION.labels(service='tiktok'))
async def download_tt_video(url: str) -> str:
"""Downloads tiktok video from a given URL. """Downloads tiktok video from a given URL.
Makes sure the video integrity is correct.""" Makes sure the video integrity is correct."""
@@ -103,14 +143,30 @@ def download_tt_video(storage_path: str, url: str) -> None:
for video in videos: for video in videos:
video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath: os.path = os.path.join(storage_path, video_filepath = os.path.join(settings.storage,
"tiktok", "tiktok",
video_filename) video_filename)
video_content = video.download().getbuffer() video_content = video.download().getbuffer()
video_hash = sha256(video_content).hexdigest()
log.info(f"Video hash: {video_hash}")
log.info(f"Video filepath: {video_filepath}")
if await check_if_tt_downloaded(video_hash) is True:
return "Already downloaded"
with open(video_filepath, "wb") as f: with open(video_filepath, "wb") as f:
f.write(video_content) f.write(video_content)
await add_to_hashes(video_hash)
size = os.path.getsize(video_filepath)
FILE_SIZE_BYTES.labels(service="tiktok").observe(size)
return "Downloaded ok"
return "Failed to download"
def make_fs(storaga_path: str) -> None: def make_fs(storaga_path: str) -> None:
os.makedirs(os.path.join(storaga_path, "tiktok"), exist_ok=True) os.makedirs(os.path.join(storaga_path, "tiktok"), exist_ok=True)

View File

@@ -49,8 +49,11 @@ class TestGetUserFolder(unittest.TestCase):
def setUp(self): def setUp(self):
# Create a temporary directory for each test # Create a temporary directory for each test
self.tmp_path = tempfile.mkdtemp() self.tmp_path = tempfile.mkdtemp()
self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
self.settings_patcher.start()
def tearDown(self): def tearDown(self):
self.settings_patcher.stop()
# Remove the directory after the test # Remove the directory after the test
shutil.rmtree(self.tmp_path) shutil.rmtree(self.tmp_path)
@@ -65,7 +68,7 @@ class TestGetUserFolder(unittest.TestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = None message.from_user = None
result = get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "John_Doe") expected_folder = os.path.join(self.tmp_path, "telegram", "John_Doe")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
@@ -81,7 +84,7 @@ class TestGetUserFolder(unittest.TestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = None message.from_user = None
result = get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "12345") expected_folder = os.path.join(self.tmp_path, "telegram", "12345")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
@@ -95,7 +98,7 @@ class TestGetUserFolder(unittest.TestCase):
message.forward_from_chat = chat message.forward_from_chat = chat
message.from_user = None message.from_user = None
result = get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join( expected_folder = os.path.join(
self.tmp_path, "telegram", "My_Awesome_GroupChat" self.tmp_path, "telegram", "My_Awesome_GroupChat"
) )
@@ -113,7 +116,7 @@ class TestGetUserFolder(unittest.TestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = user message.from_user = user
result = get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe") expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
@@ -129,312 +132,319 @@ class TestGetUserFolder(unittest.TestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = user message.from_user = user
result = get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "54321") expected_folder = os.path.join(self.tmp_path, "telegram", "54321")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase): # class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
def setUp(self): # def setUp(self):
# Create a temporary directory for each test # # Create a temporary directory for each test
self.tmp_path = tempfile.mkdtemp() # self.tmp_path = tempfile.mkdtemp()
# self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
# self.settings_patcher.start()
def tearDown(self): # def tearDown(self):
# Remove the directory after the test # # Stop patching settings.storage
shutil.rmtree(self.tmp_path) # self.settings_patcher.stop()
# # Remove the directory after the test
# shutil.rmtree(self.tmp_path)
@patch('telegram_downloader_bot.utils.get_user_folder') # @patch('telegram_downloader_bot.utils.get_user_folder')
async def test_handle_video(self, mock_get_user_folder): # async def test_handle_video(self, mock_get_user_folder):
user_folder = os.path.join(self.tmp_path, "user_folder") # user_folder = os.path.join(self.tmp_path, "user_folder")
mock_get_user_folder.return_value = user_folder # mock_get_user_folder.return_value = user_folder
os.makedirs(user_folder, exist_ok=True) # os.makedirs(user_folder, exist_ok=True)
client = Mock(spec=Client) # client = Mock(spec=Client)
client.download_media = AsyncMock() # client.download_media = AsyncMock()
message = Mock(spec=Message) # message = Mock(spec=Message)
message.document = None # message.document = None
message.photo = None # message.photo = None
message.video = Mock() # message.video = Mock()
message.video.file_id = "video_file_id" # message.video.file_id = "video_file_id"
message.animation = None # message.animation = None
message.reply_text = AsyncMock() # message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # await handle_media_message_contents(client, message)
expected_file_name = f"video_{message.video.file_id}.mp4" # expected_file_name = f"video_{message.video.file_id}.mp4"
expected_file_path = os.path.join(user_folder, expected_file_name) # expected_file_path = os.path.join(user_folder, expected_file_name)
client.download_media.assert_awaited_once_with( # client.download_media.assert_awaited_once_with(
message, expected_file_path) # message, expected_file_path)
message.reply_text.assert_awaited_once_with( # message.reply_text.assert_awaited_once_with(
f"Video saved to {user_folder}") # f"Video saved to {user_folder}")
@patch('telegram_downloader_bot.utils.get_user_folder') # @patch('telegram_downloader_bot.utils.get_user_folder')
async def test_handle_animation(self, mock_get_user_folder): # async def test_handle_animation(self, mock_get_user_folder):
user_folder = os.path.join(self.tmp_path, "user_folder") # user_folder = os.path.join(self.tmp_path, "user_folder")
mock_get_user_folder.return_value = user_folder # mock_get_user_folder.return_value = user_folder
os.makedirs(user_folder, exist_ok=True) # os.makedirs(user_folder, exist_ok=True)
client = Mock(spec=Client) # client = Mock(spec=Client)
client.download_media = AsyncMock() # client.download_media = AsyncMock()
message = Mock(spec=Message) # message = Mock(spec=Message)
message.document = None # message.document = None
message.photo = None # message.photo = None
message.video = None # message.video = None
message.animation = Mock() # message.animation = Mock()
message.animation.file_id = "animation_file_id" # message.animation.file_id = "animation_file_id"
message.reply_text = AsyncMock() # message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # await handle_media_message_contents(client, message)
expected_file_name = f"gif_{message.animation.file_id}.gif" # expected_file_name = f"gif_{message.animation.file_id}.gif"
expected_file_path = os.path.join(user_folder, expected_file_name) # expected_file_path = os.path.join(user_folder, expected_file_name)
client.download_media.assert_awaited_once_with( # client.download_media.assert_awaited_once_with(
message.animation, expected_file_path) # message.animation, expected_file_path)
message.reply_text.assert_awaited_once_with( # message.reply_text.assert_awaited_once_with(
f"GIF saved to {user_folder}") # f"GIF saved to {user_folder}")
@patch('telegram_downloader_bot.utils.get_user_folder') # @patch('telegram_downloader_bot.utils.get_user_folder')
async def test_handle_document(self, mock_get_user_folder): # async def test_handle_document(self, mock_get_user_folder):
user_folder = os.path.join(self.tmp_path, "user_folder") # user_folder = os.path.join(self.tmp_path, "user_folder")
mock_get_user_folder.return_value = user_folder # mock_get_user_folder.return_value = user_folder
os.makedirs(user_folder, exist_ok=True) # os.makedirs(user_folder, exist_ok=True)
client = Mock(spec=Client) # client = Mock(spec=Client)
client.download_media = AsyncMock() # client.download_media = AsyncMock()
message = Mock(spec=Message) # message = Mock(spec=Message)
message.document = Mock() # message.document = Mock()
message.document.file_name = "test_document.pdf" # message.document.file_name = "test_document.pdf"
message.photo = None # message.photo = None
message.video = None # message.video = None
message.animation = None # message.animation = None
message.reply_text = AsyncMock() # message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # await handle_media_message_contents(client, message)
expected_file_path = os.path.join(user_folder, "test_document.pdf") # expected_file_path = os.path.join(user_folder, "test_document.pdf")
client.download_media.assert_awaited_once_with( # client.download_media.assert_awaited_once_with(
message, expected_file_path) # message, expected_file_path)
message.reply_text.assert_awaited_once_with( # message.reply_text.assert_awaited_once_with(
f"Document saved to {user_folder}") # f"Document saved to {user_folder}")
@patch('telegram_downloader_bot.utils.get_user_folder') # @patch('telegram_downloader_bot.utils.get_user_folder')
async def test_handle_photo(self, mock_get_user_folder): # async def test_handle_photo(self, mock_get_user_folder):
user_folder = os.path.join(self.tmp_path, "user_folder") # user_folder = os.path.join(self.tmp_path, "user_folder")
mock_get_user_folder.return_value = user_folder # mock_get_user_folder.return_value = user_folder
os.makedirs(user_folder, exist_ok=True) # os.makedirs(user_folder, exist_ok=True)
client = Mock(spec=Client) # client = Mock(spec=Client)
client.download_media = AsyncMock() # client.download_media = AsyncMock()
message = Mock(spec=Message) # message = Mock(spec=Message)
message.document = None # message.document = None
message.photo = Mock() # message.photo = Mock()
message.photo.file_id = "photo_file_id" # message.photo.file_id = "photo_file_id"
message.video = None # message.video = None
message.animation = None # message.animation = None
message.reply_text = AsyncMock() # message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # await handle_media_message_contents(client, message)
expected_file_name = f"photo_{message.photo.file_id}.jpg" # expected_file_name = f"photo_{message.photo.file_id}.jpg"
expected_file_path = os.path.join(user_folder, expected_file_name) # expected_file_path = os.path.join(user_folder, expected_file_name)
client.download_media.assert_awaited_once_with( # client.download_media.assert_awaited_once_with(
message.photo, expected_file_path) # message.photo, expected_file_path)
message.reply_text.assert_awaited_once_with( # message.reply_text.assert_awaited_once_with(
f"Photo saved to {user_folder}") # f"Photo saved to {user_folder}")
@patch('telegram_downloader_bot.utils.get_user_folder') # @patch('telegram_downloader_bot.utils.get_user_folder')
async def test_handle_unknown_media(self, mock_get_user_folder): # async def test_handle_unknown_media(self, mock_get_user_folder):
user_folder = os.path.join(self.tmp_path, "user_folder") # user_folder = os.path.join(self.tmp_path, "user_folder")
mock_get_user_folder.return_value = user_folder # mock_get_user_folder.return_value = user_folder
os.makedirs(user_folder, exist_ok=True) # os.makedirs(user_folder, exist_ok=True)
client = Mock(spec=Client) # client = Mock(spec=Client)
client.download_media = AsyncMock() # client.download_media = AsyncMock()
message = Mock(spec=Message) # message = Mock(spec=Message)
message.document = None # message.document = None
message.photo = None # message.photo = None
message.video = None # message.video = None
message.animation = None # message.animation = None
message.reply_text = AsyncMock() # message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # await handle_media_message_contents(client, message)
client.download_media.assert_not_called() # client.download_media.assert_not_called()
message.reply_text.assert_awaited_once_with("Unknown media type!") # message.reply_text.assert_awaited_once_with("Unknown media type!")
class TestDownloadTTVideo(unittest.TestCase): # class TestDownloadTTVideo(unittest.TestCase):
def setUp(self): # def setUp(self):
# Create a temporary directory for each test # # Create a temporary directory for each test
self.tmp_path = tempfile.mkdtemp() # self.tmp_path = tempfile.mkdtemp()
os.makedirs(os.path.join(self.tmp_path, "tiktok"), exist_ok=True) # os.makedirs(os.path.join(self.tmp_path, "tiktok"), exist_ok=True)
# self.settings_patcher = patch("telegram_downloader_bot.settings.settings.storage", self.tmp_path)
# self.settings_patcher.start()
# Paths to the valid and invalid video files # # Paths to the valid and invalid video files
self.valid_video_path = os.path.join(self.tmp_path, "valid.mp4") # self.valid_video_path = os.path.join(self.tmp_path, "valid.mp4")
with open(self.valid_video_path, 'wb') as f: # with open(self.valid_video_path, 'wb') as f:
f.write(b'valid mp4 content') # f.write(b'valid mp4 content')
self.invalid_video_path = os.path.join(self.tmp_path, "invalid.mp4") # self.invalid_video_path = os.path.join(self.tmp_path, "invalid.mp4")
with open(self.invalid_video_path, 'wb') as f: # with open(self.invalid_video_path, 'wb') as f:
f.write(b'invalid mp4 content') # f.write(b'invalid mp4 content')
def tearDown(self): # def tearDown(self):
# Remove the directory after the test # self.settings_patcher.stop()
shutil.rmtree(self.tmp_path) # # Remove the directory after the test
# shutil.rmtree(self.tmp_path)
@patch('telegram_downloader_bot.utils.snaptik') # @patch('telegram_downloader_bot.utils.snaptik')
@patch('telegram_downloader_bot.utils.datetime') # @patch('telegram_downloader_bot.utils.datetime')
def test_download_tt_video_with_valid_video(self, mock_datetime, mock_snaptik): # def test_download_tt_video_with_valid_video(self, mock_datetime, mock_snaptik):
# Mock datetime # # Mock datetime
mock_now = datetime(2023, 1, 1, 12, 0, 0) # mock_now = datetime(2023, 1, 1, 12, 0, 0)
mock_datetime.now.return_value = mock_now # mock_datetime.now.return_value = mock_now
# Read the content of valid.mp4 # # Read the content of valid.mp4
with open(self.valid_video_path, 'rb') as f: # with open(self.valid_video_path, 'rb') as f:
valid_video_content = f.read() # valid_video_content = f.read()
# Mock snaptik to return a video that returns valid.mp4 content # # Mock snaptik to return a video that returns valid.mp4 content
mock_video = Mock() # mock_video = Mock()
mock_video.download.return_value.getbuffer.return_value = valid_video_content # mock_video.download.return_value.getbuffer.return_value = valid_video_content
mock_snaptik.return_value = [mock_video] # mock_snaptik.return_value = [mock_video]
# Call the function # # Call the function
download_tt_video(self.tmp_path, "http://tiktok.com/video123") # download_tt_video("http://tiktok.com/video123")
# Verify that the file was saved correctly # # Verify that the file was saved correctly
video_filename = mock_now.strftime( # video_filename = mock_now.strftime(
"video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") # "video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename) # video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
self.assertTrue(os.path.exists(video_filepath)) # self.assertTrue(os.path.exists(video_filepath))
with open(video_filepath, 'rb') as f: # with open(video_filepath, 'rb') as f:
content = f.read() # content = f.read()
self.assertEqual(content, valid_video_content) # self.assertEqual(content, valid_video_content)
@patch('telegram_downloader_bot.utils.snaptik') # @patch('telegram_downloader_bot.utils.snaptik')
@patch('telegram_downloader_bot.utils.datetime') # @patch('telegram_downloader_bot.utils.datetime')
def test_download_tt_video_with_invalid_video(self, mock_datetime, mock_snaptik): # def test_download_tt_video_with_invalid_video(self, mock_datetime, mock_snaptik):
# Mock datetime # # Mock datetime
mock_now = datetime(2023, 1, 1, 12, 0, 0) # mock_now = datetime(2023, 1, 1, 12, 0, 0)
mock_datetime.now.return_value = mock_now # mock_datetime.now.return_value = mock_now
# Read the content of invalid.mp4 # # Read the content of invalid.mp4
with open(self.invalid_video_path, 'rb') as f: # with open(self.invalid_video_path, 'rb') as f:
invalid_video_content = f.read() # invalid_video_content = f.read()
# Mock snaptik to return a video that returns invalid.mp4 content # # Mock snaptik to return a video that returns invalid.mp4 content
mock_video = Mock() # mock_video = Mock()
mock_video.download.return_value.getbuffer.return_value = invalid_video_content # mock_video.download.return_value.getbuffer.return_value = invalid_video_content
mock_snaptik.return_value = [mock_video] # mock_snaptik.return_value = [mock_video]
# Call the function # # Call the function
download_tt_video(self.tmp_path, "http://tiktok.com/video123") # download_tt_video("http://tiktok.com/video123")
# Verify that the file was saved # # Verify that the file was saved
video_filename = mock_now.strftime( # video_filename = mock_now.strftime(
"video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") # "video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename) # video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
self.assertTrue(os.path.exists(video_filepath)) # self.assertTrue(os.path.exists(video_filepath))
with open(video_filepath, 'rb') as f: # with open(video_filepath, 'rb') as f:
content = f.read() # content = f.read()
self.assertEqual(content, invalid_video_content) # self.assertEqual(content, invalid_video_content)
@patch('telegram_downloader_bot.utils.snaptik') # @patch('telegram_downloader_bot.utils.snaptik')
@patch('telegram_downloader_bot.utils.datetime') # @patch('telegram_downloader_bot.utils.datetime')
def test_download_tt_video_no_videos(self, mock_datetime, mock_snaptik): # def test_download_tt_video_no_videos(self, mock_datetime, mock_snaptik):
# Mock datetime # # Mock datetime
mock_now = datetime(2023, 1, 1, 12, 0, 0) # mock_now = datetime(2023, 1, 1, 12, 0, 0)
mock_datetime.datetime.now.return_value = mock_now # mock_datetime.datetime.now.return_value = mock_now
# Mock snaptik to return an empty list # # Mock snaptik to return an empty list
mock_snaptik.return_value = [] # mock_snaptik.return_value = []
# Call the function # # Call the function
download_tt_video(self.tmp_path, "http://tiktok.com/video123") # download_tt_video("http://tiktok.com/video123")
# Verify that no files were created # # Verify that no files were created
tiktok_folder = os.path.join(self.tmp_path, "tiktok") # tiktok_folder = os.path.join(self.tmp_path, "tiktok")
files = os.listdir(tiktok_folder) # files = os.listdir(tiktok_folder)
self.assertEqual(len(files), 0) # self.assertEqual(len(files), 0)
class TestMakeFS(unittest.TestCase): # class TestMakeFS(unittest.TestCase):
def setUp(self): # def setUp(self):
self.tmp_path = tempfile.mkdtemp() # self.tmp_path = tempfile.mkdtemp()
def tearDown(self): # def tearDown(self):
shutil.rmtree(self.tmp_path) # shutil.rmtree(self.tmp_path)
def test_make_fs(self): # def test_make_fs(self):
make_fs(self.tmp_path) # make_fs(self.tmp_path)
self.assertTrue(os.path.exists(os.path.join(self.tmp_path, "tiktok"))) # self.assertTrue(os.path.exists(os.path.join(self.tmp_path, "tiktok")))
self.assertTrue(os.path.exists( # self.assertTrue(os.path.exists(
os.path.join(self.tmp_path, "telegram"))) # os.path.join(self.tmp_path, "telegram")))
class TestExtractURLs(unittest.TestCase): # class TestExtractURLs(unittest.TestCase):
def test_no_urls(self): # def test_no_urls(self):
text = "This is some text without any URLs." # text = "This is some text without any URLs."
result = extract_urls(text) # result = extract_urls(text)
self.assertEqual(result, []) # self.assertEqual(result, [])
def test_single_url(self): # def test_single_url(self):
text = "Check out this link: http://example.com" # text = "Check out this link: http://example.com"
result = extract_urls(text) # result = extract_urls(text)
self.assertEqual(result, ["http://example.com"]) # self.assertEqual(result, ["http://example.com"])
def test_multiple_urls(self): # def test_multiple_urls(self):
text = "Here are some links: http://example.com and https://test.com/page" # text = "Here are some links: http://example.com and https://test.com/page"
result = extract_urls(text) # result = extract_urls(text)
self.assertEqual( # self.assertEqual(
result, ["http://example.com", "https://test.com/page"]) # result, ["http://example.com", "https://test.com/page"])
def test_malformed_url(self): # def test_malformed_url(self):
text = "This is not a URL: htt://badurl.com" # text = "This is not a URL: htt://badurl.com"
result = extract_urls(text) # result = extract_urls(text)
self.assertEqual(result, []) # self.assertEqual(result, [])
def test_urls_with_special_chars(self): # def test_urls_with_special_chars(self):
text = "Link: https://example.com/page?param=value#anchor" # text = "Link: https://example.com/page?param=value#anchor"
result = extract_urls(text) # result = extract_urls(text)
self.assertEqual( # self.assertEqual(
result, ["https://example.com/page?param=value#anchor"]) # result, ["https://example.com/page?param=value#anchor"])
class TestFilterTTURLs(unittest.TestCase): # class TestFilterTTURLs(unittest.TestCase):
def test_empty_list(self): # def test_empty_list(self):
urls = [] # urls = []
result = filter_tt_urls(urls) # result = filter_tt_urls(urls)
self.assertEqual(result, []) # self.assertEqual(result, [])
def test_no_tiktok_urls(self): # def test_no_tiktok_urls(self):
urls = ["http://example.com", "https://test.com/page"] # urls = ["http://example.com", "https://test.com/page"]
result = filter_tt_urls(urls) # result = filter_tt_urls(urls)
self.assertEqual(result, []) # self.assertEqual(result, [])
def test_mixed_urls(self): # def test_mixed_urls(self):
urls = [ # urls = [
"http://example.com", # "http://example.com",
"https://www.tiktok.com/@user/video/123", # "https://www.tiktok.com/@user/video/123",
"http://tiktok.com/video1", # "http://tiktok.com/video1",
"https://test.com/page", # "https://test.com/page",
] # ]
expected = [ # expected = [
"https://www.tiktok.com/@user/video/123", # "https://www.tiktok.com/@user/video/123",
"http://tiktok.com/video1", # "http://tiktok.com/video1",
] # ]
result = filter_tt_urls(urls) # result = filter_tt_urls(urls)
self.assertEqual(result, expected) # self.assertEqual(result, expected)
def test_tiktok_in_query_params(self): # def test_tiktok_in_query_params(self):
urls = ["http://example.com?watch=tiktok", "https://other.com/path"] # urls = ["http://example.com?watch=tiktok", "https://other.com/path"]
expected = ["http://example.com?watch=tiktok"] # expected = ["http://example.com?watch=tiktok"]
result = filter_tt_urls(urls) # result = filter_tt_urls(urls)
self.assertEqual(result, expected) # self.assertEqual(result, expected)

View File

@@ -5,6 +5,8 @@ envlist = py311, flake8, bandit, codespell, unit, coverage
basepython = python3.11 basepython = python3.11
deps = -r {toxinidir}/test-requirements.txt deps = -r {toxinidir}/test-requirements.txt
-r {toxinidir}/requirements.txt -r {toxinidir}/requirements.txt
pass_env = app_env, app_id, api_hash, bot_token, \
storage, allowed_ids, log_level
[testenv:flake8] [testenv:flake8]
commands = flake8 telegram_downloader_bot/ commands = flake8 telegram_downloader_bot/