|
- #!/usr/bin/env python3
- import asyncio
- import base64
- import datetime
- import functools
- import json
- import os
- import re
- import sys
- import telethon
- try:
- import tqdm
- except ImportError:
- tqdm = None
-
-
# Telegram credentials are taken from the environment; a missing variable
# raises KeyError at import time, before any work is attempted.
API_ID, API_HASH, BOT_TOKEN = (
    os.environ[variable]
    for variable in ('TELEGRAM_API_ID', 'TELEGRAM_API_HASH', 'TELEGRAM_BOT_TOKEN')
)

# Matches http(s)://t.me/<channel>/<message> with an optional "s/" path
# segment before the channel; captures the channel name and the numeric
# message ID as named groups.
URL_PATTERN = re.compile(r'^https?://t\.me/(?:s/)?(?P<channel>[^/]+)/(?P<message>\d+)$')
-
-
def stuff_to_json(o):
    """Fallback serialiser for use as the ``default`` hook of ``json.dump``.

    Converts ``datetime.datetime`` values to their ISO 8601 string and
    ``bytes`` values to the text ``binary data: <base64>``.

    Raises:
        TypeError: for any other type, as ``json`` expects from a
            ``default`` hook that cannot handle the object.
    """
    # Guard clause first: anything that is neither of the two supported types
    # is rejected up front.
    if not isinstance(o, (datetime.datetime, bytes)):
        raise TypeError(f'Object of type {type(o)} is not JSON serializable')
    if isinstance(o, bytes):
        return f'binary data: {base64.b64encode(o).decode("ascii")}'
    return o.isoformat()
-
-
def download_callback(current, total, bar = None):
    """Progress hook that mirrors a download's progress onto a progress bar.

    Args:
        current: Amount transferred so far.
        total: Total size of the transfer.
        bar: Progress bar exposing ``total``, ``n`` and ``update()`` (the
            caller passes a ``tqdm`` bar), or ``None`` to do nothing.
    """
    if bar is not None:
        # FIXME: Accesses undocumented attribute of tqdm
        bar.total = total
        # The hook receives an absolute position but update() takes an
        # increment, so pass the difference to what the bar already counted.
        # FIXME: https://github.com/tqdm/tqdm/issues/1264
        increment = current - bar.n
        bar.update(increment)
-
-
def _parse_targets(urls):
    """Turn command-line URLs into ``(channel_name, message_ids)``.

    Prints a message to stderr and exits with status 1 if a URL does not
    match ``URL_PATTERN``, if no URL was given, or if the URLs do not all
    name the same channel.
    """
    targets = []
    for url in urls:
        m = URL_PATTERN.match(url)
        if not m:
            print(f'Error: {url} is not a recognised Telegram URL', file = sys.stderr)
            sys.exit(1)
        targets.append((m['channel'], int(m['message'])))
    if not targets:
        print('Usage: telegram-dl.py URL [URL...]', file = sys.stderr)
        sys.exit(1)
    channel_name = targets[0][0]
    # All messages are fetched with one request against a single channel.
    if any(channel != channel_name for channel, _ in targets[1:]):
        print('Error: all URLs must be of the same channel', file = sys.stderr)
        sys.exit(1)
    return channel_name, [message_id for _, message_id in targets]


async def _save_message(client, channel_name, message):
    """Write one message's JSON dump to disk and download its media."""
    print(f'Processing message {message.id}', file = sys.stderr)
    # Mode 'x' makes open() fail instead of overwriting an existing dump.
    with open(f'{channel_name}_{message.id}.json', 'x') as fp:
        json.dump(message.to_dict(), fp, default = stuff_to_json)

    # A progress bar is only shown when there is media and tqdm is installed.
    if message.media and tqdm:
        bar = tqdm.tqdm(unit = 'iB', unit_divisor = 1024, unit_scale = True)
    else:
        bar = None
    try:
        await client.download_media(message, progress_callback = functools.partial(download_callback, bar = bar))
    finally:
        if bar is not None:
            bar.close()


async def main():
    """Download the Telegram messages named by the URLs in ``sys.argv``.

    For every message that is found, ``<channel>_<id>.json`` is written to
    the current directory and the message's media is downloaded. Requested
    messages that come back empty are reported on stderr and skipped.

    Exits with status 1 (via ``sys.exit``) on invalid command-line input.

    Raises:
        FileExistsError: if a message's JSON file already exists.
    """
    channel_name, ids = _parse_targets(sys.argv[1:])

    # Let's go...
    client = telethon.TelegramClient('telegram-dl', API_ID, API_HASH)
    try:
        print('Connecting', file = sys.stderr)
        await client.start(bot_token = BOT_TOKEN)
        print('Fetching messages', file = sys.stderr)
        messages = await client.get_messages(channel_name, ids = ids)
        # Telethon's documentation states that the result matches `ids`
        # one-to-one, with None in place of messages that do not exist, so
        # zipping recovers the ID of a missing message.
        for message_id, message in zip(ids, messages):
            if not message:
                print(f'Warning: message {message_id} not found, skipping', file = sys.stderr)
                continue
            await _save_message(client, channel_name, message)
    finally:
        # Always release the connection and session, including on errors.
        await client.disconnect()
-
-
# Only start the downloader when the file is executed as a script, so that
# importing the module has no side effects.
if __name__ == '__main__':
    asyncio.run(main())
|