"""Decoder for the data section of an AW DB database buffer."""
from __future__ import unicode_literals

import struct

from awdb.compat import byte_from_int, int_from_byte, int_from_bytes
from awdb.errors import InvalidDatabaseError


class Decoder(object):
    """Decode typed values out of a database buffer.

    Every value starts with a control byte: its top three bits select the
    type and its low five bits carry the size (or, for pointers, the
    pointer-size/high-bits field). All ``_decode_*`` helpers share the
    signature ``(size, offset)`` and return ``(value, new_offset)``, where
    ``new_offset`` is the position just past the bytes they consumed.
    """

    def __init__(self, database_buffer, pointer_base=0, pointer_test=False):
        """Create a decoder over ``database_buffer``.

        Arguments:
        database_buffer -- object supporting integer indexing and slicing
                           that holds the encoded data
        pointer_base -- offset added to every decoded pointer value
        pointer_test -- when true, pointers are returned as raw offsets
                        instead of being followed
        """
        self._pointer_test = pointer_test
        self._buffer = database_buffer
        self._pointer_base = pointer_base

    def _decode_array(self, size, offset):
        """Decode ``size`` consecutive values into a list."""
        array = []
        for _ in range(size):
            (value, offset) = self.decode(offset)
            array.append(value)
        return array, offset

    def _decode_boolean(self, size, offset):
        """Decode a boolean; the value lives in the size field itself."""
        return size != 0, offset

    def _decode_bytes(self, size, offset):
        """Return the next ``size`` raw bytes from the buffer."""
        new_offset = offset + size
        return self._buffer[offset:new_offset], new_offset

    def _decode_double(self, size, offset):
        """Decode an 8-byte network-order (big-endian) double.

        Raises InvalidDatabaseError if ``size`` is not 8.
        """
        self._verify_size(size, 8)
        new_offset = offset + size
        packed_bytes = self._buffer[offset:new_offset]
        (value, ) = struct.unpack(b'!d', packed_bytes)
        return value, new_offset

    def _decode_float(self, size, offset):
        """Decode a 4-byte network-order (big-endian) float.

        Raises InvalidDatabaseError if ``size`` is not 4.
        """
        self._verify_size(size, 4)
        new_offset = offset + size
        packed_bytes = self._buffer[offset:new_offset]
        (value, ) = struct.unpack(b'!f', packed_bytes)
        return value, new_offset

    def _decode_int32(self, size, offset):
        """Decode a signed 32-bit network-order integer.

        A size of 0 decodes to 0 without consuming any bytes. Values
        stored in fewer than 4 bytes are left-padded with zero bytes
        before unpacking, so they always decode as non-negative.
        """
        if size == 0:
            return 0, offset
        new_offset = offset + size
        packed_bytes = self._buffer[offset:new_offset]

        if size != 4:
            packed_bytes = packed_bytes.rjust(4, b'\x00')
        (value, ) = struct.unpack(b'!i', packed_bytes)
        return value, new_offset

    def _decode_map(self, size, offset):
        """Decode ``size`` key/value pairs into a dict."""
        container = {}
        for _ in range(size):
            (key, offset) = self.decode(offset)
            (value, offset) = self.decode(offset)
            if key == value:
                # A value equal to its own key is stored UTF-8 encoded
                # rather than as text. ``.encode`` is used instead of
                # ``bytes(value, 'utf-8')`` because the two-argument
                # ``bytes`` call fails on Python 2.
                # NOTE(review): the reason for this special case is not
                # evident from this file -- confirm against callers.
                container[key] = value.encode('utf-8')
            else:
                container[key] = value
        return container, offset

    def _decode_pointer(self, size, offset):
        """Decode a pointer and, unless in pointer-test mode, follow it.

        For pointers, ``size`` is the raw low five bits of the control
        byte: bits 3-4 give the number of following bytes minus one, and
        bits 0-2 are the high bits of the pointer value for the 1-, 2-
        and 3-byte forms.

        Returns ``(pointer, new_offset)`` when ``pointer_test`` was set,
        otherwise ``(pointed_to_value, new_offset)``. ``new_offset`` is
        always the position just past the pointer itself, not past the
        value it refers to.
        """
        pointer_size = (size >> 3) + 1

        buf = self._buffer[offset:offset + pointer_size]
        new_offset = offset + pointer_size

        if pointer_size == 1:
            # 3 high bits + 1 byte -> 11-bit value.
            buf = byte_from_int(size & 0x7) + buf
            pointer = struct.unpack(b'!H', buf)[0] + self._pointer_base
        elif pointer_size == 2:
            # 3 high bits + 2 bytes, padded to 4 bytes for '!I'; offset
            # by 2048 so the range starts after the 11-bit form.
            buf = b'\x00' + byte_from_int(size & 0x7) + buf
            pointer = struct.unpack(b'!I', buf)[0] + 2048 + self._pointer_base
        elif pointer_size == 3:
            # 3 high bits + 3 bytes; offset by 526336 so the range
            # starts after the previous form.
            buf = byte_from_int(size & 0x7) + buf
            pointer = struct.unpack(b'!I',
                                    buf)[0] + 526336 + self._pointer_base
        else:
            # Full 4-byte pointer; the low bits of ``size`` are ignored.
            pointer = struct.unpack(b'!I', buf)[0] + self._pointer_base

        if self._pointer_test:
            return pointer, new_offset
        (value, _) = self.decode(pointer)
        return value, new_offset

    def _decode_uint(self, size, offset):
        """Decode an unsigned integer stored in ``size`` bytes."""
        new_offset = offset + size
        uint_bytes = self._buffer[offset:new_offset]
        return int_from_bytes(uint_bytes), new_offset

    def _decode_utf8_string(self, size, offset):
        """Decode ``size`` bytes as a UTF-8 text string."""
        new_offset = offset + size
        return self._buffer[offset:new_offset].decode('utf-8'), new_offset

    # Maps a type number to the plain function that decodes it. The
    # entries are not bound methods, so ``decode`` passes ``self``
    # explicitly when calling them.
    _type_decoder = {
        1: _decode_pointer,
        2: _decode_utf8_string,
        3: _decode_double,
        4: _decode_bytes,
        5: _decode_uint,
        6: _decode_uint,
        7: _decode_map,
        8: _decode_int32,
        9: _decode_uint,
        10: _decode_uint,
        11: _decode_array,
        14: _decode_boolean,
        15: _decode_float,
    }

    def decode(self, offset):
        """Decode the value that starts at ``offset`` in the buffer.

        Arguments:
        offset -- position of the value's control byte

        Returns a tuple ``(value, new_offset)``.

        Raises InvalidDatabaseError if the type number has no decoder.
        """
        new_offset = offset + 1
        ctrl_byte = int_from_byte(self._buffer[offset])
        type_num = ctrl_byte >> 5
        # A type of 0 means the real type number is in the next byte.
        if not type_num:
            (type_num, new_offset) = self._read_extended(new_offset)

        try:
            decoder = self._type_decoder[type_num]
        except KeyError:
            raise InvalidDatabaseError('Unexpected type number ({type}) '
                                       'encountered'.format(type=type_num))

        (size, new_offset) = self._size_from_ctrl_byte(ctrl_byte, new_offset,
                                                       type_num)
        return decoder(self, size, new_offset)

    def _read_extended(self, offset):
        """Read an extended type number (stored as type minus 7).

        Returns ``(type_num, offset_after_type_byte)``.
        """
        next_byte = int_from_byte(self._buffer[offset])
        type_num = next_byte + 7
        # NOTE(review): the condition tests ``< 7`` while the message
        # says ``< 8``; if ``int_from_byte`` only yields 0..255 this
        # branch cannot fire. Left as is so that inputs accepted today
        # are not rejected -- confirm the intended bound.
        if type_num < 7:
            raise InvalidDatabaseError(
                'Something went horribly wrong in the decoder. An '
                'extended type resolved to a type number < 8 '
                '({type})'.format(type=type_num))
        return type_num, offset + 1

    def _verify_size(self, expected, actual):
        """Raise InvalidDatabaseError unless the two sizes are equal.

        The comparison is symmetric, so argument order does not affect
        the outcome.
        """
        if expected != actual:
            raise InvalidDatabaseError(
                'The AW DB file\'s data section contains bad data '
                '(unknown data type or corrupt data)')

    def _size_from_ctrl_byte(self, ctrl_byte, offset, type_num):
        """Work out the payload size encoded by a control byte.

        The low five bits of ``ctrl_byte`` hold the size directly when
        below 29. The values 29, 30 and 31 mean the size continues in
        the next 1, 2 or 3 bytes respectively, each with a base added so
        the ranges do not overlap. Pointers (type 1) always get the raw
        five bits back because they interpret them themselves.

        Returns ``(size, new_offset)``.
        """
        size = ctrl_byte & 0x1f
        if type_num == 1 or size < 29:
            return size, offset

        if size == 29:
            size = 29 + int_from_byte(self._buffer[offset])
            return size, offset + 1

        if size == 30:
            new_offset = offset + 2
            size_bytes = self._buffer[offset:new_offset]
            size = 285 + struct.unpack(b'!H', size_bytes)[0]
            return size, new_offset

        # size == 31: three size bytes, padded to four for '!I'.
        new_offset = offset + 3
        size_bytes = self._buffer[offset:new_offset]
        size = struct.unpack(b'!I', b'\x00' + size_bytes)[0] + 65821
        return size, new_offset