|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- #!/usr/bin/env python3
- import enum
- import functools
- import hashlib
- import operator
- import sys
-
-
- class ParserState(enum.Enum):
- NONE = 0
- DICTIONARY = 1
- LIST = 2
-
-
- class Placeholder:
- def __init__(self, s):
- self._s = s
-
- def __repr__(self):
- return self._s
-
-
- dictEntry = Placeholder('dict')
- listEntry = Placeholder('list')
-
-
- class CopyingFileReader:
- '''All reads to the underlying file-like object are copied to write, which must be a callable accepting the data.'''
-
- def __init__(self, fp, write):
- self.fp = fp
- self.write = write
-
- def read(self, *args, **kwargs):
- data = self.fp.read(*args, **kwargs)
- self.write(data)
- return data
-
-
- def read_str(fp, c):
- '''Reads a string from the current position with c being the first character of the length (having been read already)'''
- length = read_int(fp, c, end = b':')
- s = fp.read(length)
- if len(s) != length:
- raise ValueError
- try:
- s = s.decode('utf-8')
- except UnicodeDecodeError:
- pass
- return s
-
-
- def read_int(fp, c = b'', end = b'e'):
- '''Reads an int from the current position with c optionally being the first digit'''
- i = c
- while True:
- c = fp.read(1)
- if c == end:
- break
- elif c in b'-0123456789':
- i += c
- else:
- raise ValueError
- i = int(i.decode('ascii'))
- return i
-
-
- def read_or_stack_value(fp, stateStack, c = b''):
- if not c:
- c = fp.read(1)
- if c == b'l':
- stateStack.append(ParserState.LIST)
- return listEntry
- elif c == b'd':
- stateStack.append(ParserState.DICTIONARY)
- return dictEntry
- elif c == b'i':
- return read_int(fp)
- elif c in b'0123456789': # String value
- return read_str(fp, c)
- else:
- raise ValueError
-
-
- def bdecode(fp, display = False, infoWrite = None):
- c = fp.read(1)
- if c != b'd':
- raise ValueError
- stateStack = [ParserState.NONE, ParserState.DICTIONARY]
- idxStack = [None, None]
- out = {}
- get_o = lambda: functools.reduce(operator.getitem, idxStack[2:], out)
- inInfo = False
- print_ = print if display else (lambda *args, **kwargs: None)
- print_(f'(global): {dictEntry}')
- while stateStack:
- state = stateStack[-1]
- indent = ' ' * (len(stateStack) - 1)
- if state == ParserState.DICTIONARY:
- c = fp.read(1)
- if c == b'e': # End of dict
- stateStack.pop(-1)
- idxStack.pop(-1)
- if len(stateStack) == 2 and inInfo and infoWrite:
- inInfo = False
- fp = fp.fp
- continue
- elif c in b'0123456789': # Key
- key = read_str(fp, c)
- if len(stateStack) == 2 and key == 'info' and infoWrite: # If in global dict and this is the 'info' value and a copy is desired...
- inInfo = True
- fp = CopyingFileReader(fp, infoWrite)
- v = read_or_stack_value(fp, stateStack)
- print_(f'{indent}{key!r}: {v!r}')
- if v is dictEntry or v is listEntry:
- get_o()[key] = {} if v is dictEntry else []
- idxStack.append(key)
- else:
- get_o()[key] = v
- else:
- raise ValueError
- elif state == ParserState.LIST:
- c = fp.read(1)
- if c == b'e':
- stateStack.pop(-1)
- idxStack.pop(-1)
- continue
- else:
- v = read_or_stack_value(fp, stateStack, c)
- print_(f'{indent}- {v!r}')
- o = get_o()
- if v is dictEntry or v is listEntry:
- o.append({} if v is dictEntry else [])
- idxStack.append(len(o) - 1)
- else:
- o.append(v)
- elif state == ParserState.NONE:
- assert len(stateStack) == 1
- return out
-
-
- def print_torrent(fp):
- bdecode(fp, display = True)
-
-
- def get_info_hash(fp):
- hasher = hashlib.sha1()
- bdecode(fp, infoWrite = hasher.update)
- return hasher.hexdigest()
-
-
- def print_files(fp):
- o = bdecode(fp)
- if 'files' in o['info']:
- for f in o['info']['files']:
- print('/'.join(f['path']))
- else:
- print(o['info']['name'])
-
-
- def main():
- if len(sys.argv) < 3 or sys.argv[1] not in ('print', 'infohash', 'files'):
- print('Usage: torrent-tiny MODE FILE [FILE...]', file = sys.stderr)
- print('MODEs: print, infohash, files', file = sys.stderr)
- sys.exit(1)
-
- mode = sys.argv[1]
- for fn in sys.argv[2:]:
- with open(fn, 'rb') as fp:
- if mode == 'print':
- print_torrent(fp)
- elif mode == 'infohash':
- print(get_info_hash(fp))
- elif mode == 'files':
- print_files(fp)
-
-
- if __name__ == '__main__':
- main()
|