Compare commits

..

27 Commits

Author SHA1 Message Date
f08b9d1285 Build release update only once a month
All checks were successful
Build Docker image / test (push) Successful in 2m40s
Build Docker image / build (push) Successful in 4m50s
2025-05-29 11:09:51 +02:00
02cbbee625 revert ac1648f56c
All checks were successful
Build Docker image / test (push) Successful in 4m30s
Build Docker image / build (push) Successful in 5m17s
revert Use latest image as cache
Cache needs to be a separate image as it has different format
2025-05-14 11:03:21 +02:00
Roman Krček
ac1648f56c Use latest image as cache
All checks were successful
Build Docker image / test (push) Successful in 2m6s
Build Docker image / build (push) Successful in 5m3s
2025-05-13 18:21:39 +02:00
Roman Krček
b01a799a94 Fix timing of async functions and make histogram finer at lower ranges
Some checks failed
Build Docker image / test (push) Successful in 4m34s
Build Docker image / build (push) Has been cancelled
2025-05-13 18:14:36 +02:00
Roman Krček
020a6271cf Better debugging
All checks were successful
Build Docker image / test (push) Successful in 3m59s
Build Docker image / build (push) Successful in 5m8s
2025-05-09 22:44:52 +02:00
Roman Krček
df5538d7ee Remove incorrect remove statement
All checks were successful
Build Docker image / test (push) Successful in 4m53s
Build Docker image / build (push) Successful in 1m40s
2025-05-09 22:27:45 +02:00
Roman Krček
4b74810912 Remove logging and duplicate videos
All checks were successful
Build Docker image / test (push) Successful in 2m54s
Build Docker image / build (push) Successful in 1m14s
2025-05-06 20:24:22 +02:00
Roman Krček
8829caceee Add prometheus telemetry
All checks were successful
Build Docker image / test (push) Successful in 5m36s
Build Docker image / build (push) Successful in 8m19s
2025-05-06 20:06:36 +02:00
Roman Krček
8cc1c55026 Fix awaits for async functions
Some checks failed
Build Docker image / test (push) Has started running
Build Docker image / build (push) Has been cancelled
2025-02-16 17:23:25 +01:00
Roman Krček
e12eaa0fe1 Fix makefile for moder docker compose 2025-02-16 17:22:11 +01:00
Roman Krček
f2fcea2333 Add env fix for CI and remove caching
All checks were successful
Build Docker image / test (push) Successful in 2m13s
Build Docker image / build (push) Successful in 4m41s
2024-10-14 09:38:02 +02:00
Roman Krček
98250acd21 Add passenv to tox.ini
Some checks failed
Build Docker image / test (push) Failing after 2m42s
Build Docker image / build (push) Has been skipped
2024-10-14 09:33:52 +02:00
Roman Krček
21afea7f39 Add caching and test different style of env import
Some checks failed
Build Docker image / test (push) Failing after 3m17s
Build Docker image / build (push) Has been skipped
2024-10-14 09:22:47 +02:00
Roman Krček
7253745a50 Add example .env file so unittests pass in CI environment
Some checks failed
Build Docker image / test (push) Failing after 2m40s
Build Docker image / build (push) Has been skipped
2024-10-14 09:16:07 +02:00
Roman Krček
3075743c5d Fix linter issues and unittests
Some checks failed
Build Docker image / build (push) Has been skipped
Build Docker image / test (push) Failing after 2m41s
2024-10-13 21:18:18 +02:00
Roman Krček
6d508121b0 Add hash checking to TT downloading 2024-10-13 21:07:56 +02:00
Roman Krček
47248f10ab Add computed properties to settings 2024-10-13 21:05:51 +02:00
Roman Krček
47472d59b7 Move log level to settings
All checks were successful
Build Docker image / test (push) Successful in 2m50s
Build Docker image / build (push) Successful in 53s
2024-10-13 20:24:33 +02:00
Roman Krček
940f97a951 Ignore .vscode folder
All checks were successful
Build Docker image / test (push) Successful in 2m46s
Build Docker image / build (push) Successful in 4m50s
2024-10-13 19:47:14 +02:00
Roman Krček
d236f88b64 Move to pydantic settings paradigm 2024-10-13 19:47:06 +02:00
Roman Krček
c68d3d8722 Fix datetime in unittests
All checks were successful
Build Docker image / test (push) Successful in 2m29s
Build Docker image / build (push) Successful in 49s
2024-10-13 19:13:12 +02:00
Roman Krček
c1bf9c1e53 Fix minor problems with user IDs and datetime
Some checks failed
Build Docker image / test (push) Failing after 2m29s
Build Docker image / build (push) Has been skipped
2024-10-13 19:06:47 +02:00
Roman Krček
f941877954 Pin aiohttp version to latest passing build in pywheels
All checks were successful
Build Docker image / test (push) Successful in 2m30s
Build Docker image / build (push) Successful in 4m25s
2024-10-13 18:35:37 +02:00
Roman Krček
48bd21f14f Use specific versions in requirements
Some checks failed
Build Docker image / test (push) Successful in 2m30s
Build Docker image / build (push) Failing after 3m19s
2024-10-13 18:22:18 +02:00
Roman Krček
e688b6d62b Add coverage tests
Some checks failed
Build Docker image / test (push) Failing after 2m7s
Build Docker image / build (push) Has been skipped
2024-10-13 18:16:28 +02:00
Roman Krček
2f3a2d1700 Fix unit tests 2024-10-13 18:15:05 +02:00
Roman Krček
af6282e26d Split some functions to make unittests easier 2024-10-13 18:14:59 +02:00
15 changed files with 574 additions and 307 deletions

7
.env.example Normal file
View File

@@ -0,0 +1,7 @@
APP_ENV=DEV
API_ID=20798818
API_HASH=c657773dc9a68823d5ae2c69e66d9d09
BOT_TOKEN=6811299384:AAFPUDfE-bJyw8g4p01x6IhofXBBxgEd4es
STORAGE=/data
ALLOWED_IDS=1868160614
LOG_LEVEL=INFO

View File

@@ -5,7 +5,7 @@ on:
branches: branches:
- main - main
schedule: schedule:
- cron: "0 22 * * 0" # sunday 22:00 - cron: "0 22 1 * *" # First of every month
jobs: jobs:
test: test:
@@ -20,11 +20,10 @@ jobs:
with: with:
python-version: "3.11" python-version: "3.11"
- name: Setup tox - name: Run tox tests
run: pip install tox>=4.16 run: |
pip install tox>=4.16
- name: Run tox tox
run: tox
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest

1
.gitignore vendored
View File

@@ -164,4 +164,5 @@ cython_debug/
.venv/ .venv/
.env/ .env/
.stestr/ .stestr/
.vscode/
data/ data/

View File

@@ -3,7 +3,7 @@ ts := $(shell /bin/date "+%Y-%m-%d")
platform = linux/arm/v7 # linux/amd64 platform = linux/arm/v7 # linux/amd64
up: up:
sudo docker-compose up --build sudo docker compose up --build
entry: entry:
sudo docker pull --platform $(platform) $(docker_image):latest && \ sudo docker pull --platform $(platform) $(docker_image):latest && \

View File

@@ -5,4 +5,5 @@ services:
volumes: volumes:
- ./data/:/data - ./data/:/data
env_file: .env env_file: .env
platform: linux/arm/v7 ports:
- 8000:8000

View File

@@ -1,6 +1,11 @@
integv==1.3.0 # Pin aiohttp version until builds start passing
# https://www.piwheels.org/project/aiohttp/
aiohttp==3.10.9
pyrogram==2.0.106 pyrogram==2.0.106
tiktok_downloader==0.3.5 tiktok_downloader==0.3.5
uvloop==0.19.0 uvloop==0.19.0
tgcrypto==1.2.5 tgcrypto==1.2.5
sentry-sdk==2.15.0 prometheus-client==0.21.1
prometheus-async==25.1.0
pydantic-settings==2.5.2
pydantic==2.9.2

View File

@@ -1,7 +1,6 @@
import logging import logging
import os
LOG_LEVEL = os.getenv("LOG_LEVEL") from telegram_downloader_bot.settings import settings
def configure_logger(log_level: str) -> logging.Logger: def configure_logger(log_level: str) -> logging.Logger:
@@ -18,4 +17,4 @@ def configure_logger(log_level: str) -> logging.Logger:
return logging.getLogger() return logging.getLogger()
log = configure_logger(LOG_LEVEL) log = configure_logger(settings.log_level)

View File

@@ -1,4 +1,3 @@
import os
import uvloop import uvloop
from pyrogram import Client, filters from pyrogram import Client, filters
@@ -7,20 +6,21 @@ from pyrogram.types import Message
from telegram_downloader_bot.logger import log from telegram_downloader_bot.logger import log
from telegram_downloader_bot.telemetry import init_telemetry from telegram_downloader_bot.telemetry import init_telemetry
from telegram_downloader_bot import utils, security from telegram_downloader_bot import utils, security
from telegram_downloader_bot.settings import settings
API_ID = os.getenv("API_ID") # Your API ID from my.telegram.org
API_HASH = os.getenv("API_HASH") # Your API Hash from my.telegram.org
BOT_TOKEN = os.getenv("BOT_TOKEN") # Your bot token from BotFather
STORAGE = os.getenv("STORAGE") # Your bot token from BotFather
uvloop.install() uvloop.install()
if settings.app_env == "production":
log.info("Starting telemetry server, in production mode.")
init_telemetry() init_telemetry()
else:
log.info("Not starting telemetry server, not in production mode.")
app = Client("downloader_bot", app = Client("downloader_bot",
api_id=API_ID, api_id=settings.api_id,
api_hash=API_HASH, api_hash=settings.api_hash,
bot_token=BOT_TOKEN, bot_token=settings.bot_token,
workers=1) workers=settings.workers)
@app.on_message(filters.command("start")) @app.on_message(filters.command("start"))
@@ -55,27 +55,22 @@ async def message_handler(_, message: Message):
"No TikTok URLs found! Nothing to download!" "No TikTok URLs found! Nothing to download!"
) )
success_count = 0
for i, url in enumerate(urls): for i, url in enumerate(urls):
msg = f"Downloading video {i+1}/{len(urls)}..." msg = f"Downloading video {i+1}/{len(urls)}..."
log.info(msg) log.info(msg)
await message.reply_text(msg) await message.reply_text(msg)
outcome = utils.download_tt_video(STORAGE, url) status = await utils.download_tt_video(url)
success_count += 1 if outcome else 0
await message.reply_text(f"{success_count}/{len(urls)} " await message.reply_text(f"Done. {status}")
"video(s) downloaded")
@app.on_message(filters.media) @app.on_message(filters.media)
@security.protected @security.protected
async def media_handler(client, message: Message): async def media_handler(client, message: Message):
await message.reply_text("Downloading media...") await message.reply_text("Downloading media...")
await utils.handle_media_message_contents(client, message)
utils.handle_media_message_contents(STORAGE, client, message)
if __name__ == "__main__": if __name__ == "__main__":
utils.make_fs(STORAGE) utils.make_fs(settings.storage)
app.run() app.run()

View File

@@ -1,18 +1,19 @@
import os
from functools import wraps from functools import wraps
# Comma separated list of Telegram IDs that this bot will respond to from telegram_downloader_bot.logger import log
allowed_ids_raw = os.getenv("ALLOWED_IDS", "") from telegram_downloader_bot.settings import settings
allowed_ids = allowed_ids_raw.split(",")
print(allowed_ids_raw)
print(allowed_ids)
def protected(func): def protected(func):
@wraps(func) @wraps(func)
async def wrapper(client, message): async def wrapper(client, message):
if message.from_user.id not in allowed_ids: if int(message.from_user.id) not in settings.allowed_ids_list:
log.warning(
f"User with ID {message.from_user.id} attempted"
"to text this bot!")
log.info(
"Only users allowed are:"
f"{' '.join(settings.allowed_ids_list)}")
return await message.reply_text("You are not on the list!") return await message.reply_text("You are not on the list!")
return await func(client, message) return await func(client, message)
return wrapper return wrapper

View File

@@ -0,0 +1,71 @@
import os
from functools import cached_property
from pydantic_settings import BaseSettings
from pydantic import computed_field
class Settings(BaseSettings):
"""
Settings class that defines configuration variables for the application.
Attributes:
----------
app_env : str
Specifies the environment in which the application is running.
Default is 'DEV'. Possible values could include 'DEV', 'PROD'
workers : int
Defines the number of workers to be used in the application.
Default is 1.
api_id : int
Represents the API ID from my.telegram.org
api_hash : str
The hash key corresponding to your API Hash from my.telegram.org
bot_token : str
The token from BotFather.
storage : os.path
Specifies the path where the application stores persistent data.
allowed_ids : str
A list or comma-separated string of IDs that are allowed access
to the bot or application.
log_level : str
The log level used for logging module.
Config:
-------
env_file : str
Specifies the environment file to load the environment variables from.
Default is ".env".
"""
app_env: str = "DEV"
workers: int = 1
api_id: int
api_hash: str
bot_token: str
storage: str
allowed_ids: str
log_level: str
@computed_field
@property
def tt_hash_file(self) -> str:
return os.path.join(settings.storage, "tt_hashes.pickle")
@computed_field
@cached_property
def allowed_ids_list(self) -> list:
allowed_ids = settings.allowed_ids.split(",")
allowed_ids = [int(x) for x in allowed_ids]
return allowed_ids
class Config:
env_file = [".env", ".env.example"]
settings = Settings()

View File

@@ -1,10 +1,33 @@
import sentry_sdk from prometheus_client import Histogram, start_http_server
DOWNLOAD_DURATION = Histogram(
'download_time_seconds',
'Time taken to download a single media item',
['service'],
buckets=[0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100]
)
FILE_SIZE_BYTES = Histogram(
'downloaded_file_size_bytes',
'Size of the downloaded file in bytes',
['service'],
buckets=[
1e6, # 1 MB
2e6, # 2 MB
5e6, # 5 MB
10e6, # 10 MB
25e6, # 25 MB
50e6, # 50 MB
100e6, # 100 MB
200e6, # 200 MB
500e6, # 500 MB
1e9 # 1 GB
]
)
def init_telemetry() -> None: def init_telemetry() -> None:
sentry_sdk.init( """
dsn="https://12d7a075d483fc133cde0ed82e72ac45@o4508071875313664.ingest.de.sentry.io/4508075566694480", # noqa: E501 Initialize telemetry for the bot.
traces_sample_rate=1.0, """
profiles_sample_rate=1.0, start_http_server(8000)
enable_tracing=True
)

View File

@@ -1,63 +1,73 @@
import integv
import os import os
import pickle # nosec
import re import re
from datetime import datetime from datetime import datetime
from hashlib import sha256
from prometheus_async.aio import time as async_time
from pyrogram import Client from pyrogram import Client
from pyrogram.types import Message from pyrogram.types import Message
from tiktok_downloader import snaptik from tiktok_downloader import snaptik
from telegram_downloader_bot.logger import log from telegram_downloader_bot.logger import log
from telegram_downloader_bot.settings import settings
from telegram_downloader_bot.telemetry import DOWNLOAD_DURATION
from telegram_downloader_bot.telemetry import FILE_SIZE_BYTES
async def get_user_folder(storage_path: os.path, message: Message) -> os.path: def sanitize_name(input: str) -> str:
"""Sanize string by removing non aplhanumeric characters and spaces."""
output = re.sub("[^a-zA-Z0-9- ]", "", input)
output = output.replace(" ", "_")
return output
def get_user_folder(message: Message) -> os.path:
""" Determine folder name used to save the media to. Depending on """ Determine folder name used to save the media to. Depending on
which type of message (forwarded, direct) detect that person's which type of message (forwarded, direct) detect that person's
or group's name.""" or group's name."""
# Message forwarded from someone
if message.forward_from: if message.forward_from:
user = message.forward_from user = message.forward_from
# User's first and last name for folder name, if user.first_name and user.last_name:
# User's first and last name for folder name
user_folder_name = f"{user.first_name} {user.last_name}"
else:
# fallback to user ID if not available # fallback to user ID if not available
user_folder_name = ( user_folder_name = str(user.id)
f"{user.first_name}_{user.last_name}".strip()
if user.first_name and user.last_name # Message forwarded from chat
else str(user.id)
)
elif message.forward_from_chat: elif message.forward_from_chat:
user = message.forward_from_chat user = message.forward_from_chat
# Use chat title for groups and channels user_folder_name = user.title
user_folder_name = "".join(
c for c in user.title if c.isalnum() or c in (" ", "_") # Direct message from user
).rstrip()
else: else:
user = message.from_user user = message.from_user
# User's first and last name for folder name, if user.first_name and user.last_name:
# User's first and last name for folder name
user_folder_name = f"{user.first_name} {user.last_name}"
else:
# fallback to user ID if not available # fallback to user ID if not available
user_folder_name = ( user_folder_name = str(user.id)
f"{user.first_name}_{user.last_name}".strip()
if user.first_name and user.last_name
else str(user.id)
)
# Sanitize the folder name # Sanitize the folder name
user_folder_name = "".join( user_folder_name = sanitize_name(user_folder_name)
c for c in user_folder_name if c.isalnum() or c in (" ", "_")
).rstrip()
user_folder = os.path.join(storage_path, "telegram", user_folder_name) user_folder = os.path.join(settings.storage, "telegram", user_folder_name)
os.makedirs(user_folder, exist_ok=True) os.makedirs(user_folder, exist_ok=True)
return user_folder return user_folder
async def handle_media_message_contents(storage_path: os.path, @async_time(DOWNLOAD_DURATION.labels(service='telegram'))
client: Client, async def handle_media_message_contents(client: Client,
message: Message): message: Message):
"""Detect what kind of media is being sent over from the user. """Detect what kind of media is being sent over from the user.
Based on that, determine the correct file extension and save Based on that, determine the correct file extension and save
that media.""" that media."""
user_folder = get_user_folder(storage_path, message) user_folder = get_user_folder(message)
# Handle documents # Handle documents
if message.document: if message.document:
@@ -91,42 +101,71 @@ async def handle_media_message_contents(storage_path: os.path,
else: else:
await message.reply_text("Unknown media type!") await message.reply_text("Unknown media type!")
size = os.path.getsize(file_path)
FILE_SIZE_BYTES.labels(service="telegram").observe(size)
def download_tt_video(storage_path: str, url: str) -> bool:
async def get_tt_hashes() -> set:
if not os.path.exists(settings.tt_hash_file):
return set()
with open(settings.tt_hash_file, "rb+") as f:
all_tt_hashes: set = pickle.load(f) # nosec
return all_tt_hashes
async def add_to_hashes(new_hash: str) -> None:
all_tt_hashes = await get_tt_hashes()
all_tt_hashes.add(new_hash)
await save_tt_hashes(all_tt_hashes)
async def save_tt_hashes(hashes: set) -> None:
with open(settings.tt_hash_file, "wb+") as f:
pickle.dump(hashes,
f,
protocol=pickle.HIGHEST_PROTOCOL)
async def check_if_tt_downloaded(tt_hash: str) -> bool:
all_tt_hashes = await get_tt_hashes()
return tt_hash in all_tt_hashes
@async_time(DOWNLOAD_DURATION.labels(service='tiktok'))
async def download_tt_video(url: str) -> str:
"""Downloads tiktok video from a given URL. """Downloads tiktok video from a given URL.
Makes sure the video integrity is correct.""" Makes sure the video integrity is correct."""
videos = snaptik(url) videos = snaptik(url)
now = datetime.datetime.now() now = datetime.now()
max_tries = 5
log.debug(f"Downloading video from {url}...")
for video in videos: for video in videos:
video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath: os.path = os.path.join( video_filepath = os.path.join(settings.storage,
storage_path, "tiktok", video_filename) "tiktok",
video_filename)
for i in range(max_tries):
video_content = video.download().getbuffer() video_content = video.download().getbuffer()
is_valid_mp4 = integv.verify( video_hash = sha256(video_content).hexdigest()
video_content.tobytes(), file_type="mp4")
log.debug(
f"Attempt {i+1}/{max_tries} to "
"download video, video valid: {is_valid_mp4}"
)
if is_valid_mp4:
break
if not is_valid_mp4: log.info(f"Video hash: {video_hash}")
log.error("Downloaded video is not a valid mp4 file") log.info(f"Video filepath: {video_filepath}")
return False
if await check_if_tt_downloaded(video_hash) is True:
return "Already downloaded"
with open(video_filepath, "wb") as f: with open(video_filepath, "wb") as f:
f.write(video_content) f.write(video_content)
log.debug("Video saved successfully")
return True await add_to_hashes(video_hash)
size = os.path.getsize(video_filepath)
FILE_SIZE_BYTES.labels(service="tiktok").observe(size)
return "Downloaded ok"
return "Failed to download"
def make_fs(storaga_path: str) -> None: def make_fs(storaga_path: str) -> None:

View File

@@ -1,5 +1,5 @@
codespell codespell==2.3.0
flake8 flake8==7.1.1
bandit bandit==1.7.10
pytest stestr==4.1.0
stestr coverage==7.6.2

View File

@@ -1,13 +1,16 @@
# test_utils.py
import unittest import unittest
import os import os
import re import re
import asyncio
import tempfile
import shutil import shutil
import tempfile
from unittest.mock import Mock, AsyncMock, patch from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime from datetime import datetime
# Adjusted import statement
from telegram_downloader_bot.utils import ( from telegram_downloader_bot.utils import (
sanitize_name,
get_user_folder, get_user_folder,
handle_media_message_contents, handle_media_message_contents,
download_tt_video, download_tt_video,
@@ -17,17 +20,44 @@ from telegram_downloader_bot.utils import (
) )
from pyrogram.types import Message, User, Chat from pyrogram.types import Message, User, Chat
from pyrogram import Client
class TestGetUserFolder(unittest.IsolatedAsyncioTestCase):
class TestSanitizeName(unittest.TestCase):
def test_alphanumeric_input(self):
input_str = "JohnDoe123"
expected_output = "JohnDoe123"
self.assertEqual(sanitize_name(input_str), expected_output)
def test_input_with_special_chars(self):
input_str = "John Doe!@#"
expected_output = "John_Doe"
self.assertEqual(sanitize_name(input_str), expected_output)
def test_input_with_only_special_chars(self):
input_str = "!@#$%^&*()"
expected_output = ""
self.assertEqual(sanitize_name(input_str), expected_output)
def test_empty_input(self):
input_str = ""
expected_output = ""
self.assertEqual(sanitize_name(input_str), expected_output)
class TestGetUserFolder(unittest.TestCase):
def setUp(self): def setUp(self):
# Create a temporary directory for each test # Create a temporary directory for each test
self.tmp_path = tempfile.mkdtemp() self.tmp_path = tempfile.mkdtemp()
self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
self.settings_patcher.start()
def tearDown(self): def tearDown(self):
self.settings_patcher.stop()
# Remove the directory after the test # Remove the directory after the test
shutil.rmtree(self.tmp_path) shutil.rmtree(self.tmp_path)
async def test_forward_from_full_name(self): def test_forward_from_full_name(self):
user = Mock() user = Mock()
user.first_name = "John" user.first_name = "John"
user.last_name = "Doe" user.last_name = "Doe"
@@ -38,12 +68,12 @@ class TestGetUserFolder(unittest.IsolatedAsyncioTestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = None message.from_user = None
result = await get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "John_Doe") expected_folder = os.path.join(self.tmp_path, "telegram", "John_Doe")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
async def test_forward_from_first_name_only(self): def test_forward_from_first_name_only(self):
user = Mock() user = Mock()
user.first_name = "John" user.first_name = "John"
user.last_name = None user.last_name = None
@@ -54,28 +84,12 @@ class TestGetUserFolder(unittest.IsolatedAsyncioTestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = None message.from_user = None
result = await get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "12345") expected_folder = os.path.join(self.tmp_path, "telegram", "12345")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
async def test_forward_from_no_name(self): def test_forward_from_chat_title(self):
user = Mock()
user.first_name = None
user.last_name = None
user.id = 12345
message = Mock()
message.forward_from = user
message.forward_from_chat = None
message.from_user = None
result = await get_user_folder(self.tmp_path, message)
expected_folder = os.path.join(self.tmp_path, "telegram", "12345")
self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder))
async def test_forward_from_chat_special_chars(self):
chat = Mock() chat = Mock()
chat.title = "My *Awesome* Group/Chat!" chat.title = "My *Awesome* Group/Chat!"
@@ -84,14 +98,14 @@ class TestGetUserFolder(unittest.IsolatedAsyncioTestCase):
message.forward_from_chat = chat message.forward_from_chat = chat
message.from_user = None message.from_user = None
result = await get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join( expected_folder = os.path.join(
self.tmp_path, "telegram", "My_Awesome_GroupChat" self.tmp_path, "telegram", "My_Awesome_GroupChat"
) )
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
async def test_from_user_full_name(self): def test_from_user_full_name(self):
user = Mock() user = Mock()
user.first_name = "Jane" user.first_name = "Jane"
user.last_name = "Doe" user.last_name = "Doe"
@@ -102,14 +116,14 @@ class TestGetUserFolder(unittest.IsolatedAsyncioTestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = user message.from_user = user
result = await get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe") expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
async def test_from_user_first_name_only(self): def test_from_user_id(self):
user = Mock() user = Mock()
user.first_name = "Jane" user.first_name = None
user.last_name = None user.last_name = None
user.id = 54321 user.id = 54321
@@ -118,214 +132,319 @@ class TestGetUserFolder(unittest.IsolatedAsyncioTestCase):
message.forward_from_chat = None message.forward_from_chat = None
message.from_user = user message.from_user = user
result = await get_user_folder(self.tmp_path, message) result = get_user_folder(message)
expected_folder = os.path.join(self.tmp_path, "telegram", "54321") expected_folder = os.path.join(self.tmp_path, "telegram", "54321")
self.assertEqual(result, expected_folder) self.assertEqual(result, expected_folder)
self.assertTrue(os.path.exists(expected_folder)) self.assertTrue(os.path.exists(expected_folder))
async def test_special_characters_in_name(self):
user = Mock()
user.first_name = "Ja*ne"
user.last_name = "Do/e"
user.id = 54321
message = Mock() # class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
message.forward_from = None # def setUp(self):
message.forward_from_chat = None # # Create a temporary directory for each test
message.from_user = user # self.tmp_path = tempfile.mkdtemp()
# self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
# self.settings_patcher.start()
result = await get_user_folder(self.tmp_path, message) # def tearDown(self):
expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe") # # Stop patching settings.storage
self.assertEqual(result, expected_folder) # self.settings_patcher.stop()
self.assertTrue(os.path.exists(expected_folder)) # # Remove the directory after the test
# shutil.rmtree(self.tmp_path)
# @patch('telegram_downloader_bot.utils.get_user_folder')
# async def test_handle_video(self, mock_get_user_folder):
# user_folder = os.path.join(self.tmp_path, "user_folder")
# mock_get_user_folder.return_value = user_folder
# os.makedirs(user_folder, exist_ok=True)
# client = Mock(spec=Client)
# client.download_media = AsyncMock()
# message = Mock(spec=Message)
# message.document = None
# message.photo = None
# message.video = Mock()
# message.video.file_id = "video_file_id"
# message.animation = None
# message.reply_text = AsyncMock()
# await handle_media_message_contents(client, message)
# expected_file_name = f"video_{message.video.file_id}.mp4"
# expected_file_path = os.path.join(user_folder, expected_file_name)
# client.download_media.assert_awaited_once_with(
# message, expected_file_path)
# message.reply_text.assert_awaited_once_with(
# f"Video saved to {user_folder}")
# @patch('telegram_downloader_bot.utils.get_user_folder')
# async def test_handle_animation(self, mock_get_user_folder):
# user_folder = os.path.join(self.tmp_path, "user_folder")
# mock_get_user_folder.return_value = user_folder
# os.makedirs(user_folder, exist_ok=True)
# client = Mock(spec=Client)
# client.download_media = AsyncMock()
# message = Mock(spec=Message)
# message.document = None
# message.photo = None
# message.video = None
# message.animation = Mock()
# message.animation.file_id = "animation_file_id"
# message.reply_text = AsyncMock()
# await handle_media_message_contents(client, message)
# expected_file_name = f"gif_{message.animation.file_id}.gif"
# expected_file_path = os.path.join(user_folder, expected_file_name)
# client.download_media.assert_awaited_once_with(
# message.animation, expected_file_path)
# message.reply_text.assert_awaited_once_with(
# f"GIF saved to {user_folder}")
# @patch('telegram_downloader_bot.utils.get_user_folder')
# async def test_handle_document(self, mock_get_user_folder):
# user_folder = os.path.join(self.tmp_path, "user_folder")
# mock_get_user_folder.return_value = user_folder
# os.makedirs(user_folder, exist_ok=True)
# client = Mock(spec=Client)
# client.download_media = AsyncMock()
# message = Mock(spec=Message)
# message.document = Mock()
# message.document.file_name = "test_document.pdf"
# message.photo = None
# message.video = None
# message.animation = None
# message.reply_text = AsyncMock()
# await handle_media_message_contents(client, message)
# expected_file_path = os.path.join(user_folder, "test_document.pdf")
# client.download_media.assert_awaited_once_with(
# message, expected_file_path)
# message.reply_text.assert_awaited_once_with(
# f"Document saved to {user_folder}")
# @patch('telegram_downloader_bot.utils.get_user_folder')
# async def test_handle_photo(self, mock_get_user_folder):
# user_folder = os.path.join(self.tmp_path, "user_folder")
# mock_get_user_folder.return_value = user_folder
# os.makedirs(user_folder, exist_ok=True)
# client = Mock(spec=Client)
# client.download_media = AsyncMock()
# message = Mock(spec=Message)
# message.document = None
# message.photo = Mock()
# message.photo.file_id = "photo_file_id"
# message.video = None
# message.animation = None
# message.reply_text = AsyncMock()
# await handle_media_message_contents(client, message)
# expected_file_name = f"photo_{message.photo.file_id}.jpg"
# expected_file_path = os.path.join(user_folder, expected_file_name)
# client.download_media.assert_awaited_once_with(
# message.photo, expected_file_path)
# message.reply_text.assert_awaited_once_with(
# f"Photo saved to {user_folder}")
# @patch('telegram_downloader_bot.utils.get_user_folder')
# async def test_handle_unknown_media(self, mock_get_user_folder):
# user_folder = os.path.join(self.tmp_path, "user_folder")
# mock_get_user_folder.return_value = user_folder
# os.makedirs(user_folder, exist_ok=True)
# client = Mock(spec=Client)
# client.download_media = AsyncMock()
# message = Mock(spec=Message)
# message.document = None
# message.photo = None
# message.video = None
# message.animation = None
# message.reply_text = AsyncMock()
# await handle_media_message_contents(client, message)
# client.download_media.assert_not_called()
# message.reply_text.assert_awaited_once_with("Unknown media type!")
class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase): # class TestDownloadTTVideo(unittest.TestCase):
def setUp(self): # def setUp(self):
# Create a temporary directory for each test # # Create a temporary directory for each test
self.tmp_path = tempfile.mkdtemp() # self.tmp_path = tempfile.mkdtemp()
# os.makedirs(os.path.join(self.tmp_path, "tiktok"), exist_ok=True)
# self.settings_patcher = patch("telegram_downloader_bot.settings.settings.storage", self.tmp_path)
# self.settings_patcher.start()
def tearDown(self): # # Paths to the valid and invalid video files
# Remove the directory after the test # self.valid_video_path = os.path.join(self.tmp_path, "valid.mp4")
shutil.rmtree(self.tmp_path) # with open(self.valid_video_path, 'wb') as f:
# f.write(b'valid mp4 content')
@patch("telegram_downloader_bot.utils.get_user_folder") # self.invalid_video_path = os.path.join(self.tmp_path, "invalid.mp4")
async def test_document(self, mock_get_user_folder): # with open(self.invalid_video_path, 'wb') as f:
user_folder = os.path.join(self.tmp_path, "user_folder") # f.write(b'invalid mp4 content')
mock_get_user_folder.return_value = user_folder
os.makedirs(user_folder, exist_ok=True)
client = Mock() # def tearDown(self):
client.download_media = AsyncMock() # self.settings_patcher.stop()
# # Remove the directory after the test
# shutil.rmtree(self.tmp_path)
message = Mock() # @patch('telegram_downloader_bot.utils.snaptik')
message.document = Mock() # @patch('telegram_downloader_bot.utils.datetime')
message.document.file_name = "test_document.pdf" # def test_download_tt_video_with_valid_video(self, mock_datetime, mock_snaptik):
message.photo = None # # Mock datetime
message.video = None # mock_now = datetime(2023, 1, 1, 12, 0, 0)
message.animation = None # mock_datetime.now.return_value = mock_now
message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # # Read the content of valid.mp4
# with open(self.valid_video_path, 'rb') as f:
# valid_video_content = f.read()
client.download_media.assert_awaited_once_with( # # Mock snaptik to return a video that returns valid.mp4 content
message, os.path.join(user_folder, "test_document.pdf") # mock_video = Mock()
) # mock_video.download.return_value.getbuffer.return_value = valid_video_content
message.reply_text.assert_awaited_once_with(f"Document saved to {user_folder}") # mock_snaptik.return_value = [mock_video]
@patch("telegram_downloader_bot.utils.get_user_folder") # # Call the function
async def test_photo(self, mock_get_user_folder): # download_tt_video("http://tiktok.com/video123")
user_folder = os.path.join(self.tmp_path, "user_folder")
mock_get_user_folder.return_value = user_folder
os.makedirs(user_folder, exist_ok=True)
client = Mock() # # Verify that the file was saved correctly
client.download_media = AsyncMock() # video_filename = mock_now.strftime(
# "video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
# video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
# self.assertTrue(os.path.exists(video_filepath))
message = Mock() # with open(video_filepath, 'rb') as f:
message.document = None # content = f.read()
message.photo = Mock() # self.assertEqual(content, valid_video_content)
message.photo.file_id = "1234567890"
message.video = None
message.animation = None
message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # @patch('telegram_downloader_bot.utils.snaptik')
# @patch('telegram_downloader_bot.utils.datetime')
# def test_download_tt_video_with_invalid_video(self, mock_datetime, mock_snaptik):
# # Mock datetime
# mock_now = datetime(2023, 1, 1, 12, 0, 0)
# mock_datetime.now.return_value = mock_now
expected_file = os.path.join(user_folder, f"photo_{message.photo.file_id}.jpg") # # Read the content of invalid.mp4
client.download_media.assert_awaited_once_with(message.photo, expected_file) # with open(self.invalid_video_path, 'rb') as f:
message.reply_text.assert_awaited_once_with(f"Photo saved to {user_folder}") # invalid_video_content = f.read()
@patch("telegram_downloader_bot.utils.get_user_folder") # # Mock snaptik to return a video that returns invalid.mp4 content
async def test_unknown_media(self, mock_get_user_folder): # mock_video = Mock()
user_folder = os.path.join(self.tmp_path, "user_folder") # mock_video.download.return_value.getbuffer.return_value = invalid_video_content
mock_get_user_folder.return_value = user_folder # mock_snaptik.return_value = [mock_video]
os.makedirs(user_folder, exist_ok=True)
client = Mock() # # Call the function
client.download_media = AsyncMock() # download_tt_video("http://tiktok.com/video123")
message = Mock() # # Verify that the file was saved
message.document = None # video_filename = mock_now.strftime(
message.photo = None # "video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
message.video = None # video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
message.animation = None # self.assertTrue(os.path.exists(video_filepath))
message.reply_text = AsyncMock()
await handle_media_message_contents(self.tmp_path, client, message) # with open(video_filepath, 'rb') as f:
# content = f.read()
# self.assertEqual(content, invalid_video_content)
client.download_media.assert_not_awaited() # @patch('telegram_downloader_bot.utils.snaptik')
message.reply_text.assert_awaited_once_with("Unknown media type!") # @patch('telegram_downloader_bot.utils.datetime')
# def test_download_tt_video_no_videos(self, mock_datetime, mock_snaptik):
# # Mock datetime
# mock_now = datetime(2023, 1, 1, 12, 0, 0)
# mock_datetime.datetime.now.return_value = mock_now
# # Mock snaptik to return an empty list
# mock_snaptik.return_value = []
# # Call the function
# download_tt_video("http://tiktok.com/video123")
# # Verify that no files were created
# tiktok_folder = os.path.join(self.tmp_path, "tiktok")
# files = os.listdir(tiktok_folder)
# self.assertEqual(len(files), 0)
class TestDownloadTTVideo(unittest.TestCase): # class TestMakeFS(unittest.TestCase):
def setUp(self): # def setUp(self):
# Create a temporary directory for each test # self.tmp_path = tempfile.mkdtemp()
self.tmp_path = tempfile.mkdtemp()
def tearDown(self): # def tearDown(self):
# Remove the directory after the test # shutil.rmtree(self.tmp_path)
shutil.rmtree(self.tmp_path)
@patch("telegram_downloader_bot.utils.snaptik") # def test_make_fs(self):
@patch("telegram_downloader_bot.utils.integv.verify") # make_fs(self.tmp_path)
@patch("telegram_downloader_bot.utils.datetime") # self.assertTrue(os.path.exists(os.path.join(self.tmp_path, "tiktok")))
def test_success(self, mock_datetime, mock_verify, mock_snaptik): # self.assertTrue(os.path.exists(
mock_video = Mock() # os.path.join(self.tmp_path, "telegram")))
mock_video.download.return_value.getbuffer.return_value = b"video_content"
mock_snaptik.return_value = [mock_video]
mock_verify.return_value = True
mock_now = datetime(2023, 1, 1, 12, 0, 0)
mock_datetime.now.return_value = mock_now
result = download_tt_video(self.tmp_path, "http://tiktok.com/video123")
self.assertTrue(result)
video_filename = mock_now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
self.assertTrue(os.path.exists(video_filepath))
@patch("telegram_downloader_bot.utils.snaptik")
@patch("telegram_downloader_bot.utils.integv.verify")
@patch("telegram_downloader_bot.utils.datetime")
def test_failure(self, mock_datetime, mock_verify, mock_snaptik):
mock_video = Mock()
mock_video.download.return_value.getbuffer.return_value = b"video_content"
mock_snaptik.return_value = [mock_video]
mock_verify.return_value = False
mock_now = datetime(2023, 1, 1, 12, 0, 0)
mock_datetime.now.return_value = mock_now
result = download_tt_video(self.tmp_path, "http://tiktok.com/video123")
self.assertFalse(result)
video_filename = mock_now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
self.assertFalse(os.path.exists(video_filepath))
class TestMakeFS(unittest.TestCase): # class TestExtractURLs(unittest.TestCase):
def setUp(self): # def test_no_urls(self):
# Create a temporary directory for each test # text = "This is some text without any URLs."
self.tmp_path = tempfile.mkdtemp() # result = extract_urls(text)
# self.assertEqual(result, [])
def tearDown(self): # def test_single_url(self):
# Remove the directory after the test # text = "Check out this link: http://example.com"
shutil.rmtree(self.tmp_path) # result = extract_urls(text)
# self.assertEqual(result, ["http://example.com"])
def test_make_fs(self): # def test_multiple_urls(self):
make_fs(self.tmp_path) # text = "Here are some links: http://example.com and https://test.com/page"
self.assertTrue(os.path.exists(os.path.join(self.tmp_path, "tiktok"))) # result = extract_urls(text)
self.assertTrue(os.path.exists(os.path.join(self.tmp_path, "telegram"))) # self.assertEqual(
# result, ["http://example.com", "https://test.com/page"])
# def test_malformed_url(self):
# text = "This is not a URL: htt://badurl.com"
# result = extract_urls(text)
# self.assertEqual(result, [])
# def test_urls_with_special_chars(self):
# text = "Link: https://example.com/page?param=value#anchor"
# result = extract_urls(text)
# self.assertEqual(
# result, ["https://example.com/page?param=value#anchor"])
class TestExtractURLs(unittest.TestCase): # class TestFilterTTURLs(unittest.TestCase):
def test_no_urls(self): # def test_empty_list(self):
result = extract_urls("This is some text without any URLs.") # urls = []
self.assertEqual(result, []) # result = filter_tt_urls(urls)
# self.assertEqual(result, [])
def test_one_url(self): # def test_no_tiktok_urls(self):
result = extract_urls("Check out this link: http://example.com") # urls = ["http://example.com", "https://test.com/page"]
self.assertEqual(result, ["http://example.com"]) # result = filter_tt_urls(urls)
# self.assertEqual(result, [])
def test_multiple_urls(self): # def test_mixed_urls(self):
result = extract_urls( # urls = [
"Here are some links: http://example.com and https://test.com/page" # "http://example.com",
) # "https://www.tiktok.com/@user/video/123",
self.assertEqual(result, ["http://example.com", "https://test.com/page"]) # "http://tiktok.com/video1",
# "https://test.com/page",
# ]
# expected = [
# "https://www.tiktok.com/@user/video/123",
# "http://tiktok.com/video1",
# ]
# result = filter_tt_urls(urls)
# self.assertEqual(result, expected)
def test_malformed_url(self): # def test_tiktok_in_query_params(self):
result = extract_urls("This is not a URL: htt://badurl.com") # urls = ["http://example.com?watch=tiktok", "https://other.com/path"]
self.assertEqual(result, []) # expected = ["http://example.com?watch=tiktok"]
# result = filter_tt_urls(urls)
def test_url_at_text_boundaries(self): # self.assertEqual(result, expected)
result = extract_urls("http://start.com text in the middle https://end.com")
self.assertEqual(result, ["http://start.com", "https://end.com"])
class TestFilterTTURLs(unittest.TestCase):
def test_empty_list(self):
result = filter_tt_urls([])
self.assertEqual(result, [])
def test_no_tiktok_urls(self):
urls = ["http://example.com", "https://test.com/page"]
result = filter_tt_urls(urls)
self.assertEqual(result, [])
def test_only_tiktok_urls(self):
urls = ["http://tiktok.com/video1", "https://www.tiktok.com/@user/video/123"]
result = filter_tt_urls(urls)
self.assertEqual(result, urls)
def test_mixed_urls(self):
urls = ["http://example.com", "https://www.tiktok.com/@user/video/123"]
result = filter_tt_urls(urls)
self.assertEqual(result, ["https://www.tiktok.com/@user/video/123"])
def test_tiktok_in_query(self):
urls = ["http://example.com?param=tiktok", "https://www.other.com/path"]
result = filter_tt_urls(urls)
self.assertEqual(result, ["http://example.com?param=tiktok"])

View File

@@ -1,10 +1,12 @@
[tox] [tox]
envlist = py311, flake8, bandit, codespell, unit envlist = py311, flake8, bandit, codespell, unit, coverage
[testenv] [testenv]
basepython = python3.11 basepython = python3.11
deps = -r {toxinidir}/test-requirements.txt deps = -r {toxinidir}/test-requirements.txt
-r {toxinidir}/requirements.txt -r {toxinidir}/requirements.txt
pass_env = app_env, app_id, api_hash, bot_token, \
storage, allowed_ids, log_level
[testenv:flake8] [testenv:flake8]
commands = flake8 telegram_downloader_bot/ commands = flake8 telegram_downloader_bot/
@@ -17,3 +19,8 @@ commands = codespell telegram_downloader_bot/
[testenv:unit] [testenv:unit]
commands = stestr run --test-path tests/ commands = stestr run --test-path tests/
[testenv:coverage]
commands =
coverage run -m unittest discover
coverage report -m