Я немного исследовал, как создать мою докеризованную услугу для загрузки файлов из Telegram, у меня есть личная группа для личного использования, и я использую это, что у меня есть Сервис, что у меня будет сервис. Файлы достигают максимальной скорости загрузки 5 МБ/с, слишком медленной, для загрузки, которую я использую Телетон+FastteLethonHelper, и я достигаю более высоких скоростей загрузки, хотя я должен ограничить его, потому что в противном случае телеграмма блокирует его, даже так и станет скоростью примерно 25 МБ/с. не идет выше 5 или 6 МБ/с. И вы скажете ... почему вы не используете FastteLethonHelper для загрузки, если у вас все в порядке с загрузкой? Я попробовал это, но даже не превышает 5 МБ/с, это даже хуже, чем использовать только телетон ... может быть, какая-то плохая конфигурация, я не знаю, я не нашел решение, и я вернулся к использованию чистого телетона, которая даже идет на 1-2 МБ/с. Cryptg> = 0.5
Я попытался загрузить из контейнера, если он по какой -то причине ограничен, но он загружает файлы на скорости выше 5 МБ/с. (Это тест Hetzner Speed, загружающий файл 100 МБ). < /P>
root@x:/app# curl -L -o /dev/null -w '%{speed_download}\n' https://nbg1-speed.hetzner.com/100MB.bin
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 100M 100 100M 0 0 27.9M 0 0:00:03 0:00:03 --:--:-- 27.9M
29356696
< /code>
Это максимальная скорость, с которой вы можете скачать документы? Есть ли другой способ скачать с более высокой скоростью? Идея состоит в том, чтобы достичь не менее 20-25 МБ/с. Спасибо, я прочитал вас.import os
import re
import asyncio
import logging
import time
from pathlib import Path
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaDocument
from telethon.crypto import AES
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID"))
TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")
TELEGRAM_SESSION_NAME = os.path.join('session', os.getenv('TELEGRAM_SESSION_NAME', 'bot_session'))
TELEGRAM_GROUP_ID = int(os.getenv("GROUP_CHAT_ID"))
TOPIC_IDS = {
'Doc 1': 137,
}
TOPIC_ID_TO_CATEGORY = {
137: 'doc 1',
}
CATEGORY_TO_DIRECTORY = {
'doc 1': '/mnt/disco1/test',
}
class TelegramDownloader:
def __init__(self, client):
self.client = client
self.downloaded_files = set()
self.load_downloaded_files()
self.current_download = None
def _create_download_progress_logger(self, filename):
"""Devuelve un callback que SOLO informa si ha pasado
al menos un 5 % de progreso O 10 s desde la última vez."""
start_time = time.time()
last_logged_time = start_time
last_percent_reported = -5
MIN_STEP = 5
MIN_INTERVAL = 10
def progress_bar_function(done_bytes, total_bytes):
nonlocal last_logged_time, last_percent_reported
current_time = time.time()
percent_now = int((done_bytes / total_bytes) * 100)
# ¿Ha cambiado lo suficiente?
if (percent_now - last_percent_reported < MIN_STEP and
current_time - last_logged_time < MIN_INTERVAL):
return ""
# Actualiza marcadores
last_percent_reported = percent_now
last_logged_time = current_time
# Mensaje “humano” que solo se envía cuando toca
speed = done_bytes / 1024 / 1024 / (current_time - start_time or 1)
msg = (f"
f"Progreso: {percent_now}% | "
f"Velocidad: {speed:.2f} MB/s | "
f"{done_bytes/1024/1024:.1f}/{total_bytes/1024/1024:.1f} MB")
logging.info(msg)
return msg
return progress_bar_function
async def _process_download(self, message, metadata, filename, dest_path):
try:
self.current_download = filename
logging.info(f"
#progress_logger = self._create_download_progress_logger(filename)
#
#temp_path = dest_path.with_name(metadata['file_name_telegram'])
#
#await fast_download(
# client=self.client,
# msg=message,
# reply="",
# download_folder=str(dest_path.parent) + '/',
# progress_bar_function=progress_logger
#)
progress_logger = self._create_download_progress_logger(filename)
temp_path = dest_path.with_name(metadata['file_name_telegram'])
await self.client.download_file(
message.media.document,
file=temp_path,
part_size_kb=512,
file_size=message.media.document.size,
progress_callback=progress_logger
)
if not temp_path.exists():
raise FileNotFoundError("No se encontró el archivo descargado")
temp_path.rename(dest_path)
logging.info(f"
self.save_downloaded_file(str(message.id))
logging.info(f"
except Exception as e:
logging.error(f"
if 'temp_path' in locals() and temp_path.exists():
try:
temp_path.unlink()
except:
pass
if dest_path.exists():
try:
dest_path.unlink()
except:
pass
raise
finally:
self.current_download = None
def load_downloaded_files(self):
try:
if os.path.exists('/app/data/downloaded.log'):
with open('/app/data/downloaded.log', 'r', encoding='utf-8') as f:
self.downloaded_files = set(line.strip() for line in f if line.strip())
except Exception as e:
logging.error(f"Error cargando archivos descargados: {str(e)}")
def save_downloaded_file(self, file_id):
try:
with open('/app/data/downloaded.log', 'a', encoding='utf-8') as f:
f.write(f"{file_id}\n")
self.downloaded_files.add(file_id)
except Exception as e:
logging.error(f"Error guardando archivo descargado: {str(e)}")
def parse_metadata(self, caption):
metadata = {}
try:
if not caption:
logging.info(f"
return None
pattern = r'^(\w[\w\s]*):\s*(.*?)(?=\n\w|\Z)'
matches = re.findall(pattern, caption, re.MULTILINE)
for key, value in matches:
logging.info(f"
key = key.strip().lower().replace(' ', '_')
metadata[key] = value.strip()
required_fields = [
'type', 'tmdb_id', 'file_name_telegram',
'file_name', 'folder_name', 'season_folder'
]
if not all(field in metadata for field in required_fields):
return None
if 'season' in metadata:
metadata['season'] = int(metadata['season'])
if 'episode' in metadata:
metadata['episode'] = int(metadata['episode'])
return metadata
except Exception as e:
logging.error(f"Error parseando metadata: {str(e)}")
return None
def get_destination_path(self, message, metadata):
try:
topic_id = message.reply_to.reply_to_msg_id if message.reply_to else None
if not topic_id:
logging.warning("No se pudo determinar el topic ID del mensaje")
return None
category = TOPIC_ID_TO_CATEGORY.get(topic_id)
if not category:
logging.warning(f"No se encontró categoría para el topic ID: {topic_id}")
return None
base_dir = CATEGORY_TO_DIRECTORY.get(category)
if not base_dir:
logging.warning(f"No hay directorio configurado para la categoría: {category}")
return None
filename = metadata.get('file_name')
if not filename:
logging.warning("Campo 'file_name' no encontrado en metadatos")
return None
if metadata['type'] == 'movie':
folder_name = f"{metadata['folder_name']}"
dest_dir = Path(base_dir) / folder_name
return dest_dir / filename
elif metadata['type'] == 'tv':
folder_name = f"{metadata['folder_name']}"
season_folder = metadata.get('season_folder', 'Season 01')
dest_dir = Path(base_dir) / folder_name / season_folder
return dest_dir / filename
else:
logging.warning(f"Tipo de contenido no soportado: {metadata['type']}")
return None
except Exception as e:
logging.error(f"Error determinando ruta de destino: {str(e)}")
return None
async def download_file(self, message):
try:
await asyncio.sleep(2)
if not isinstance(message.media, MessageMediaDocument):
return
if str(message.id) in self.downloaded_files:
logging.info(f"Archivo ya descargado (msg_id: {message.id})")
return
metadata = self.parse_metadata(message.message)
if not metadata:
logging.warning("No se pudieron extraer metadatos válidos")
return
if 'file_name' not in metadata or not metadata['file_name']:
logging.warning("El campo 'file_name' es obligatorio en los metadatos")
return
dest_path = self.get_destination_path(message, metadata)
if not dest_path:
return
dest_path.parent.mkdir(parents=True, exist_ok=True)
if dest_path.exists():
logging.info(f"Archivo ya existe: {dest_path}")
self.save_downloaded_file(str(message.id))
return
await self._process_download(message, metadata, metadata['file_name'], dest_path)
except Exception as e:
logging.error(f"Error descargando archivo: {str(e)}", exc_info=True)
if 'dest_path' in locals() and dest_path.exists():
try:
dest_path.unlink()
except:
pass
async def process_topic(self, topic_id, limit=None):
try:
logging.info(f"
async for message in self.client.iter_messages(
TELEGRAM_GROUP_ID,
limit=limit,
reply_to=topic_id,
wait_time=30 # Añade un tiempo de espera entre mensajes
):
try:
if message.media and isinstance(message.media, MessageMediaDocument):
await self.download_file(message)
except FloodWaitError as e:
wait_time = e.seconds + 5 # Añade 5 segundos de margen
logging.warning(f"
await asyncio.sleep(wait_time)
continue
except Exception as e:
logging.error(f"Error procesando mensaje: {str(e)}", exc_info=True)
continue
except Exception as e:
logging.error(f"Error procesando topic {topic_id}: {str(e)}", exc_info=True)
async def process_all_topics(self):
for topic_name, topic_id in TOPIC_IDS.items():
await self.process_topic(topic_id)
async def main():
try:
test_data = os.urandom(1024)
key = os.urandom(32)
iv = os.urandom(32)
encrypted = AES.encrypt_ige(test_data, key, iv)
decrypted = AES.decrypt_ige(encrypted, key, iv)
if decrypted != test_data:
raise RuntimeError("
logging.info("
except Exception as e:
logging.critical(f"
raise SystemExit(1)
os.makedirs('session', exist_ok=True)
client = TelegramClient(
TELEGRAM_SESSION_NAME,
TELEGRAM_API_ID,
TELEGRAM_API_HASH,
connection_retries=10,
auto_reconnect=True,
timeout=720,
request_retries=10,
flood_sleep_threshold=120,
system_version="4.16.30-vxCUSTOM",
device_model="HighSpeedDownloader",
lang_code="es",
system_lang_code="es",
use_ipv6=False
)
try:
await client.start()
downloader = TelegramDownloader(client)
await downloader.process_all_topics()
except Exception as e:
logging.error(f"Error on main: {str(e)}", exc_info=True)
finally:
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
Подробнее здесь: https://stackoverflow.com/questions/796 ... prox-5mb-s