Add hash checking to TT downloading

This commit is contained in:
Roman Krček
2024-10-13 21:07:56 +02:00
parent 47248f10ab
commit 6d508121b0
3 changed files with 52 additions and 11 deletions

View File

@@ -56,7 +56,7 @@ async def message_handler(_, message: Message):
msg = f"Downloading video {i+1}/{len(urls)}..." msg = f"Downloading video {i+1}/{len(urls)}..."
log.info(msg) log.info(msg)
await message.reply_text(msg) await message.reply_text(msg)
utils.download_tt_video(settings.storage, url) utils.download_tt_video(url)
await message.reply_text("Done.") await message.reply_text("Done.")
@@ -67,7 +67,7 @@ async def media_handler(client, message: Message):
await message.reply_text("Downloading media...") await message.reply_text("Downloading media...")
utils.handle_media_message_contents(settings.storage, client, message) utils.handle_media_message_contents(client, message)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -7,12 +7,12 @@ from telegram_downloader_bot.settings import settings
def protected(func):
    """Restrict an async message handler to the allowed users.

    Wraps a ``(client, message)`` coroutine. When the sender's ID is not in
    ``settings.allowed_ids_list`` the attempt is logged, the sender is told
    they are not on the list, and the wrapped handler is not called.
    """
    @wraps(func)
    async def wrapper(client, message):
        if int(message.from_user.id) not in settings.allowed_ids_list:
            # Trailing space keeps the two adjacent literals from fusing
            # into "attemptedto".
            log.warning(
                f"User with ID {message.from_user.id} attempted "
                "to text this bot!")
            # str.join() accepts only strings, while the membership test
            # above compares against an int, so convert each ID first.
            allowed = " ".join(map(str, settings.allowed_ids_list))
            log.info(f"Only users allowed are: {allowed}")
            return await message.reply_text("You are not on the list!")
        return await func(client, message)
    return wrapper

View File

@@ -1,11 +1,15 @@
import os import os
import pickle
import re import re
from datetime import datetime from datetime import datetime
from hashlib import sha256
from pyrogram import Client from pyrogram import Client
from pyrogram.types import Message from pyrogram.types import Message
from tiktok_downloader import snaptik from tiktok_downloader import snaptik
from telegram_downloader_bot.settings import settings
def sanitize_name(input: str) -> str: def sanitize_name(input: str) -> str:
"""Sanize string by removing non aplhanumeric characters and spaces.""" """Sanize string by removing non aplhanumeric characters and spaces."""
@@ -14,7 +18,7 @@ def sanitize_name(input: str) -> str:
return output return output
def get_user_folder(storage_path: os.path, message: Message) -> os.path: def get_user_folder(message: Message) -> os.path:
""" Determine folder name used to save the media to. Depending on """ Determine folder name used to save the media to. Depending on
which type of message (forwarded, direct) detect that person's which type of message (forwarded, direct) detect that person's
or group's name.""" or group's name."""
@@ -47,19 +51,18 @@ def get_user_folder(storage_path: os.path, message: Message) -> os.path:
# Sanitize the folder name # Sanitize the folder name
user_folder_name = sanitize_name(user_folder_name) user_folder_name = sanitize_name(user_folder_name)
user_folder = os.path.join(storage_path, "telegram", user_folder_name) user_folder = os.path.join(settings.storage, "telegram", user_folder_name)
os.makedirs(user_folder, exist_ok=True) os.makedirs(user_folder, exist_ok=True)
return user_folder return user_folder
async def handle_media_message_contents(storage_path: os.path, async def handle_media_message_contents(client: Client,
client: Client,
message: Message): message: Message):
"""Detect what kind of media is being sent over from the user. """Detect what kind of media is being sent over from the user.
Based on that, determine the correct file extension and save Based on that, determine the correct file extension and save
that media.""" that media."""
user_folder = get_user_folder(storage_path, message) user_folder = get_user_folder(message)
# Handle documents # Handle documents
if message.document: if message.document:
@@ -94,7 +97,35 @@ async def handle_media_message_contents(storage_path: os.path,
await message.reply_text("Unknown media type!") await message.reply_text("Unknown media type!")
def get_tt_hashes() -> set:
    """Load the stored set of hashes of downloaded TikTok videos.

    Returns:
        The object unpickled from ``settings.tt_hash_file``, or an empty
        set when that file does not exist or contains no data.
    """
    try:
        # Read-only mode is sufficient; "rb+" would additionally require
        # write permission on the file.
        with open(settings.tt_hash_file, "rb") as f:
            # NOTE(review): unpickling can execute arbitrary code, so this
            # file must only ever be produced by this application.
            return pickle.load(f)
    except (FileNotFoundError, EOFError):
        # Missing file: nothing has been stored yet. EOFError: the file
        # ended before a pickle could be read (e.g. it is empty).
        return set()
def add_to_hashes(new_hash: str) -> None:
    """Add ``new_hash`` to the stored hash set and write the set back."""
    stored = get_tt_hashes()
    stored.add(new_hash)
    save_tt_hashes(stored)
def save_tt_hashes(hashes: set) -> None:
    """Pickle ``hashes`` into ``settings.tt_hash_file``, overwriting it."""
    with open(settings.tt_hash_file, "wb+") as out:
        out.write(pickle.dumps(hashes, protocol=pickle.HIGHEST_PROTOCOL))
def check_if_tt_downloaded(tt_hash: str) -> bool:
    """Return whether ``tt_hash`` is in the stored hash set."""
    return tt_hash in get_tt_hashes()
def download_tt_video(url: str) -> str:
"""Downloads tiktok video from a given URL. """Downloads tiktok video from a given URL.
Makes sure the video integrity is correct.""" Makes sure the video integrity is correct."""
@@ -103,14 +134,24 @@ def download_tt_video(storage_path: str, url: str) -> None:
for video in videos: for video in videos:
video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4") video_filename = now.strftime("video-tiktok-%Y-%m-%d_%H-%M-%S.mp4")
video_filepath: os.path = os.path.join(storage_path, video_filepath: os.path = os.path.join(settings.storage,
"tiktok", "tiktok",
video_filename) video_filename)
video_content = video.download().getbuffer() video_content = video.download().getbuffer()
video_hash = sha256(video_content).hexdigest()
print(video_hash)
if check_if_tt_downloaded(video_hash):
return "Already downloaded"
with open(video_filepath, "wb") as f: with open(video_filepath, "wb") as f:
f.write(video_content) f.write(video_content)
add_to_hashes(video_hash)
return "Downloaded ok"
def make_fs(storaga_path: str) -> None: def make_fs(storaga_path: str) -> None:
os.makedirs(os.path.join(storaga_path, "tiktok"), exist_ok=True) os.makedirs(os.path.join(storaga_path, "tiktok"), exist_ok=True)