Compare commits


51 Commits

Author SHA1 Message Date
f08b9d1285 Build release update only once a month
All checks were successful
Build Docker image / test (push) Successful in 2m40s
Build Docker image / build (push) Successful in 4m50s
2025-05-29 11:09:51 +02:00
02cbbee625 revert ac1648f56c
All checks were successful
Build Docker image / test (push) Successful in 4m30s
Build Docker image / build (push) Successful in 5m17s
revert Use latest image as cache
Cache needs to be a separate image as it has different format
2025-05-14 11:03:21 +02:00
Roman Krček
ac1648f56c Use latest image as cache
All checks were successful
Build Docker image / test (push) Successful in 2m6s
Build Docker image / build (push) Successful in 5m3s
2025-05-13 18:21:39 +02:00
Roman Krček
b01a799a94 Fix timing of async functions and make histogram finer at lower ranges
Some checks failed
Build Docker image / test (push) Successful in 4m34s
Build Docker image / build (push) Has been cancelled
2025-05-13 18:14:36 +02:00
Roman Krček
020a6271cf Better debugging
All checks were successful
Build Docker image / test (push) Successful in 3m59s
Build Docker image / build (push) Successful in 5m8s
2025-05-09 22:44:52 +02:00
Roman Krček
df5538d7ee Remove incorrect remove statement
All checks were successful
Build Docker image / test (push) Successful in 4m53s
Build Docker image / build (push) Successful in 1m40s
2025-05-09 22:27:45 +02:00
Roman Krček
4b74810912 Remove logging and duplicate videos
All checks were successful
Build Docker image / test (push) Successful in 2m54s
Build Docker image / build (push) Successful in 1m14s
2025-05-06 20:24:22 +02:00
Roman Krček
8829caceee Add prometheus telemetry
All checks were successful
Build Docker image / test (push) Successful in 5m36s
Build Docker image / build (push) Successful in 8m19s
2025-05-06 20:06:36 +02:00
Roman Krček
8cc1c55026 Fix awaits for async functions
Some checks failed
Build Docker image / test (push) Has started running
Build Docker image / build (push) Has been cancelled
2025-02-16 17:23:25 +01:00
Roman Krček
e12eaa0fe1 Fix makefile for moder docker compose 2025-02-16 17:22:11 +01:00
Roman Krček
f2fcea2333 Add env fix for CI and remove caching
All checks were successful
Build Docker image / test (push) Successful in 2m13s
Build Docker image / build (push) Successful in 4m41s
2024-10-14 09:38:02 +02:00
Roman Krček
98250acd21 Add passenv to tox.ini
Some checks failed
Build Docker image / test (push) Failing after 2m42s
Build Docker image / build (push) Has been skipped
2024-10-14 09:33:52 +02:00
Roman Krček
21afea7f39 Add caching and test different style of env import
Some checks failed
Build Docker image / test (push) Failing after 3m17s
Build Docker image / build (push) Has been skipped
2024-10-14 09:22:47 +02:00
Roman Krček
7253745a50 Add example .env file so unittests pass in CI environment
Some checks failed
Build Docker image / test (push) Failing after 2m40s
Build Docker image / build (push) Has been skipped
2024-10-14 09:16:07 +02:00
Roman Krček
3075743c5d Fix linter issues and unittests
Some checks failed
Build Docker image / build (push) Has been skipped
Build Docker image / test (push) Failing after 2m41s
2024-10-13 21:18:18 +02:00
Roman Krček
6d508121b0 Add hash checking to TT downloading 2024-10-13 21:07:56 +02:00
Roman Krček
47248f10ab Add computed properties to settings 2024-10-13 21:05:51 +02:00
Roman Krček
47472d59b7 Move log level to settings
All checks were successful
Build Docker image / test (push) Successful in 2m50s
Build Docker image / build (push) Successful in 53s
2024-10-13 20:24:33 +02:00
Roman Krček
940f97a951 Ignore .vscode folder
All checks were successful
Build Docker image / test (push) Successful in 2m46s
Build Docker image / build (push) Successful in 4m50s
2024-10-13 19:47:14 +02:00
Roman Krček
d236f88b64 Move to pydantic settings paradigm 2024-10-13 19:47:06 +02:00
Roman Krček
c68d3d8722 Fix datetime in unittests
All checks were successful
Build Docker image / test (push) Successful in 2m29s
Build Docker image / build (push) Successful in 49s
2024-10-13 19:13:12 +02:00
Roman Krček
c1bf9c1e53 Fix minor problems with user IDs and datetime
Some checks failed
Build Docker image / test (push) Failing after 2m29s
Build Docker image / build (push) Has been skipped
2024-10-13 19:06:47 +02:00
Roman Krček
f941877954 Pin aiohttp version to latest passing build in pywheels
All checks were successful
Build Docker image / test (push) Successful in 2m30s
Build Docker image / build (push) Successful in 4m25s
2024-10-13 18:35:37 +02:00
Roman Krček
48bd21f14f Use specific versions in requirements
Some checks failed
Build Docker image / test (push) Successful in 2m30s
Build Docker image / build (push) Failing after 3m19s
2024-10-13 18:22:18 +02:00
Roman Krček
e688b6d62b Add coverage tests
Some checks failed
Build Docker image / test (push) Failing after 2m7s
Build Docker image / build (push) Has been skipped
2024-10-13 18:16:28 +02:00
Roman Krček
2f3a2d1700 Fix unit tests 2024-10-13 18:15:05 +02:00
Roman Krček
af6282e26d Split some functions to make unittests easier 2024-10-13 18:14:59 +02:00
Roman Krček
b74616ab82 Revert tox parallel
Some checks failed
Build Docker image / test (push) Failing after 1m58s
Build Docker image / build (push) Has been skipped
2024-10-13 11:36:47 +02:00
Roman Krček
4879c05a3b Add basic unittests
Some checks failed
Build Docker image / test (push) Failing after 1m6s
Build Docker image / build (push) Has been skipped
2024-10-13 11:33:29 +02:00
Roman Krček
b2254e99a2 Make testing faster by tunning tox in parallel 2024-10-13 11:30:33 +02:00
Roman Krček
725cf30319 Reorganize the project for better compatibility with unittests 2024-10-13 11:22:37 +02:00
Roman Krček
3c34c0f947 Prepare git for unittests 2024-10-13 11:21:31 +02:00
Roman Krček
32a423bf28 Fix code spell not being used in CI 2024-10-13 11:21:17 +02:00
Roman Krček
7a54a4c0f5 Fix linter issues
All checks were successful
Build Docker image / test (push) Successful in 30s
Build Docker image / build (push) Successful in 52s
2024-10-10 23:06:14 +02:00
Roman Krček
53228e7294 Protect chat by decorator
Some checks failed
Build Docker image / test (push) Failing after 33s
Build Docker image / build (push) Has been skipped
2024-10-10 23:00:01 +02:00
Roman Krček
801580e3f5 Reduce cognitive load by slitting long functions 2024-10-10 22:59:24 +02:00
Roman Krček
c633aebbc2 Fix CVE-2024-6345 by upgrading setuptools
All checks were successful
Build Docker image / test (push) Successful in 32s
Build Docker image / build (push) Successful in 41s
2024-10-07 17:43:46 +02:00
Roman Krček
a8d2714da9 Change Trivy output type to table
All checks were successful
Build Docker image / test (push) Successful in 30s
Build Docker image / build (push) Successful in 54s
2024-10-07 13:58:46 +02:00
Roman Krček
4a4a04900c Add Tricy to CI
Some checks failed
Build Docker image / test (push) Successful in 1m5s
Build Docker image / build (push) Failing after 1m10s
2024-10-07 13:36:18 +02:00
Roman Krček
4fbcbbc261 Added dockerignore 2024-10-07 13:32:48 +02:00
Roman Krček
78d3165ff1 Final version of labels for container
All checks were successful
Build Docker image / test (push) Successful in 32s
Build Docker image / build (push) Successful in 20s
2024-10-06 15:02:03 +02:00
Roman Krček
a39d71c091 More chnages to variables
All checks were successful
Build Docker image / test (push) Successful in 32s
Build Docker image / build (push) Successful in 23s
2024-10-06 14:56:59 +02:00
Roman Krček
33131b6a2a Change variable once more
All checks were successful
Build Docker image / test (push) Successful in 33s
Build Docker image / build (push) Successful in 23s
2024-10-06 14:51:16 +02:00
Roman Krček
a826c20e07 Reorder tasks to fix
All checks were successful
Build Docker image / test (push) Successful in 31s
Build Docker image / build (push) Successful in 23s
2024-10-06 14:46:22 +02:00
Roman Krček
6cfc6a565c Replace variables with gitlog
All checks were successful
Build Docker image / test (push) Successful in 32s
Build Docker image / build (push) Successful in 23s
2024-10-06 14:35:42 +02:00
Roman Krček
2da78e066a Fix variable names in actions
All checks were successful
Build Docker image / test (push) Successful in 32s
Build Docker image / build (push) Successful in 22s
2024-10-06 14:29:39 +02:00
Roman Krček
a81e41fdf4 Add commit info to image
All checks were successful
Build Docker image / test (push) Successful in 32s
Build Docker image / build (push) Successful in 23s
2024-10-06 14:27:09 +02:00
Roman Krček
3ddfbb1aa9 Fix linter error line too long
All checks were successful
Build Docker image / test (push) Successful in 34s
Build Docker image / build (push) Successful in 4m7s
2024-10-06 14:14:53 +02:00
Roman Krček
c549ef71f0 Add sentry
Some checks failed
Build Docker image / test (push) Failing after 38s
Build Docker image / build (push) Has been skipped
2024-10-06 14:06:42 +02:00
Roman Krček
237d6bb22c Ignore data folder 2024-10-06 13:53:41 +02:00
Roman Krček
6bb6180f6c Make sure to use python3 everywhere 2024-10-06 13:47:14 +02:00
19 changed files with 868 additions and 159 deletions

6
.dockerignore Normal file

@@ -0,0 +1,6 @@
# Ignore everything
**
# Except for
!telegram_downloader_bot/
!requirements.txt

7
.env.example Normal file

@@ -0,0 +1,7 @@
APP_ENV=DEV
API_ID=20798818
API_HASH=c657773dc9a68823d5ae2c69e66d9d09
BOT_TOKEN=6811299384:AAFPUDfE-bJyw8g4p01x6IhofXBBxgEd4es
STORAGE=/data
ALLOWED_IDS=1868160614
LOG_LEVEL=INFO


@@ -5,7 +5,7 @@ on:
     branches:
       - main
   schedule:
-    - cron: "0 22 * * 0" # sunday 22:00
+    - cron: "0 22 1 * *" # First of every month
 jobs:
   test:
@@ -20,37 +20,36 @@ jobs:
         with:
           python-version: "3.11"
-      - name: Setup tox
-        run: pip install tox>=4.16
-      - name: Run tox
-        run: tox
+      - name: Run tox tests
+        run: |
+          pip install tox>=4.16
+          tox
 
   build:
     runs-on: ubuntu-latest
     needs: test
     steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
       - name: Get date for image label
         id: date
         run: echo "::set-output name=date::$(date +'%Y-%m-%d')"
-      - name: Checkout code
-        uses: https://github.com/actions/checkout@v4
       - name: Set up Docker Buildx
-        uses: https://github.com/docker/setup-buildx-action@v3
+        uses: docker/setup-buildx-action@v3
         with:
           driver: docker-container
       - name: Login to Docker Registry
-        uses: https://github.com/docker/login-action@v3
+        uses: docker/login-action@v3
         with:
           registry: git.orebolt.cz
           username: ${{ secrets.REGISTRY_USERNAME }}
           password: ${{ secrets.REGISTRY_TOKEN }}
       - name: Build and push image
-        uses: https://github.com/docker/build-push-action@v5
+        uses: docker/build-push-action@v5
         with:
           context: .
           push: true
@@ -58,3 +57,17 @@ jobs:
           platforms: linux/amd64,linux/arm/v7
           cache-to: "mode=max,image-manifest=true,oci-mediatypes=true,type=registry,ref=${{ vars.DOCKER_IMAGE }}:cache"
           cache-from: "mode=max,image-manifest=true,oci-mediatypes=true,type=registry,ref=${{ vars.DOCKER_IMAGE }}:cache"
+          labels: |
+            org.opencontainers.image.created=${{ steps.date.outputs.date }}
+            org.opencontainers.image.authors=Roman Krček
+            org.opencontainers.image.source=${{ env.GITHUB_REPOSITORY }}
+            org.opencontainers.image.revision=${{ env.GITHUB_SHA }}
+            org.opencontainers.image.vendor=Orebolt.cz
+            org.opencontainers.image.ref.name=${{ env.GITHUB_REF }}
+            org.opencontainers.image.title=Telegram Downloader
+      - name: Run Trivy vulnerability scanner
+        uses: aquasecurity/trivy-action@0.24.0
+        with:
+          image-ref: '${{ vars.DOCKER_IMAGE }}:latest'
+          format: 'table'

5
.gitignore vendored

@@ -162,4 +162,7 @@ cython_debug/
 #.idea/
 .venv/
-.env
+.env/
+.stestr/
+.vscode/
+data/


@@ -8,7 +8,7 @@ FROM base AS builder
 COPY requirements.txt ./
 RUN --mount=type=cache,target=/tmp/pip_cache \
-    python -m pip install --upgrade pip && \
+    python3 -m pip install --upgrade pip setuptools && \
     pip install \
     -r requirements.txt \
     --extra-index-url https://www.piwheels.org/simple \


@@ -3,7 +3,7 @@ ts := $(shell /bin/date "+%Y-%m-%d")
 platform = linux/arm/v7 # linux/amd64
 
 up:
-	sudo docker-compose up --build
+	sudo docker compose up --build
 
 entry:
 	sudo docker pull --platform $(platform) $(docker_image):latest && \


@@ -5,4 +5,5 @@ services:
     volumes:
       - ./data/:/data
     env_file: .env
-    platform: linux/arm/v7
+    ports:
+      - 8000:8000


@@ -1,5 +1,11 @@
-integv==1.3.0
+# Pin aiohttp version until builds start passing
+# https://www.piwheels.org/project/aiohttp/
+aiohttp==3.10.9
 pyrogram==2.0.106
 tiktok_downloader==0.3.5
 uvloop==0.19.0
 tgcrypto==1.2.5
+prometheus-client==0.21.1
+prometheus-async==25.1.0
+pydantic-settings==2.5.2
+pydantic==2.9.2


@@ -1,5 +1,7 @@
 import logging
 
+from telegram_downloader_bot.settings import settings
+
 
 def configure_logger(log_level: str) -> logging.Logger:
     log_format = (
@@ -13,3 +15,6 @@ def configure_logger(log_level: str) -> logging.Logger:
     )
 
     return logging.getLogger()
+
+
+log = configure_logger(settings.log_level)


@@ -1,175 +1,76 @@
-import datetime
-import integv
-import os
-import re
 import uvloop
 from pyrogram import Client, filters
 from pyrogram.types import Message
-from tiktok_downloader import snaptik
 
-from telegram_downloader_bot import logger
+from telegram_downloader_bot.logger import log
+from telegram_downloader_bot.telemetry import init_telemetry
 
-API_ID = os.getenv("API_ID")  # Your API ID from my.telegram.org
-API_HASH = os.getenv("API_HASH")  # Your API Hash from my.telegram.org
-BOT_TOKEN = os.getenv("BOT_TOKEN")  # Your bot token from BotFather
-STORAGE = os.getenv("STORAGE")  # Storage directory for downloads
-MY_MSG_ID = int(os.getenv("MY_MSG_ID"))  # Your message ID for authorization
-LOG_LEVEL = os.getenv("LOG_LEVEL")  # Log level
-
-log = logger.configure_logger(LOG_LEVEL)
+from telegram_downloader_bot import utils, security
+from telegram_downloader_bot.settings import settings
 
 uvloop.install()
-app = Client("downloader_bot", api_id=API_ID,
-             api_hash=API_HASH, bot_token=BOT_TOKEN)
 
+if settings.app_env == "production":
+    log.info("Starting telemetry server, in production mode.")
+    init_telemetry()
+else:
+    log.info("Not starting telemetry server, not in production mode.")
 
-def download_tt_video(url: str) -> bool:
-    videos = snaptik(url)
-    now = datetime.datetime.now()
-    max_tries = 5
-    log.debug(f"Downloading video from {url}...")
-    for video in videos:
-        video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
-        video_filepath: os.path = os.path.join(
-            STORAGE, "tiktok", video_filename)
-        for i in range(max_tries):
-            video_content = video.download().getbuffer()
-            is_valid_mp4 = integv.verify(
-                video_content.tobytes(), file_type="mp4")
-            log.debug(
-                f"Attempt {i+1}/{max_tries} to "
-                "download video, video valid: {is_valid_mp4}"
-            )
-            if is_valid_mp4:
-                break
-        if not is_valid_mp4:
-            log.error("Downloaded video is not a valid mp4 file")
-            return False
-        with open(video_filepath, "wb") as f:
-            f.write(video_content)
-        log.debug("Video saved successfully")
-    return True
+app = Client("downloader_bot",
+             api_id=settings.api_id,
+             api_hash=settings.api_hash,
+             bot_token=settings.bot_token,
+             workers=settings.workers)
 
 
 @app.on_message(filters.command("start"))
-async def start_handler(client, message: Message):
+@security.protected
+async def start_handler(_, message: Message):
     await message.reply_text(
-        "This bot downloads TikTok videos " "to my personal server"
+        "This bot downloads TikTok videos to my personal server"
     )
 
 
 @app.on_message(filters.command("help"))
-async def help_handler(client, message: Message):
+@security.protected
+async def help_handler(_, message: Message):
     await message.reply_text("I won't help you!")
 
 
 @app.on_message(filters.text)
-async def message_handler(client, message: Message):
-    if message.chat.id != MY_MSG_ID:
-        return await message.reply_text("Nope, not talking to you!")
-
-    urls = re.findall(r"\bhttps?://[^\s]+", message.text)
+@security.protected
+async def message_handler(_, message: Message):
+    urls = utils.extract_urls(message.text)
 
     if not urls:
         return await message.reply_text(
-            "No links found in the message. " "Nothing to download!"
+            "No links found in the message. Nothing to download!"
+        )
+
+    tt_urls = utils.filter_tt_urls(urls)
+    if not tt_urls:
+        return await message.reply_text(
+            "No TikTok URLs found! Nothing to download!"
         )
 
-    success_count = 0
     for i, url in enumerate(urls):
         msg = f"Downloading video {i+1}/{len(urls)}..."
         log.info(msg)
         await message.reply_text(msg)
-        outcome = download_tt_video(url)
-        success_count += 1 if outcome else 0
+        status = await utils.download_tt_video(url)
 
-    await message.reply_text(f"{success_count}/{len(urls)} "
-                             "video(s) downloaded")
+    await message.reply_text(f"Done. {status}")
 
 
 @app.on_message(filters.media)
+@security.protected
 async def media_handler(client, message: Message):
-    if message.chat.id != MY_MSG_ID:
-        return await message.reply_text("Nope, not talking to you!")
-
-    # Determine folder name based on whether the message was forwarded
-    # and who it was forwarded from
-    if message.forward_from:
-        user = message.forward_from
-        # User's first and last name for folder name,
-        # fallback to user ID if not available
-        user_folder_name = (
-            f"{user.first_name}_{user.last_name}".strip()
-            if user.first_name and user.last_name
-            else str(user.id)
-        )
-    elif message.forward_from_chat:
-        user = message.forward_from_chat
-        # Use chat title for groups and channels
-        user_folder_name = "".join(
-            c for c in user.title if c.isalnum() or c in (" ", "_")
-        ).rstrip()
-    else:
-        user = message.from_user
-        # User's first and last name for folder name,
-        # fallback to user ID if not available
-        user_folder_name = (
-            f"{user.first_name}_{user.last_name}".strip()
-            if user.first_name and user.last_name
-            else str(user.id)
-        )
-
-    user_folder_name = "".join(
-        c for c in user_folder_name if c.isalnum() or c in (" ", "_")
-    ).rstrip()  # Sanitize the folder name
-    user_folder = os.path.join(STORAGE, "telegram", user_folder_name)
-    os.makedirs(user_folder, exist_ok=True)
-
-    # Reply to user that the download is starting
     await message.reply_text("Downloading media...")
-
-    # Handle documents
-    if message.document:
-        file_name = message.document.file_name
-        file_path = os.path.join(user_folder, file_name)
-        await client.download_media(message, file_path)
-        await message.reply_text(f"Document saved to {user_folder}")
-    # Handle single or multiple photos
-    elif message.photo:
-        file_name = f"photo_{message.photo.file_id}.jpg"
-        file_path = os.path.join(user_folder, file_name)
-        await client.download_media(message.photo, file_path)
-        await message.reply_text(f"Photo saved to {user_folder}")
-    # Handle videos
-    elif message.video:
-        file_name = f"video_{message.video.file_id}.mp4"
-        file_path = os.path.join(user_folder, file_name)
-        await client.download_media(message, file_path)
-        await message.reply_text(f"Video saved to {user_folder}")
-    # Handle GIFs
-    elif message.animation:
-        file_name = f"gif_{message.animation.file_id}.gif"
-        file_path = os.path.join(user_folder, file_name)
-        await client.download_media(message.animation, file_path)
-        await message.reply_text(f"GIF saved to {user_folder}")
-    # Handle unknown data types
-    else:
-        await message.reply_text("Unknown media type!")
+    await utils.handle_media_message_contents(client, message)
 
 
 if __name__ == "__main__":
-    os.makedirs(os.path.join(STORAGE, "tiktok"), exist_ok=True)
-    os.makedirs(os.path.join(STORAGE, "telegram"), exist_ok=True)
+    utils.make_fs(settings.storage)
     app.run()
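The handlers above now `await utils.download_tt_video(url)`, and that coroutine is timed with `prometheus_async.aio.time`. Both changes rest on the same point: calling an `async def` function only creates a coroutine, so nothing runs (and a synchronous timer around the call would stop before any awaited work happens) until the coroutine is awaited. The following is a minimal stdlib-only sketch of an awaiting timer; `timed_async`, `fake_download` and the `durations` list are hypothetical stand-ins for the real decorator, download function and Prometheus histogram, not the library's implementation.

```python
import asyncio
import time
from functools import wraps

# Hypothetical sink standing in for a prometheus_client Histogram.
durations: list[float] = []


def timed_async(func):
    """Record the wall-clock duration of an async function, awaited I/O included."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            # The timer stops only after the wrapped coroutine has finished.
            return await func(*args, **kwargs)
        finally:
            durations.append(time.perf_counter() - start)
    return wrapper


@timed_async
async def fake_download(url: str) -> str:
    """Hypothetical download that only simulates network latency."""
    await asyncio.sleep(0.05)
    return f"Downloaded ok: {url}"


if __name__ == "__main__":
    # Calling without await only builds a coroutine; nothing runs or is timed.
    coro = fake_download("https://example.com/video")
    print(asyncio.iscoroutine(coro), len(durations))  # True 0
    coro.close()

    # Awaiting runs the body, so the recorded duration covers the sleep.
    print(asyncio.run(fake_download("https://example.com/video")))
    print(len(durations))  # 1
```

Forgetting the `await` in a handler therefore silently skips the download instead of raising, which is what the "Fix awaits for async functions" commit addresses.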


@@ -0,0 +1,19 @@
from functools import wraps

from telegram_downloader_bot.logger import log
from telegram_downloader_bot.settings import settings


def protected(func):
    @wraps(func)
    async def wrapper(client, message):
        if int(message.from_user.id) not in settings.allowed_ids_list:
            log.warning(
                f"User with ID {message.from_user.id} attempted "
                "to text this bot!")
            # allowed_ids_list holds ints; str.join() needs strings.
            log.info(
                "Only users allowed are: "
                f"{' '.join(str(x) for x in settings.allowed_ids_list)}")
            return await message.reply_text("You are not on the list!")
        return await func(client, message)
    return wrapper
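The `protected` decorator gates every handler on an allow-list of user IDs. Below is a minimal stdlib-only sketch of the same pattern that can be exercised without a Telegram connection; `FakeUser`, `FakeMessage`, `ALLOWED_IDS` and `start_handler` here are hypothetical stand-ins for the Pyrogram types and for `settings.allowed_ids_list`, and `print` replaces the bot's logger.

```python
import asyncio
from dataclasses import dataclass, field
from functools import wraps

# Hypothetical allow-list; the bot itself reads settings.allowed_ids_list.
ALLOWED_IDS = [1868160614]


@dataclass
class FakeUser:
    """Hypothetical stand-in for pyrogram.types.User."""
    id: int


@dataclass
class FakeMessage:
    """Hypothetical stand-in for pyrogram.types.Message."""
    from_user: FakeUser
    replies: list = field(default_factory=list)

    async def reply_text(self, text: str) -> str:
        self.replies.append(text)
        return text


def protected(func):
    """Run the wrapped async handler only for allow-listed user IDs."""
    @wraps(func)
    async def wrapper(client, message):
        if int(message.from_user.id) not in ALLOWED_IDS:
            # str.join() needs strings, so convert each integer ID first.
            allowed = " ".join(str(x) for x in ALLOWED_IDS)
            print(f"Rejected user {message.from_user.id}; allowed: {allowed}")
            return await message.reply_text("You are not on the list!")
        return await func(client, message)
    return wrapper


@protected
async def start_handler(_, message):
    return await message.reply_text("Welcome!")


if __name__ == "__main__":
    print(asyncio.run(start_handler(None, FakeMessage(FakeUser(1868160614)))))
    # prints "Welcome!"
    print(asyncio.run(start_handler(None, FakeMessage(FakeUser(42)))))
    # prints the rejection line, then "You are not on the list!"
```

`functools.wraps` keeps the handler's name and docstring, which helps when the same function is also registered through `@app.on_message(...)`.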


@@ -0,0 +1,71 @@
import os
from functools import cached_property

from pydantic_settings import BaseSettings
from pydantic import computed_field


class Settings(BaseSettings):
    """
    Settings class that defines configuration variables for the application.

    Attributes
    ----------
    app_env : str
        Specifies the environment in which the application is running.
        Default is 'DEV'. The telemetry server is started only when this
        is set to 'production'.
    workers : int
        Defines the number of workers to be used in the application.
        Default is 1.
    api_id : int
        Represents the API ID from my.telegram.org
    api_hash : str
        The hash key corresponding to your API Hash from my.telegram.org
    bot_token : str
        The token from BotFather.
    storage : str
        Specifies the path where the application stores persistent data.
    allowed_ids : str
        A comma-separated string of Telegram user IDs that are allowed
        to use the bot.
    log_level : str
        The log level used by the logging module.

    Config
    ------
    env_file : list
        Environment files to load variables from: ".env.example", then
        ".env". Later files take priority, so values in ".env" override
        the example file; real environment variables override both.
    """

    app_env: str = "DEV"
    workers: int = 1
    api_id: int
    api_hash: str
    bot_token: str
    storage: str
    allowed_ids: str
    log_level: str

    @computed_field
    @property
    def tt_hash_file(self) -> str:
        return os.path.join(self.storage, "tt_hashes.pickle")

    @computed_field
    @cached_property
    def allowed_ids_list(self) -> list:
        allowed_ids = self.allowed_ids.split(",")
        allowed_ids = [int(x) for x in allowed_ids]
        return allowed_ids

    class Config:
        # Later files take priority: .env overrides .env.example.
        env_file = [".env.example", ".env"]


settings = Settings()
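`allowed_ids_list` turns the comma-separated `ALLOWED_IDS` value (for example `1868160614` in `.env.example`) into integers. Calling `int(x)` on each piece already tolerates spaces around an ID, but an empty piece, such as one left by a trailing comma, raises `ValueError`. A slightly more forgiving stdlib sketch follows; `parse_allowed_ids` is a hypothetical helper name, and skipping empty entries is an assumption rather than the current behaviour.

```python
def parse_allowed_ids(raw: str) -> list[int]:
    """Turn a comma-separated string of Telegram user IDs into ints.

    Whitespace around each ID is ignored (int() strips it) and empty
    entries, for example from a trailing comma, are skipped.
    """
    return [int(part) for part in raw.split(",") if part.strip()]


if __name__ == "__main__":
    print(parse_allowed_ids("1868160614"))       # [1868160614]
    print(parse_allowed_ids("1868160614, 42,"))  # [1868160614, 42]
```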


@@ -0,0 +1,33 @@
from prometheus_client import Histogram, start_http_server

DOWNLOAD_DURATION = Histogram(
    'download_time_seconds',
    'Time taken to download a single media item',
    ['service'],
    buckets=[0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100]
)

FILE_SIZE_BYTES = Histogram(
    'downloaded_file_size_bytes',
    'Size of the downloaded file in bytes',
    ['service'],
    buckets=[
        1e6,    # 1 MB
        2e6,    # 2 MB
        5e6,    # 5 MB
        10e6,   # 10 MB
        25e6,   # 25 MB
        50e6,   # 50 MB
        100e6,  # 100 MB
        200e6,  # 200 MB
        500e6,  # 500 MB
        1e9     # 1 GB
    ]
)


def init_telemetry() -> None:
    """
    Expose Prometheus metrics over HTTP on port 8000
    (mapped to the host in docker-compose).
    """
    start_http_server(8000)
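Prometheus histogram buckets are cumulative: each `le` bucket counts every observation less than or equal to its upper bound, and an implicit `+Inf` bucket counts all of them. The pure-Python sketch below reproduces that counting over the `DOWNLOAD_DURATION` boundaries to show what "finer at lower ranges" buys: a 0.05 s and a 0.3 s download land in different buckets instead of one coarse bucket. It is an illustration of the exposition semantics, not `prometheus_client` internals; `cumulative_buckets` is a hypothetical name.

```python
import bisect
import math

# Upper bounds copied from DOWNLOAD_DURATION in telemetry.py (seconds).
BUCKETS = [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100]


def cumulative_buckets(observations, bounds=BUCKETS):
    """Return {upper_bound: number of observations <= upper_bound}.

    Mirrors the cumulative `le` semantics of a Prometheus histogram,
    including the implicit +Inf bucket that counts every observation.
    """
    edges = sorted(bounds) + [math.inf]
    per_bucket = [0] * len(edges)
    for value in observations:
        # Index of the first edge >= value; `le` bounds are inclusive.
        per_bucket[bisect.bisect_left(edges, value)] += 1

    result = {}
    running = 0
    for edge, count in zip(edges, per_bucket):
        running += count
        result[edge] = running
    return result


if __name__ == "__main__":
    counts = cumulative_buckets([0.05, 0.3, 0.3, 4.2, 150])
    print(counts[0.1], counts[0.5], counts[5], counts[math.inf])  # 1 3 4 5
```

The same semantics apply to `FILE_SIZE_BYTES`, only with byte-sized boundaries.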


@@ -0,0 +1,181 @@
import os
import pickle  # nosec
import re
from datetime import datetime
from hashlib import sha256

from prometheus_async.aio import time as async_time
from pyrogram import Client
from pyrogram.types import Message
from tiktok_downloader import snaptik

from telegram_downloader_bot.logger import log
from telegram_downloader_bot.settings import settings
from telegram_downloader_bot.telemetry import DOWNLOAD_DURATION
from telegram_downloader_bot.telemetry import FILE_SIZE_BYTES


def sanitize_name(input: str) -> str:
    """Sanitize a string by removing every character except ASCII letters,
    digits, hyphens and spaces, then replacing spaces with underscores."""
    output = re.sub("[^a-zA-Z0-9- ]", "", input)
    output = output.replace(" ", "_")
    return output


def get_user_folder(message: Message) -> str:
    """Determine the folder the media is saved to. Depending on the type
    of message (forwarded or direct), use that person's or group's name."""
    # Message forwarded from someone
    if message.forward_from:
        user = message.forward_from
        if user.first_name and user.last_name:
            # User's first and last name for folder name
            user_folder_name = f"{user.first_name} {user.last_name}"
        else:
            # fallback to user ID if not available
            user_folder_name = str(user.id)
    # Message forwarded from chat
    elif message.forward_from_chat:
        user = message.forward_from_chat
        user_folder_name = user.title
    # Direct message from user
    else:
        user = message.from_user
        if user.first_name and user.last_name:
            # User's first and last name for folder name
            user_folder_name = f"{user.first_name} {user.last_name}"
        else:
            # fallback to user ID if not available
            user_folder_name = str(user.id)

    # Sanitize the folder name
    user_folder_name = sanitize_name(user_folder_name)

    user_folder = os.path.join(settings.storage, "telegram", user_folder_name)
    os.makedirs(user_folder, exist_ok=True)

    return user_folder


@async_time(DOWNLOAD_DURATION.labels(service='telegram'))
async def handle_media_message_contents(client: Client,
                                        message: Message):
    """Detect what kind of media the user sent, pick the matching file
    name and extension, and save the media."""
    user_folder = get_user_folder(message)

    # Handle documents
    if message.document:
        file_name = message.document.file_name
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message, file_path)
        await message.reply_text(f"Document saved to {user_folder}")
    # Handle single or multiple photos
    elif message.photo:
        file_name = f"photo_{message.photo.file_id}.jpg"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message.photo, file_path)
        await message.reply_text(f"Photo saved to {user_folder}")
    # Handle videos
    elif message.video:
        file_name = f"video_{message.video.file_id}.mp4"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message, file_path)
        await message.reply_text(f"Video saved to {user_folder}")
    # Handle GIFs
    elif message.animation:
        file_name = f"gif_{message.animation.file_id}.gif"
        file_path = os.path.join(user_folder, file_name)
        await client.download_media(message.animation, file_path)
        await message.reply_text(f"GIF saved to {user_folder}")
    # Handle unknown data types
    else:
        await message.reply_text("Unknown media type!")
        # Nothing was saved, so there is no file size to record
        return

    size = os.path.getsize(file_path)
    FILE_SIZE_BYTES.labels(service="telegram").observe(size)


async def get_tt_hashes() -> set:
    if not os.path.exists(settings.tt_hash_file):
        return set()

    with open(settings.tt_hash_file, "rb") as f:
        all_tt_hashes: set = pickle.load(f)  # nosec

    return all_tt_hashes


async def add_to_hashes(new_hash: str) -> None:
    all_tt_hashes = await get_tt_hashes()
    all_tt_hashes.add(new_hash)
    await save_tt_hashes(all_tt_hashes)


async def save_tt_hashes(hashes: set) -> None:
    with open(settings.tt_hash_file, "wb+") as f:
        pickle.dump(hashes,
                    f,
                    protocol=pickle.HIGHEST_PROTOCOL)


async def check_if_tt_downloaded(tt_hash: str) -> bool:
    all_tt_hashes = await get_tt_hashes()
    return tt_hash in all_tt_hashes


@async_time(DOWNLOAD_DURATION.labels(service='tiktok'))
async def download_tt_video(url: str) -> str:
    """Download a TikTok video from the given URL, skipping videos whose
    SHA-256 hash shows they were already downloaded."""
    videos = snaptik(url)
    now = datetime.now()

    for video in videos:
        video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
        video_filepath = os.path.join(settings.storage,
                                      "tiktok",
                                      video_filename)

        video_content = video.download().getbuffer()
        video_hash = sha256(video_content).hexdigest()

        log.info(f"Video hash: {video_hash}")
        log.info(f"Video filepath: {video_filepath}")

        if await check_if_tt_downloaded(video_hash) is True:
            return "Already downloaded"

        with open(video_filepath, "wb") as f:
            f.write(video_content)

        await add_to_hashes(video_hash)

        size = os.path.getsize(video_filepath)
        FILE_SIZE_BYTES.labels(service="tiktok").observe(size)

        return "Downloaded ok"

    return "Failed to download"


def make_fs(storage_path: str) -> None:
    os.makedirs(os.path.join(storage_path, "tiktok"), exist_ok=True)
    os.makedirs(os.path.join(storage_path, "telegram"), exist_ok=True)


def extract_urls(text: str) -> list:
    return re.findall(r"\bhttps?://[^\s]+", text)


def filter_tt_urls(urls: list) -> list:
    return [x for x in urls if "tiktok" in x]
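`download_tt_video` skips a video when the SHA-256 digest of its bytes is already in the stored set. A stdlib sketch of that content-hash deduplication is below; it deliberately swaps pickle for a JSON file (JSON cannot execute code on load, which is why the original needs `# nosec`). `HashStore` and the `tt_hashes.json` file name are hypothetical, not part of the bot.

```python
import json
import os
import tempfile
from hashlib import sha256


class HashStore:
    """Persist SHA-256 digests of already-saved files in a JSON list."""

    def __init__(self, path: str) -> None:
        self.path = path

    def load(self) -> set[str]:
        if not os.path.exists(self.path):
            return set()
        with open(self.path, "r", encoding="utf-8") as f:
            return set(json.load(f))

    def save(self, hashes: set[str]) -> None:
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(sorted(hashes), f)

    def add_if_new(self, content: bytes) -> bool:
        """Record the digest and return True if content was not seen before."""
        digest = sha256(content).hexdigest()
        hashes = self.load()
        if digest in hashes:
            return False
        hashes.add(digest)
        self.save(hashes)
        return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        store = HashStore(os.path.join(tmp, "tt_hashes.json"))
        print(store.add_if_new(b"video bytes"))  # True
        print(store.add_if_new(b"video bytes"))  # False (already downloaded)
        print(store.add_if_new(b"other video"))  # True
```

Hashing the content rather than the URL means the same video shared through different links is still stored only once.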


@@ -1,4 +1,5 @@
-codespell
-flake8
-bandit
-pytest
+codespell==2.3.0
+flake8==7.1.1
+bandit==1.7.10
+stestr==4.1.0
+coverage==7.6.2

0
tests/__init__.py Normal file


@@ -0,0 +1 @@
# To be implemented

450
tests/test_utils.py Normal file

@@ -0,0 +1,450 @@
# test_utils.py
import unittest
import os
import re
import shutil
import tempfile
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime
# Adjusted import statement
from telegram_downloader_bot.utils import (
sanitize_name,
get_user_folder,
handle_media_message_contents,
download_tt_video,
make_fs,
extract_urls,
filter_tt_urls,
)
from pyrogram.types import Message, User, Chat
from pyrogram import Client
class TestSanitizeName(unittest.TestCase):
def test_alphanumeric_input(self):
input_str = "JohnDoe123"
expected_output = "JohnDoe123"
self.assertEqual(sanitize_name(input_str), expected_output)
def test_input_with_special_chars(self):
input_str = "John Doe!@#"
expected_output = "John_Doe"
self.assertEqual(sanitize_name(input_str), expected_output)
def test_input_with_only_special_chars(self):
input_str = "!@#$%^&*()"
expected_output = ""
self.assertEqual(sanitize_name(input_str), expected_output)
def test_empty_input(self):
input_str = ""
expected_output = ""
self.assertEqual(sanitize_name(input_str), expected_output)


class TestGetUserFolder(unittest.TestCase):
    def setUp(self):
        # Create a temporary directory for each test
        self.tmp_path = tempfile.mkdtemp()
        self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
        self.settings_patcher.start()

    def tearDown(self):
        self.settings_patcher.stop()
        # Remove the directory after the test
        shutil.rmtree(self.tmp_path)

    def test_forward_from_full_name(self):
        user = Mock()
        user.first_name = "John"
        user.last_name = "Doe"
        user.id = 12345
        message = Mock()
        message.forward_from = user
        message.forward_from_chat = None
        message.from_user = None
        result = get_user_folder(message)
        expected_folder = os.path.join(self.tmp_path, "telegram", "John_Doe")
        self.assertEqual(result, expected_folder)
        self.assertTrue(os.path.exists(expected_folder))

    def test_forward_from_first_name_only(self):
        user = Mock()
        user.first_name = "John"
        user.last_name = None
        user.id = 12345
        message = Mock()
        message.forward_from = user
        message.forward_from_chat = None
        message.from_user = None
        result = get_user_folder(message)
        expected_folder = os.path.join(self.tmp_path, "telegram", "12345")
        self.assertEqual(result, expected_folder)
        self.assertTrue(os.path.exists(expected_folder))

    def test_forward_from_chat_title(self):
        chat = Mock()
        chat.title = "My *Awesome* Group/Chat!"
        message = Mock()
        message.forward_from = None
        message.forward_from_chat = chat
        message.from_user = None
        result = get_user_folder(message)
        expected_folder = os.path.join(
            self.tmp_path, "telegram", "My_Awesome_GroupChat"
        )
        self.assertEqual(result, expected_folder)
        self.assertTrue(os.path.exists(expected_folder))

    def test_from_user_full_name(self):
        user = Mock()
        user.first_name = "Jane"
        user.last_name = "Doe"
        user.id = 54321
        message = Mock()
        message.forward_from = None
        message.forward_from_chat = None
        message.from_user = user
        result = get_user_folder(message)
        expected_folder = os.path.join(self.tmp_path, "telegram", "Jane_Doe")
        self.assertEqual(result, expected_folder)
        self.assertTrue(os.path.exists(expected_folder))

    def test_from_user_id(self):
        user = Mock()
        user.first_name = None
        user.last_name = None
        user.id = 54321
        message = Mock()
        message.forward_from = None
        message.forward_from_chat = None
        message.from_user = user
        result = get_user_folder(message)
        expected_folder = os.path.join(self.tmp_path, "telegram", "54321")
        self.assertEqual(result, expected_folder)
        self.assertTrue(os.path.exists(expected_folder))
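Read together, the five `TestGetUserFolder` tests describe a precedence order: a forwarded user wins over a forwarded chat, which wins over the sender. A user maps to `First_Last` only when both names are present and otherwise to the numeric id, and the folder is created under `<storage>/telegram/`. A minimal sketch under those assumptions follows. Passing `storage` as a parameter instead of reading `settings.storage`, and the helper `_user_label`, are hypothetical simplifications so the block runs on its own.

```python
import os
import re


def sanitize_name(name: str) -> str:
    # Hypothetical sanitizer: whitespace -> "_", other non-word characters dropped.
    return re.sub(r"\W", "", re.sub(r"\s+", "_", name.strip()))


def _user_label(user) -> str:
    # Hypothetical helper: "First_Last" only when both parts exist, else the id.
    if user.first_name and user.last_name:
        return sanitize_name(f"{user.first_name} {user.last_name}")
    return str(user.id)


def get_user_folder(message, storage: str) -> str:
    """Hypothetical sketch of the folder-selection rules the tests encode."""
    if message.forward_from is not None:
        name = _user_label(message.forward_from)
    elif message.forward_from_chat is not None:
        name = sanitize_name(message.forward_from_chat.title)
    else:
        name = _user_label(message.from_user)
    folder = os.path.join(storage, "telegram", name)
    # Create the folder so callers can write into it immediately.
    os.makedirs(folder, exist_ok=True)
    return folder
```

An unset attribute on a plain `Mock()` is itself a truthy `Mock`, which is why every test assigns `None` explicitly to the fields that should not be consulted.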


# class TestHandleMediaMessageContents(unittest.IsolatedAsyncioTestCase):
#     def setUp(self):
#         # Create a temporary directory for each test
#         self.tmp_path = tempfile.mkdtemp()
#         self.settings_patcher = patch('telegram_downloader_bot.settings.settings.storage', self.tmp_path)
#         self.settings_patcher.start()
#
#     def tearDown(self):
#         # Stop patching settings.storage
#         self.settings_patcher.stop()
#         # Remove the directory after the test
#         shutil.rmtree(self.tmp_path)
#
#     @patch('telegram_downloader_bot.utils.get_user_folder')
#     async def test_handle_video(self, mock_get_user_folder):
#         user_folder = os.path.join(self.tmp_path, "user_folder")
#         mock_get_user_folder.return_value = user_folder
#         os.makedirs(user_folder, exist_ok=True)
#         client = Mock(spec=Client)
#         client.download_media = AsyncMock()
#         message = Mock(spec=Message)
#         message.document = None
#         message.photo = None
#         message.video = Mock()
#         message.video.file_id = "video_file_id"
#         message.animation = None
#         message.reply_text = AsyncMock()
#         await handle_media_message_contents(client, message)
#         expected_file_name = f"video_{message.video.file_id}.mp4"
#         expected_file_path = os.path.join(user_folder, expected_file_name)
#         client.download_media.assert_awaited_once_with(
#             message, expected_file_path)
#         message.reply_text.assert_awaited_once_with(
#             f"Video saved to {user_folder}")
#
#     @patch('telegram_downloader_bot.utils.get_user_folder')
#     async def test_handle_animation(self, mock_get_user_folder):
#         user_folder = os.path.join(self.tmp_path, "user_folder")
#         mock_get_user_folder.return_value = user_folder
#         os.makedirs(user_folder, exist_ok=True)
#         client = Mock(spec=Client)
#         client.download_media = AsyncMock()
#         message = Mock(spec=Message)
#         message.document = None
#         message.photo = None
#         message.video = None
#         message.animation = Mock()
#         message.animation.file_id = "animation_file_id"
#         message.reply_text = AsyncMock()
#         await handle_media_message_contents(client, message)
#         expected_file_name = f"gif_{message.animation.file_id}.gif"
#         expected_file_path = os.path.join(user_folder, expected_file_name)
#         client.download_media.assert_awaited_once_with(
#             message.animation, expected_file_path)
#         message.reply_text.assert_awaited_once_with(
#             f"GIF saved to {user_folder}")
#
#     @patch('telegram_downloader_bot.utils.get_user_folder')
#     async def test_handle_document(self, mock_get_user_folder):
#         user_folder = os.path.join(self.tmp_path, "user_folder")
#         mock_get_user_folder.return_value = user_folder
#         os.makedirs(user_folder, exist_ok=True)
#         client = Mock(spec=Client)
#         client.download_media = AsyncMock()
#         message = Mock(spec=Message)
#         message.document = Mock()
#         message.document.file_name = "test_document.pdf"
#         message.photo = None
#         message.video = None
#         message.animation = None
#         message.reply_text = AsyncMock()
#         await handle_media_message_contents(client, message)
#         expected_file_path = os.path.join(user_folder, "test_document.pdf")
#         client.download_media.assert_awaited_once_with(
#             message, expected_file_path)
#         message.reply_text.assert_awaited_once_with(
#             f"Document saved to {user_folder}")
#
#     @patch('telegram_downloader_bot.utils.get_user_folder')
#     async def test_handle_photo(self, mock_get_user_folder):
#         user_folder = os.path.join(self.tmp_path, "user_folder")
#         mock_get_user_folder.return_value = user_folder
#         os.makedirs(user_folder, exist_ok=True)
#         client = Mock(spec=Client)
#         client.download_media = AsyncMock()
#         message = Mock(spec=Message)
#         message.document = None
#         message.photo = Mock()
#         message.photo.file_id = "photo_file_id"
#         message.video = None
#         message.animation = None
#         message.reply_text = AsyncMock()
#         await handle_media_message_contents(client, message)
#         expected_file_name = f"photo_{message.photo.file_id}.jpg"
#         expected_file_path = os.path.join(user_folder, expected_file_name)
#         client.download_media.assert_awaited_once_with(
#             message.photo, expected_file_path)
#         message.reply_text.assert_awaited_once_with(
#             f"Photo saved to {user_folder}")
#
#     @patch('telegram_downloader_bot.utils.get_user_folder')
#     async def test_handle_unknown_media(self, mock_get_user_folder):
#         user_folder = os.path.join(self.tmp_path, "user_folder")
#         mock_get_user_folder.return_value = user_folder
#         os.makedirs(user_folder, exist_ok=True)
#         client = Mock(spec=Client)
#         client.download_media = AsyncMock()
#         message = Mock(spec=Message)
#         message.document = None
#         message.photo = None
#         message.video = None
#         message.animation = None
#         message.reply_text = AsyncMock()
#         await handle_media_message_contents(client, message)
#         client.download_media.assert_not_called()
#         message.reply_text.assert_awaited_once_with("Unknown media type!")
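The commented-out tests for `handle_media_message_contents` encode a dispatch table: which object is passed to `client.download_media`, which file name is used, and what the reply says. The sketch below splits that decision into a pure helper plus a thin async wrapper. Both names (`plan_media_download`, `handle_media_sketch`) and the order in which media fields are checked are hypothetical, and the client is duck-typed rather than the real client library's class.

```python
import os
from typing import Any, Optional, Tuple


def plan_media_download(message) -> Optional[Tuple[Any, str, str]]:
    """Hypothetical: return (what to download, file name, reply label), or None."""
    # Check order is an assumption; each test sets exactly one media field.
    if message.video is not None:
        return message, f"video_{message.video.file_id}.mp4", "Video"
    if message.animation is not None:
        return message.animation, f"gif_{message.animation.file_id}.gif", "GIF"
    if message.document is not None:
        return message, message.document.file_name, "Document"
    if message.photo is not None:
        return message.photo, f"photo_{message.photo.file_id}.jpg", "Photo"
    return None


async def handle_media_sketch(client, message, user_folder: str) -> None:
    """Hypothetical async wrapper mirroring the awaits the tests expect."""
    plan = plan_media_download(message)
    if plan is None:
        await message.reply_text("Unknown media type!")
        return
    target, file_name, label = plan
    await client.download_media(target, os.path.join(user_folder, file_name))
    await message.reply_text(f"{label} saved to {user_folder}")
```

Keeping the decision in a synchronous function means most cases can be tested without `IsolatedAsyncioTestCase` or `AsyncMock`.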


# class TestDownloadTTVideo(unittest.TestCase):
#     def setUp(self):
#         # Create a temporary directory for each test
#         self.tmp_path = tempfile.mkdtemp()
#         os.makedirs(os.path.join(self.tmp_path, "tiktok"), exist_ok=True)
#         self.settings_patcher = patch("telegram_downloader_bot.settings.settings.storage", self.tmp_path)
#         self.settings_patcher.start()
#         # Paths to the valid and invalid video files
#         self.valid_video_path = os.path.join(self.tmp_path, "valid.mp4")
#         with open(self.valid_video_path, 'wb') as f:
#             f.write(b'valid mp4 content')
#         self.invalid_video_path = os.path.join(self.tmp_path, "invalid.mp4")
#         with open(self.invalid_video_path, 'wb') as f:
#             f.write(b'invalid mp4 content')
#
#     def tearDown(self):
#         self.settings_patcher.stop()
#         # Remove the directory after the test
#         shutil.rmtree(self.tmp_path)
#
#     @patch('telegram_downloader_bot.utils.snaptik')
#     @patch('telegram_downloader_bot.utils.datetime')
#     def test_download_tt_video_with_valid_video(self, mock_datetime, mock_snaptik):
#         # Mock datetime
#         mock_now = datetime(2023, 1, 1, 12, 0, 0)
#         mock_datetime.now.return_value = mock_now
#         # Read the content of valid.mp4
#         with open(self.valid_video_path, 'rb') as f:
#             valid_video_content = f.read()
#         # Mock snaptik to return a video that returns valid.mp4 content
#         mock_video = Mock()
#         mock_video.download.return_value.getbuffer.return_value = valid_video_content
#         mock_snaptik.return_value = [mock_video]
#         # Call the function
#         download_tt_video("http://tiktok.com/video123")
#         # Verify that the file was saved correctly
#         video_filename = mock_now.strftime(
#             "video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
#         video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
#         self.assertTrue(os.path.exists(video_filepath))
#         with open(video_filepath, 'rb') as f:
#             content = f.read()
#         self.assertEqual(content, valid_video_content)
#
#     @patch('telegram_downloader_bot.utils.snaptik')
#     @patch('telegram_downloader_bot.utils.datetime')
#     def test_download_tt_video_with_invalid_video(self, mock_datetime, mock_snaptik):
#         # Mock datetime
#         mock_now = datetime(2023, 1, 1, 12, 0, 0)
#         mock_datetime.now.return_value = mock_now
#         # Read the content of invalid.mp4
#         with open(self.invalid_video_path, 'rb') as f:
#             invalid_video_content = f.read()
#         # Mock snaptik to return a video that returns invalid.mp4 content
#         mock_video = Mock()
#         mock_video.download.return_value.getbuffer.return_value = invalid_video_content
#         mock_snaptik.return_value = [mock_video]
#         # Call the function
#         download_tt_video("http://tiktok.com/video123")
#         # Verify that the file was saved
#         video_filename = mock_now.strftime(
#             "video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
#         video_filepath = os.path.join(self.tmp_path, "tiktok", video_filename)
#         self.assertTrue(os.path.exists(video_filepath))
#         with open(video_filepath, 'rb') as f:
#             content = f.read()
#         self.assertEqual(content, invalid_video_content)
#
#     @patch('telegram_downloader_bot.utils.snaptik')
#     @patch('telegram_downloader_bot.utils.datetime')
#     def test_download_tt_video_no_videos(self, mock_datetime, mock_snaptik):
#         # Mock datetime
#         mock_now = datetime(2023, 1, 1, 12, 0, 0)
#         mock_datetime.datetime.now.return_value = mock_now
#         # Mock snaptik to return an empty list
#         mock_snaptik.return_value = []
#         # Call the function
#         download_tt_video("http://tiktok.com/video123")
#         # Verify that no files were created
#         tiktok_folder = os.path.join(self.tmp_path, "tiktok")
#         files = os.listdir(tiktok_folder)
#         self.assertEqual(len(files), 0)
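The `download_tt_video` tests fix where a video lands and what it is called, `<storage>/tiktok/video-tiktok-%Y-%m-%d_%H-%M-%S.mp4`, written from the buffer returned by `video.download().getbuffer()`, and require that nothing is written when the scraper returns no videos. The helper below sketches only that saving step. Its name, its signature, and the choice to save just the first result are assumptions, and `io.BytesIO` stands in for whatever object the real download returns.

```python
import io
import os
from datetime import datetime
from typing import Optional, Sequence


def save_first_tiktok_video(
    buffers: Sequence[io.BytesIO], storage: str, now: datetime
) -> Optional[str]:
    """Hypothetical: write the first downloaded buffer into <storage>/tiktok/."""
    if not buffers:
        # Mirrors test_download_tt_video_no_videos: nothing is written.
        return None
    filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
    path = os.path.join(storage, "tiktok", filename)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        # getbuffer() exposes the BytesIO contents without copying them.
        f.write(buffers[0].getbuffer())
    return path
```

Because the file name has one-second resolution, two downloads finishing in the same second would overwrite each other; none of the tests cover that case.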


# class TestMakeFS(unittest.TestCase):
#     def setUp(self):
#         self.tmp_path = tempfile.mkdtemp()
#
#     def tearDown(self):
#         shutil.rmtree(self.tmp_path)
#
#     def test_make_fs(self):
#         make_fs(self.tmp_path)
#         self.assertTrue(os.path.exists(os.path.join(self.tmp_path, "tiktok")))
#         self.assertTrue(os.path.exists(
#             os.path.join(self.tmp_path, "telegram")))
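`TestMakeFS` only checks that `make_fs` leaves a `tiktok` and a `telegram` folder under the storage root. A minimal sketch that satisfies it is shown here; the use of `exist_ok=True` so that repeated start-ups do not fail is an assumption, not something the test enforces.

```python
import os


def make_fs(storage: str) -> None:
    """Hypothetical sketch: create the two top-level folders the bot writes to."""
    for sub in ("tiktok", "telegram"):
        # exist_ok=True makes calling this on every start-up harmless.
        os.makedirs(os.path.join(storage, sub), exist_ok=True)
```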


# class TestExtractURLs(unittest.TestCase):
#     def test_no_urls(self):
#         text = "This is some text without any URLs."
#         result = extract_urls(text)
#         self.assertEqual(result, [])
#
#     def test_single_url(self):
#         text = "Check out this link: http://example.com"
#         result = extract_urls(text)
#         self.assertEqual(result, ["http://example.com"])
#
#     def test_multiple_urls(self):
#         text = "Here are some links: http://example.com and https://test.com/page"
#         result = extract_urls(text)
#         self.assertEqual(
#             result, ["http://example.com", "https://test.com/page"])
#
#     def test_malformed_url(self):
#         text = "This is not a URL: htt://badurl.com"
#         result = extract_urls(text)
#         self.assertEqual(result, [])
#
#     def test_urls_with_special_chars(self):
#         text = "Link: https://example.com/page?param=value#anchor"
#         result = extract_urls(text)
#         self.assertEqual(
#             result, ["https://example.com/page?param=value#anchor"])
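The `TestExtractURLs` cases require an explicit `http` or `https` scheme (so `htt://badurl.com` is rejected) and expect query strings and fragments to stay attached to the URL. A single regular expression is enough to meet all five; the pattern below is a hypothetical sketch, not the project's own.

```python
import re

# Hypothetical pattern: an http/https scheme followed by non-whitespace characters.
URL_PATTERN = re.compile(r"https?://\S+")


def extract_urls(text: str) -> list[str]:
    """Return every http(s) URL in text, in order of appearance."""
    return URL_PATTERN.findall(text)
```

`\S+` also swallows trailing punctuation, so `see http://example.com.` yields `http://example.com.`; none of the tests above exercise that case.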


# class TestFilterTTURLs(unittest.TestCase):
#     def test_empty_list(self):
#         urls = []
#         result = filter_tt_urls(urls)
#         self.assertEqual(result, [])
#
#     def test_no_tiktok_urls(self):
#         urls = ["http://example.com", "https://test.com/page"]
#         result = filter_tt_urls(urls)
#         self.assertEqual(result, [])
#
#     def test_mixed_urls(self):
#         urls = [
#             "http://example.com",
#             "https://www.tiktok.com/@user/video/123",
#             "http://tiktok.com/video1",
#             "https://test.com/page",
#         ]
#         expected = [
#             "https://www.tiktok.com/@user/video/123",
#             "http://tiktok.com/video1",
#         ]
#         result = filter_tt_urls(urls)
#         self.assertEqual(result, expected)
#
#     def test_tiktok_in_query_params(self):
#         urls = ["http://example.com?watch=tiktok", "https://other.com/path"]
#         expected = ["http://example.com?watch=tiktok"]
#         result = filter_tt_urls(urls)
#         self.assertEqual(result, expected)
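`test_tiktok_in_query_params` expects `http://example.com?watch=tiktok` to be kept, so `filter_tt_urls` as specified here matches the word anywhere in the URL, not only in the host. The sketch below implements exactly that substring rule; it is a hypothetical reconstruction from the tests.

```python
def filter_tt_urls(urls: list[str]) -> list[str]:
    """Hypothetical: keep URLs that mention "tiktok" anywhere, preserving order."""
    # A plain substring test is what test_tiktok_in_query_params requires.
    return [url for url in urls if "tiktok" in url]
```

Checking `urllib.parse.urlparse(url).hostname` instead would be stricter, but it would fail `test_tiktok_in_query_params` as written.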

tox.ini

@@ -1,9 +1,12 @@
 [tox]
-envlist = py311, flake8, bandit
+envlist = py311, flake8, bandit, codespell, unit, coverage
 
 [testenv]
 basepython = python3.11
 deps = -r {toxinidir}/test-requirements.txt
+    -r {toxinidir}/requirements.txt
+pass_env = app_env, app_id, api_hash, bot_token, \
+    storage, allowed_ids, log_level
 
 [testenv:flake8]
 commands = flake8 telegram_downloader_bot/
@@ -12,4 +15,12 @@ commands = flake8 telegram_downloader_bot/
 commands = bandit -r telegram_downloader_bot/
 
 [testenv:codespell]
 commands = codespell telegram_downloader_bot/
+
+[testenv:unit]
+commands = stestr run --test-path tests/
+
+[testenv:coverage]
+commands =
+    coverage run -m unittest discover
+    coverage report -m