diff --git a/README.rst b/README.rst index b37fb6f..ae20e3f 100644 --- a/README.rst +++ b/README.rst @@ -3,6 +3,8 @@ libdiana A library for talking to Artemis SBS servers. +Big thanks to https://artemis-nerds.github.io/protocol-docs/ for making this project easier to update. + .. image:: https://badge.fury.io/py/libdiana.svg :target: http://badge.fury.io/py/libdiana diff --git a/diana/encoding.py b/diana/encoding.py index d3290e8..b06b925 100644 --- a/diana/encoding.py +++ b/diana/encoding.py @@ -3,132 +3,167 @@ ENCODERS = {} def struct_encoder_for(char): - format_expr = "<{}".format(char) - def st_encode(fmt, data): - if not data: - raise ValueError("Not enough data") - return (struct.pack(format_expr, data[0]) + - encode(fmt[1:], data[1:])) - return st_encode + format_expr = "<{}".format(char) + def st_encode(fmt, data): + if not data: + raise ValueError("Not enough data") + return (struct.pack(format_expr, data[0]) + + encode(fmt[1:], data[1:])) + return st_encode for base_format in 'bBiIf': - ENCODERS[base_format] = struct_encoder_for(base_format) + ENCODERS[base_format] = struct_encoder_for(base_format) ENCODERS['s'] = struct_encoder_for('h') ENCODERS['S'] = struct_encoder_for('H') def encode_unicode_string(fmt, data): - subject = data[0] - block = subject.encode('utf-16le') - output = struct.pack(':l + "6.3" : ('unknown-5','I'), + "6.4" : ('beacon-creature-type','I'), + "6.5" : ('beacon-mode','B') + }, + #ObjectType.player_ship_upgrade : { + # }, + ObjectType.torpedo : { + "1.1" : ('x','f'), + "1.2" : ('y','f'), + "1.3" : ('z','f'), + "1.4" : ('delta-x','f'), + "1.5" : ('delta-y','f'), + "1.6" : ('delta-z','f'), + "1.7" : ('unknown-1','I'), + "1.8" : ('ordnance-type','I'), + + "2.1" : ('unknown-2','B'), # there's two bytes of bitfields... + "2.2" : ('a','B'), # im lazy, these are unknown/unused fields + "2.3" : ('aa','B'), + "2.4" : ('aaa','B'), + "2.5" : ('aaaa','B'), + "2.6" : ('bbbb','B'), + "2.7" : ('bbb','B'), + "2.8" : ('bb','B') + }, + ObjectType.weapons_console : { + "1.1" : ('homing-count','B'), + "1.2" : ('nuke-count','B'), + "1.3" : ('mine-count','B'), + "1.4" : ('emp-count','B'), + "1.5" : ('pshock-count','B'), + "1.6" : ('beacon-count','B'), + "1.7" : ('probe-count','B'), + "1.8" : ('tag-count','B'), + + "2.1" : ('tube-1-time','f'), + "2.2" : ('tube-2-time','f'), + "2.3" : ('tube-3-time','f'), + "2.4" : ('tube-4-time','f'), + "2.5" : ('tube-5-time','f'), + "2.6" : ('tube-6-time','f'), + "2.7" : ('tube-1-status','B'), + "2.8" : ('tube-2-status','B'), + + "3.1" : ('tube-3-status','B'), + "3.2" : ('tube-4-status','B'), + "3.3" : ('tube-5-status','B'), + "3.4" : ('tube-6-status','B'), + "3.5" : ('tube-1-type','B'), + "3.6" : ('tube-2-type','B'), + "3.7" : ('tube-3-type','B'), + "3.8" : ('tube-4-type','B'), + + "4.1" : ('tube-5-type','B'), + "4.2" : ('tube-6-type','B') + }, + ObjectType.asteroid : { + "1.1" : ('x','f'), + "1.2" : ('y','f'), + "1.3" : ('z','f'), + "1.4" : ('name','u'), + } + } +def itobf(x): + prefix = math.floor(x/8) + 1 + suffix = (x%8) + 1 + return str(prefix)+'.'+str(suffix) + +player_ship_upgrades_list = [ 'infusion_p_coils', 'hydrogen_ram', 'tauron_focusers', + 'carapaction_coils', 'polyphasic_capacitors', 'cetrocite_crystals', 'lateral_array', + 'ecm_starpulse', 'double_agent', 'wartime_production', 'infusion_p_coils_perm', + 'protonic_verniers', 'tauron_focusers_perm', 'regenerative_pau_grids', + 'veteran_damcon_teams', 'cetrocite_heatsinks', 'tachyon_scanners', + 'gridscan_overload', 'override_authorization', 'resupply_imperatives', + 'patrol_group', 'fast_supply', 'vanguard_refit_helm', 'vanguard_refit_weap', + 'vanguard_refit_comm', 'vanguard_refit_station', 'vanguard_refit_eng', + 'vanguard_refit_systems' ] # so many oh my g*sh + +ply_shp_upgrd = {} +i=0 +for field,ftype in (('active_','B'),('count_','B'),('time_','S')): + for upgrade in player_ship_upgrades_list: + ply_shp_upgrd[itobf(i)] = (field+upgrade,ftype) + i+=1 +# no way in hell i was writing that out one-by-one. 84 bitfields! +Object_Properties[ObjectType.player_ship_upgrade] = ply_shp_upgrd + +Object_Properties[ObjectType.mine] = Object_Properties[ObjectType.asteroid] +Object_Properties[ObjectType.blackhole] = Object_Properties[ObjectType.asteroid] def decode_obj_update_packet(packet): - entries = [] - while packet: - update_type = packet[0] - obj = {} - if update_type == 0x00: - break - elif update_type == 0x01: - _id, oid, fields_1, fields_2, fields_3, fields_4, fields_5, packet = unpack('BIBBBBB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.player_vessel - if fields_1 & 0x01: - obj['tgt-weapons'], packet = unpack('I*', packet) - if fields_1 & 0x02: - obj['impulse'], packet = unpack('f*', packet) - if fields_1 & 0x04: - obj['rudder'], packet = unpack('f*', packet) - if fields_1 & 0x08: - obj['top-speed'], packet = unpack('f*', packet) - if fields_1 & 0x10: - obj['turn-rate'], packet = unpack('f*', packet) - if fields_1 & 0x20: - ab, packet = unpack('B*', packet) - obj['auto-beams'] = bool(ab) - if fields_1 & 0x40: - obj['warp'], packet = unpack('B*', packet) - if fields_1 & 0x80: - obj['energy'], packet = unpack('f*', packet) - if fields_2 & 0x01: - obj['shields-state'], packet = unpack('s*', packet) - if fields_2 & 0x02: - obj['index'], packet = unpack('I*', packet) - if fields_2 & 0x04: - obj['vtype'], packet = unpack('I*', packet) - if fields_2 & 0x08: - obj['x'], packet = unpack('f*', packet) - if fields_2 & 0x10: - obj['y'], packet = unpack('f*', packet) - if fields_2 & 0x20: - obj['z'], packet = unpack('f*', packet) - if fields_2 & 0x40: - obj['pitch'], packet = unpack('f*', packet) - if fields_2 & 0x80: - obj['roll'], packet = unpack('f*', packet) - if fields_3 & 0x01: - obj['heading'], packet = unpack('f*', packet) - if fields_3 & 0x02: - obj['speed'], packet = unpack('f*', packet) - if fields_3 & 0x04: - _unk, packet = unpack('S*', packet) - if fields_3 & 0x08: - obj['name'], packet = unpack('u*', packet) - if fields_3 & 0x10: - obj['shields'], packet = unpack('f*', packet) - if fields_3 & 0x20: - obj['shields-max'], packet = unpack('f*', packet) - if fields_3 & 0x40: - obj['shields-aft'], packet = unpack('f*', packet) - if fields_3 & 0x80: - obj['shields-aft-max'], packet = unpack('f*', packet) - if fields_4 & 0x01: - obj['docked'], packet = unpack('I*', packet) - if fields_4 & 0x02: - red_alert, packet = unpack('B*', packet) - obj['red-alert'] = bool(red_alert) - if fields_4 & 0x04: - packet = packet[4:] - if fields_4 & 0x08: - ms, packet = unpack('B*', packet) - obj['main-view'] = MainView(ms) - if fields_4 & 0x10: - obj['beam-frequency'], packet = unpack('B*', packet) - if fields_4 & 0x20: - obj['coolant-avail'], packet = unpack('B*', packet) - if fields_4 & 0x40: - obj['tgt-science'], packet = unpack('I*', packet) - if fields_4 & 0x80: - obj['tgt-captain'], packet = unpack('I*', packet) - if fields_5 & 0x01: - dt, packet = unpack('B*', packet) - obj['drive-type'] = DriveType(dt) - if fields_5 & 0x02: - obj['tgt-scan'], packet = unpack('I*', packet) - if fields_5 & 0x04: - obj['scan-progress'], packet = unpack('f*', packet) - if fields_5 & 0x08: - rv, packet = unpack('B*', packet) - obj['reverse'] = bool(rv) - if fields_5 & 0x10: - packet = packet[4:] - if fields_5 & 0x20: - packet = packet[1:] - if fields_5 & 0x40: - packet = packet[4:] - if fields_5 & 0x80: - raise ValueError('Unknown data keys for player vessel') - elif update_type == 0x02: - _id, oid, fields_1, fields_2, fields_3, packet = unpack('BIBBB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.weapons_console - if fields_1 & 0x01: # TODO: use the enum here - obj['store-missile'], packet = unpack('B*', packet) - if fields_1 & 0x02: - obj['store-nuke'], packet = unpack('B*', packet) - if fields_1 & 0x04: - obj['store-mine'], packet = unpack('B*', packet) - if fields_1 & 0x08: - obj['store-emp'], packet = unpack('B*', packet) - if fields_1 & 0x10: - packet = packet[1:] - if fields_1 & 0x20: - obj['load-time-0'], packet = unpack('f*', packet) - if fields_1 & 0x40: - obj['load-time-1'], packet = unpack('f*', packet) - if fields_1 & 0x80: - obj['load-time-2'], packet = unpack('f*', packet) - if fields_2 & 0x01: - obj['load-time-3'], packet = unpack('f*', packet) - if fields_2 & 0x02: - obj['load-time-4'], packet = unpack('f*', packet) - if fields_2 & 0x04: - obj['load-time-5'], packet = unpack('f*', packet) - if fields_2 & 0x08: - ts, packet = unpack('B*', packet) - obj['status-0'] = TubeStatus(ts) - if fields_2 & 0x10: - ts, packet = unpack('B*', packet) - obj['status-1'] = TubeStatus(ts) - if fields_2 & 0x20: - ts, packet = unpack('B*', packet) - obj['status-2'] = TubeStatus(ts) - if fields_2 & 0x40: - ts, packet = unpack('B*', packet) - obj['status-3'] = TubeStatus(ts) - if fields_2 & 0x80: - ts, packet = unpack('B*', packet) - obj['status-4'] = TubeStatus(ts) - if fields_3 & 0x01: - ts, packet = unpack('B*', packet) - obj['status-5'] = TubeStatus(ts) - if fields_3 & 0x02: - ot, packet = unpack('B*', packet) - obj['contents-0'] = OrdnanceType(ot) - if fields_3 & 0x04: - ot, packet = unpack('B*', packet) - obj['contents-1'] = OrdnanceType(ot) - if fields_3 & 0x08: - ot, packet = unpack('B*', packet) - obj['contents-2'] = OrdnanceType(ot) - if fields_3 & 0x10: - ot, packet = unpack('B*', packet) - obj['contents-3'] = OrdnanceType(ot) - if fields_3 & 0x20: - ot, packet = unpack('B*', packet) - obj['contents-4'] = OrdnanceType(ot) - if fields_3 & 0x40: - ot, packet = unpack('B*', packet) - obj['contents-5'] = OrdnanceType(ot) - if fields_3 & 0x80: - raise ValueError('Unknown fields for weapons console') - elif update_type == 0x03: - _id, oid, fields_heat, fields_enrg, fields_coolant, fields_unk, packet = unpack('BIBBBB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.engineering_console - if fields_unk: - raise ValueError('Undecodable fields in engineering status') - systems = (('beams', 0x01), - ('torps', 0x02), - ('sensors', 0x04), - ('maneuvering', 0x08), - ('impulse', 0x10), - ('warp', 0x20), - ('shields', 0x40), - ('shields-aft', 0x80)) - types = (('heat', fields_heat, 'f'), - ('energy', fields_enrg, 'f'), - ('coolant', fields_coolant, 'B')) - for status, mask, fmt in types: - for syst, flag in systems: - if fields_heat & flag: - obj['{}-{}'.format(status, syst)], packet = unpack(fmt + '*', packet) - elif update_type == 0x04: - _id, oid, fields_1, fields_2, fields_3, fields_4, fields_5, fields_6, packet = unpack('BIBBBBBB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.other_ship - if fields_1 & 0x01: - obj['name'], packet = unpack('u*', packet) - if fields_1 & 0x02: - packet = packet[4:] - if fields_1 & 0x04: - obj['rudder'], packet = unpack('f*', packet) - if fields_1 & 0x08: - obj['max-impulse'], packet = unpack('f*', packet) - if fields_1 & 0x10: - obj['max-turn-rate'], packet = unpack('f*', packet) - if fields_1 & 0x20: - fef, packet = unpack('I*', packet) - obj['iff-friendly'] = not bool(fef) - if fields_1 & 0x40: - obj['vtype'], packet = unpack('I*', packet) - if fields_1 & 0x80: - obj['x'], packet = unpack('f*', packet) - if fields_2 & 0x01: - obj['y'], packet = unpack('f*', packet) - if fields_2 & 0x02: - obj['z'], packet = unpack('f*', packet) - if fields_2 & 0x04: - obj['pitch'], packet = unpack('f*', packet) - if fields_2 & 0x08: - obj['roll'], packet = unpack('f*', packet) - if fields_2 & 0x10: - obj['heading'], packet = unpack('f*', packet) - if fields_2 & 0x20: - obj['speed'], packet = unpack('f*', packet) - if fields_2 & 0x40: - surr, packet = unpack('B*', packet) - obj['surrender'] = bool(surr) - if fields_2 & 0x80: - packet = packet[2:] - if fields_3 & 0x01: - obj['shields'], packet = unpack('f*', packet) - if fields_3 & 0x02: - obj['shields-max'], packet = unpack('f*', packet) - if fields_3 & 0x04: - obj['shields-aft'], packet = unpack('f*', packet) - if fields_3 & 0x08: - obj['shields-aft-max'], packet = unpack('f*', packet) - if fields_3 & 0x10: - packet = packet[2:] - if fields_3 & 0x20: - packet = packet[1:] - if fields_3 & 0x40: - elt, packet = unpack('I*', packet) - obj['elite'] = unscramble_elites(elt) - if fields_3 & 0x80: - elt, packet = unpack('I*', packet) - obj['elite-active'] = unscramble_elites(elt) - if fields_4 & 0x01: - scn, packet = unpack('I*', packet) - obj['scanned'] = bool(scn) - if fields_4 & 0x02: - obj['iff-side'], packet = unpack('I*', packet) - if fields_4 & 0x04: - packet = packet[4:] - if fields_4 & 0x08: - packet = packet[1:] - if fields_4 & 0x10: - packet = packet[1:] - if fields_4 & 0x20: - packet = packet[1:] - if fields_4 & 0x40: - packet = packet[1:] - if fields_4 & 0x80: - packet = packet[4:] - if fields_5 & 0x01: - packet = packet[4:] - if fields_5 & 0x02: - packet = packet[4:] - if fields_5 & 0x04: - obj['damage-beams'], packet = unpack('f*', packet) - if fields_5 & 0x08: - obj['damage-tubes'], packet = unpack('f*', packet) - if fields_5 & 0x10: - obj['damage-sensors'], packet = unpack('f*', packet) - if fields_5 & 0x20: - obj['damage-maneuvering'], packet = unpack('f*', packet) - if fields_5 & 0x40: - obj['damage-impulse'], packet = unpack('f*', packet) - if fields_5 & 0x80: - obj['damage-warp'], packet = unpack('f*', packet) - if fields_6 & 0x01: - obj['damage-shields'], packet = unpack('f*', packet) - if fields_6 & 0x02: - obj['damage-shields'], packet = unpack('f*', packet) - if fields_6 & 0x04: - obj['shields-0'], packet = unpack('f*', packet) - if fields_6 & 0x08: - obj['shields-1'], packet = unpack('f*', packet) - if fields_6 & 0x10: - obj['shields-2'], packet = unpack('f*', packet) - if fields_6 & 0x20: - obj['shields-3'], packet = unpack('f*', packet) - if fields_6 & 0x40: - obj['shields-4'], packet = unpack('f*', packet) - if fields_6 & 0x80: - raise ValueError('Unknown data key for NPC') - elif update_type == 0x05: - _id, oid, fields_1, fields_2, packet = unpack('BIBB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.base - if fields_1 & 0x01: - obj['name'], packet = unpack('u*', packet) - if fields_1 & 0x02: - obj['shields'], packet = unpack('f*', packet) - if fields_1 & 0x04: - obj['shields-aft'], packet = unpack('f*', packet) - if fields_1 & 0x08: - obj['index'], packet = unpack('I*', packet) - if fields_1 & 0x10: - obj['vtype'], packet = unpack('I*', packet) - if fields_1 & 0x20: - obj['x'], packet = unpack('f*', packet) - if fields_1 & 0x40: - obj['y'], packet = unpack('f*', packet) - if fields_1 & 0x80: - obj['z'], packet = unpack('f*', packet) - if fields_2 & 0x01: - packet = packet[4:] - if fields_2 & 0x02: - packet = packet[4:] - if fields_2 & 0x04: - packet = packet[4:] - if fields_2 & 0x08: - packet = packet[4:] - if fields_2 & 0x10: - packet = packet[1:] - if fields_2 & 0x20: - packet = packet[1:] - if fields_2 & 0xc0: - raise ValueError('Unknown data keys for base') - elif update_type == 0x06: - _id, oid, fields, packet = unpack('BIB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.mine - if fields & 0x01: - obj['x'], packet = unpack('f*', packet) - if fields & 0x02: - obj['y'], packet = unpack('f*', packet) - if fields & 0x04: - obj['z'], packet = unpack('f*', packet) - if fields & 0x08: - packet = packet[4:] - if fields & 0x10: - packet = packet[4:] - if fields & 0x20: - packet = packet[4:] - if fields & 0x40: - packet = packet[4:] - if fields & 0x80: - packet = packet[4:] - elif update_type == 0x07: - _id, oid, fields, packet = unpack('BIB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.anomaly - if fields & 0x01: - obj['x'], packet = unpack('f*', packet) - if fields & 0x02: - obj['y'], packet = unpack('f*', packet) - if fields & 0x04: - obj['z'], packet = unpack('f*', packet) - if fields & 0x08: - obj['name'], packet = unpack('u*', packet) - if fields & 0x10: - packet = packet[4:] - if fields & 0x20: - packet = packet[4:] - if fields & 0x40: - packet = packet[4:] - if fields & 0x80: - packet = packet[4:] - elif update_type == 0x09: - _id, oid, fields, packet = unpack('BIB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.nebula - if fields & 0x01: - obj['x'], packet = unpack('f*', packet) - if fields & 0x02: - obj['y'], packet = unpack('f*', packet) - if fields & 0x04: - obj['z'], packet = unpack('f*', packet) - if fields & 0x08: - obj['red'], packet = unpack('f*', packet) - if fields & 0x10: - obj['green'], packet = unpack('f*', packet) - if fields & 0x20: - obj['blue'], packet = unpack('f*', packet) - if fields & 0x40: - packet = packet[4:] - if fields & 0x80: - packet = packet[4:] - elif update_type == 0x0a: - _id, oid, fields, packet = unpack('BIB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.torpedo - if fields & 0x01: - obj['x'], packet = unpack('f*', packet) - if fields & 0x02: - obj['y'], packet = unpack('f*', packet) - if fields & 0x04: - obj['z'], packet = unpack('f*', packet) - if fields & 0x08: - packet = packet[4:] - if fields & 0x10: - packet = packet[4:] - if fields & 0x20: - packet = packet[4:] - if fields & 0x40: - packet = packet[4:] - if fields & 0x80: - packet = packet[4:] - elif update_type == 0x0b: - _id, oid, fields, packet = unpack('BIB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.blackhole - if fields & 0x01: - obj['x'], packet = unpack('f*', packet) - if fields & 0x02: - obj['y'], packet = unpack('f*', packet) - if fields & 0x04: - obj['z'], packet = unpack('f*', packet) - if fields & 0x08: - packet = packet[4:] - if fields & 0x10: - packet = packet[4:] - if fields & 0x20: - packet = packet[4:] - if fields & 0x40: - packet = packet[4:] - if fields & 0x80: - packet = packet[4:] - elif update_type == 0x0c: - _id, oid, fields, packet = unpack('BIB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.asteroid - if fields & 0x01: - obj['x'], packet = unpack('f*', packet) - if fields & 0x02: - obj['y'], packet = unpack('f*', packet) - if fields & 0x04: - obj['z'], packet = unpack('f*', packet) - if fields & 0x08: - packet = packet[4:] - if fields & 0x10: - packet = packet[4:] - if fields & 0x20: - packet = packet[4:] - if fields & 0x40: - packet = packet[4:] - if fields & 0x80: - packet = packet[4:] - elif update_type == 0x0e: - _id, oid, fields, packet = unpack('BIB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.monster - if fields & 0x01: - obj['x'], packet = unpack('f*', packet) - if fields & 0x02: - obj['y'], packet = unpack('f*', packet) - if fields & 0x04: - obj['z'], packet = unpack('f*', packet) - if fields & 0x08: - obj['name'], packet = unpack('u*', packet) - if fields & 0x10: - packet = packet[4:] - if fields & 0x20: - packet = packet[4:] - if fields & 0x40: - packet = packet[4:] - if fields & 0x80: - packet = packet[4:] - elif update_type == 0x0f: - _id, oid, fields_1, fields_2, packet = unpack('BIBB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.whale - if fields_1 & 0x01: - obj['name'], packet = unpack('u*', packet) - if fields_1 & 0x02: - packet = packet[4:] - if fields_1 & 0x04: - packet = packet[4:] - if fields_1 & 0x08: - obj['x'], packet = unpack('f*', packet) - if fields_1 & 0x10: - obj['y'], packet = unpack('f*', packet) - if fields_1 & 0x20: - obj['z'], packet = unpack('f*', packet) - if fields_1 & 0x40: - obj['pitch'], packet = unpack('f*', packet) - if fields_1 & 0x80: - obj['roll'], packet = unpack('f*', packet) - if fields_2 & 0x01: - obj['heading'], packet = unpack('f*', packet) - if fields_2 & 0x02: - packet = packet[4:] - if fields_2 & 0x04: - packet = packet[4:] - if fields_2 & 0x08: - packet = packet[4:] - if fields_2 & 0x10: - packet = packet[4:] - if fields_2 & 0xe0: - raise ValueError('Unknown data keys for whale') - elif update_type == 0x10: - _id, oid, fields_1, fields_2, packet = unpack('BIBB*', packet) - obj['object'] = oid - obj['type'] = ObjectType.drone - if fields_1 & 0x01: - packet = packet[4:] - if fields_1 & 0x02: - obj['x'], packet = unpack('f*', packet) - if fields_1 & 0x04: - packet = packet[4:] - if fields_1 & 0x08: - obj['z'], packet = unpack('f*', packet) - if fields_1 & 0x10: - packet = packet[4:] - if fields_1 & 0x20: - obj['y'], packet = unpack('f*', packet) - if fields_1 & 0x40: - obj['heading'], packet = unpack('f*', packet) - if fields_1 & 0x80: - packet = packet[4:] - if fields_2: - raise ValueError('Unknown data keys for drone') - else: - raise ValueError('Unknown object type {}'.format(update_type)) - entries.append(obj) - return entries + entries = [] + while packet: + update_type = packet[0] + obj = {} + if update_type == 0x00: + break + if ObjectType(update_type) in Object_Properties.keys(): + properties = Object_Properties[ObjectType(update_type)] + fieldcount = max([int(fk.split('.')[0]) for fk in properties.keys()]) + unpacked = list(unpack('BIfbs*'.replace('fbs','B'*fieldcount),packet)) + # _id = unpacked[0] + # oid = unpacked[1] + # fields_k = unpacked[1+k] + # packet = unpacked[2+fieldcount] + obj['object'] = unpacked[1] # oid + obj['type'] = ObjectType(update_type) + for fdotb in properties: + prop = properties[fdotb] + f,b = [int(k) for k in fdotb.split(".")] # "byte.bit" -> (byte, bit) + propname, proptype = prop + if unpacked[1+f] & 2**(b-1): # field_f & (bit b) + value,pktremaining = unpack( + '%s*'%proptype, unpacked[2+fieldcount] + ) + obj[propname] = value + unpacked[2+fieldcount] = pktremaining + packet = unpacked[2+fieldcount] + else: + raise ValueError('Unknown object type {}'.format(update_type)) + entries.append(obj) + return entries + + # From here-on is the old object_update code + + while packet: + update_type = packet[0] + obj = {} + if update_type == 0x00: # end of ObjectUpdatePacket + break + elif update_type == ObjectType.player_vessel.value: # player ship + _id, oid, fields_1, fields_2, fields_3, fields_4, fields_5, fields_6, packet = unpack('BIBBBBBB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.player_vessel + if fields_1 & 0x01: + obj['tgt-weapons'], packet = unpack('I*', packet) + if fields_1 & 0x02: + obj['impulse'], packet = unpack('f*', packet) + if fields_1 & 0x04: + obj['rudder'], packet = unpack('f*', packet) + if fields_1 & 0x08: + obj['top-speed'], packet = unpack('f*', packet) + if fields_1 & 0x10: + obj['turn-rate'], packet = unpack('f*', packet) + if fields_1 & 0x20: + ab, packet = unpack('B*', packet) + obj['auto-beams'] = bool(ab) + if fields_1 & 0x40: + obj['warp'], packet = unpack('B*', packet) + if fields_1 & 0x80: + obj['energy'], packet = unpack('f*', packet) + + if fields_2 & 0x01: + obj['shields-state'], packet = unpack('S*', packet) + if fields_2 & 0x02: + packet = packet[4:] #now unknown + #obj['index'], packet = unpack('I*', packet) + if fields_2 & 0x04: + obj['vtype'], packet = unpack('I*', packet) + if fields_2 & 0x08: + obj['x'], packet = unpack('f*', packet) + if fields_2 & 0x10: + obj['y'], packet = unpack('f*', packet) + if fields_2 & 0x20: + obj['z'], packet = unpack('f*', packet) + if fields_2 & 0x40: + obj['pitch'], packet = unpack('f*', packet) + if fields_2 & 0x80: + obj['roll'], packet = unpack('f*', packet) + + if fields_3 & 0x01: + obj['heading'], packet = unpack('f*', packet) + if fields_3 & 0x02: + obj['speed'], packet = unpack('f*', packet) + if fields_3 & 0x04: + obj['in-a-nebula'], packet = unpack('S*', packet) + if fields_3 & 0x08: + obj['name'], packet = unpack('u*', packet) + if fields_3 & 0x10: + obj['shields'], packet = unpack('f*', packet) + if fields_3 & 0x20: + obj['shields-max'], packet = unpack('f*', packet) + if fields_3 & 0x40: + obj['shields-aft'], packet = unpack('f*', packet) + if fields_3 & 0x80: + obj['shields-aft-max'], packet = unpack('f*', packet) + + if fields_4 & 0x01: + obj['docked'], packet = unpack('I*', packet) + if fields_4 & 0x02: + red_alert, packet = unpack('B*', packet) + obj['red-alert'] = bool(red_alert) + if fields_4 & 0x04: + packet = packet[4:] # unknown + if fields_4 & 0x08: + ms, packet = unpack('B*', packet) + obj['main-view'] = MainView(ms) + if fields_4 & 0x10: + obj['beam-frequency'], packet = unpack('B*', packet) + if fields_4 & 0x20: + obj['coolant-avail'], packet = unpack('B*', packet) + if fields_4 & 0x40: + obj['tgt-science'], packet = unpack('I*', packet) + if fields_4 & 0x80: + obj['tgt-captain'], packet = unpack('I*', packet) + + if fields_5 & 0x01: + dt, packet = unpack('B*', packet) + obj['drive-type'] = DriveType(dt) + if fields_5 & 0x02: + obj['tgt-scan'], packet = unpack('I*', packet) + if fields_5 & 0x04: + obj['scan-progress'], packet = unpack('f*', packet) + if fields_5 & 0x08: + rv, packet = unpack('B*', packet) + obj['reverse'] = bool(rv) + if fields_5 & 0x10: + packet = packet[4:] # float + if fields_5 & 0x20: + packet = packet[1:] + if fields_5 & 0x40: + packet = packet[4:] # int + if fields_5 & 0x80: + obj['ship-index?'], packet = unpack('B*', packet) + + if fields_6 & 0x01: + obj['capital-ship-obj-id'], packet = unpack('I*', packet) + if fields_6 & 0x02: + obj['accent-colour'], packet = unpack('f*', packet) + if fields_6 & 0x04: + packet = packet[4:] # unknown + if fields_6 & 0x08: + obj['beacon-creature-type'], packet = unpack('I*', packet) + if fields_6 & 0x10: + obj['beacon-mode'], packet = unpack('B*', packet) + if fields_6 & 0x20: + raise ValueError('Unknown data keys for player vessel') + if fields_6 & 0x40: + raise ValueError('Unknown data keys for player vessel') + if fields_6 & 0x80: + raise ValueError('Unknown data keys for player vessel') + elif update_type == ObjectType.weapons_console.value: # weapons console + _id, oid, fields_1, fields_2, fields_3, packet = unpack('BIBBB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.weapons_console + if fields_1 & 0x01: # TODO: use the enum here + obj['store-missile'], packet = unpack('B*', packet) + if fields_1 & 0x02: + obj['store-nuke'], packet = unpack('B*', packet) + if fields_1 & 0x04: + obj['store-mine'], packet = unpack('B*', packet) + if fields_1 & 0x08: + obj['store-emp'], packet = unpack('B*', packet) + if fields_1 & 0x10: + packet = packet[1:] + if fields_1 & 0x20: + obj['load-time-0'], packet = unpack('f*', packet) + if fields_1 & 0x40: + obj['load-time-1'], packet = unpack('f*', packet) + if fields_1 & 0x80: + obj['load-time-2'], packet = unpack('f*', packet) + if fields_2 & 0x01: + obj['load-time-3'], packet = unpack('f*', packet) + if fields_2 & 0x02: + obj['load-time-4'], packet = unpack('f*', packet) + if fields_2 & 0x04: + obj['load-time-5'], packet = unpack('f*', packet) + if fields_2 & 0x08: + ts, packet = unpack('B*', packet) + obj['status-0'] = TubeStatus(ts) + if fields_2 & 0x10: + ts, packet = unpack('B*', packet) + obj['status-1'] = TubeStatus(ts) + if fields_2 & 0x20: + ts, packet = unpack('B*', packet) + obj['status-2'] = TubeStatus(ts) + if fields_2 & 0x40: + ts, packet = unpack('B*', packet) + obj['status-3'] = TubeStatus(ts) + if fields_2 & 0x80: + ts, packet = unpack('B*', packet) + obj['status-4'] = TubeStatus(ts) + if fields_3 & 0x01: + ts, packet = unpack('B*', packet) + obj['status-5'] = TubeStatus(ts) + if fields_3 & 0x02: + ot, packet = unpack('B*', packet) + obj['contents-0'] = OrdnanceType(ot) + if fields_3 & 0x04: + ot, packet = unpack('B*', packet) + obj['contents-1'] = OrdnanceType(ot) + if fields_3 & 0x08: + ot, packet = unpack('B*', packet) + obj['contents-2'] = OrdnanceType(ot) + if fields_3 & 0x10: + ot, packet = unpack('B*', packet) + obj['contents-3'] = OrdnanceType(ot) + if fields_3 & 0x20: + ot, packet = unpack('B*', packet) + obj['contents-4'] = OrdnanceType(ot) + if fields_3 & 0x40: + ot, packet = unpack('B*', packet) + obj['contents-5'] = OrdnanceType(ot) + if fields_3 & 0x80: + raise ValueError('Unknown fields for weapons console') + elif update_type == ObjectType.engineering_console.value: # engineering console + _id, oid, fields_heat, fields_enrg, fields_coolant, fields_unk, packet = unpack('BIBBBB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.engineering_console + if fields_unk: + raise ValueError('Undecodable fields in engineering status') + systems = (('beams', 0x01), + ('torps', 0x02), + ('sensors', 0x04), + ('maneuvering', 0x08), + ('impulse', 0x10), + ('warp', 0x20), + ('shields', 0x40), + ('shields-aft', 0x80)) + types = (('heat', fields_heat, 'f'), + ('energy', fields_enrg, 'f'), + ('coolant', fields_coolant, 'B')) + for status, mask, fmt in types: + for syst, flag in systems: + if fields_heat & flag: + obj['{}-{}'.format(status, syst)], packet = unpack(fmt + '*', packet) + elif update_type == ObjectType.player_ship_upgrade.value: # player ship upgrades + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.player_ship_upgrade + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + obj['upgrade'], packet = unpack('I*', packet) + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.other_ship.value: # NPC ship + _id, oid, fields_1, fields_2, fields_3, fields_4, fields_5, fields_6, packet = unpack('BIBBBBBB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.other_ship + if fields_1 & 0x01: + obj['name'], packet = unpack('u*', packet) + if fields_1 & 0x02: + packet = packet[4:] + if fields_1 & 0x04: + obj['rudder'], packet = unpack('f*', packet) + if fields_1 & 0x08: + obj['max-impulse'], packet = unpack('f*', packet) + if fields_1 & 0x10: + obj['max-turn-rate'], packet = unpack('f*', packet) + if fields_1 & 0x20: + fef, packet = unpack('I*', packet) + obj['iff-friendly'] = not bool(fef) + if fields_1 & 0x40: + obj['vtype'], packet = unpack('I*', packet) + if fields_1 & 0x80: + obj['x'], packet = unpack('f*', packet) + if fields_2 & 0x01: + obj['y'], packet = unpack('f*', packet) + if fields_2 & 0x02: + obj['z'], packet = unpack('f*', packet) + if fields_2 & 0x04: + obj['pitch'], packet = unpack('f*', packet) + if fields_2 & 0x08: + obj['roll'], packet = unpack('f*', packet) + if fields_2 & 0x10: + obj['heading'], packet = unpack('f*', packet) + if fields_2 & 0x20: + obj['speed'], packet = unpack('f*', packet) + if fields_2 & 0x40: + surr, packet = unpack('B*', packet) + obj['surrender'] = bool(surr) + if fields_2 & 0x80: + packet = packet[2:] + if fields_3 & 0x01: + obj['shields'], packet = unpack('f*', packet) + if fields_3 & 0x02: + obj['shields-max'], packet = unpack('f*', packet) + if fields_3 & 0x04: + obj['shields-aft'], packet = unpack('f*', packet) + if fields_3 & 0x08: + obj['shields-aft-max'], packet = unpack('f*', packet) + if fields_3 & 0x10: + packet = packet[2:] + if fields_3 & 0x20: + packet = packet[1:] + if fields_3 & 0x40: + elt, packet = unpack('I*', packet) + obj['elite'] = unscramble_elites(elt) + if fields_3 & 0x80: + elt, packet = unpack('I*', packet) + obj['elite-active'] = unscramble_elites(elt) + if fields_4 & 0x01: + scn, packet = unpack('I*', packet) + obj['scanned'] = bool(scn) + if fields_4 & 0x02: + obj['iff-side'], packet = unpack('I*', packet) + if fields_4 & 0x04: + packet = packet[4:] + if fields_4 & 0x08: + packet = packet[1:] + if fields_4 & 0x10: + packet = packet[1:] + if fields_4 & 0x20: + packet = packet[1:] + if fields_4 & 0x40: + packet = packet[1:] + if fields_4 & 0x80: + packet = packet[4:] + if fields_5 & 0x01: + packet = packet[4:] + if fields_5 & 0x02: + packet = packet[4:] + if fields_5 & 0x04: + obj['damage-beams'], packet = unpack('f*', packet) + if fields_5 & 0x08: + obj['damage-tubes'], packet = unpack('f*', packet) + if fields_5 & 0x10: + obj['damage-sensors'], packet = unpack('f*', packet) + if fields_5 & 0x20: + obj['damage-maneuvering'], packet = unpack('f*', packet) + if fields_5 & 0x40: + obj['damage-impulse'], packet = unpack('f*', packet) + if fields_5 & 0x80: + obj['damage-warp'], packet = unpack('f*', packet) + if fields_6 & 0x01: + obj['damage-shields'], packet = unpack('f*', packet) + if fields_6 & 0x02: + obj['damage-shields'], packet = unpack('f*', packet) + if fields_6 & 0x04: + obj['shields-0'], packet = unpack('f*', packet) + if fields_6 & 0x08: + obj['shields-1'], packet = unpack('f*', packet) + if fields_6 & 0x10: + obj['shields-2'], packet = unpack('f*', packet) + if fields_6 & 0x20: + obj['shields-3'], packet = unpack('f*', packet) + if fields_6 & 0x40: + obj['shields-4'], packet = unpack('f*', packet) + if fields_6 & 0x80: + raise ValueError('Unknown data key for NPC') + elif update_type == ObjectType.base.value: # base + _id, oid, fields_1, fields_2, packet = unpack('BIBB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.base + if fields_1 & 0x01: + obj['name'], packet = unpack('u*', packet) + if fields_1 & 0x02: + obj['shields'], packet = unpack('f*', packet) + if fields_1 & 0x04: + obj['shields-aft'], packet = unpack('f*', packet) + if fields_1 & 0x08: + obj['index'], packet = unpack('I*', packet) + if fields_1 & 0x10: + obj['vtype'], packet = unpack('I*', packet) + if fields_1 & 0x20: + obj['x'], packet = unpack('f*', packet) + if fields_1 & 0x40: + obj['y'], packet = unpack('f*', packet) + if fields_1 & 0x80: + obj['z'], packet = unpack('f*', packet) + if fields_2 & 0x01: + packet = packet[4:] + if fields_2 & 0x02: + packet = packet[4:] + if fields_2 & 0x04: + packet = packet[4:] + if fields_2 & 0x08: + packet = packet[4:] + if fields_2 & 0x10: + packet = packet[1:] + if fields_2 & 0x20: + packet = packet[1:] + if fields_2 & 0xc0: + raise ValueError('Unknown data keys for base') + elif update_type == ObjectType.mine.value: # mine + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.mine + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + packet = packet[4:] + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.anomaly.value: # anomaly + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.anomaly + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + obj['upgrade'], packet = unpack('I*', packet) + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == 0x09: # unused + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.anomaly + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + obj['name'], packet = unpack('u*', packet) + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.nebula.value: # nebula + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.nebula + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + obj['red'], packet = unpack('f*', packet) + if fields & 0x10: + obj['green'], packet = unpack('f*', packet) + if fields & 0x20: + obj['blue'], packet = unpack('f*', packet) + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.torpedo.value: # torpedo + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.torpedo + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + packet = packet[4:] + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.blackhole.value: # black hole + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.blackhole + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + packet = packet[4:] + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.asteroid.value: # asteroid + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.asteroid + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + packet = packet[4:] + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.mesh.value: # generic mesh + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.mesh + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + packet = packet[4:] + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.creature.value: # creature + _id, oid, fields, packet = unpack('BIB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.monster + if fields & 0x01: + obj['x'], packet = unpack('f*', packet) + if fields & 0x02: + obj['y'], packet = unpack('f*', packet) + if fields & 0x04: + obj['z'], packet = unpack('f*', packet) + if fields & 0x08: + obj['name'], packet = unpack('u*', packet) + if fields & 0x10: + packet = packet[4:] + if fields & 0x20: + packet = packet[4:] + if fields & 0x40: + packet = packet[4:] + if fields & 0x80: + packet = packet[4:] + elif update_type == ObjectType.drone.value: # drone + _id, oid, fields_1, fields_2, packet = unpack('BIBB*', packet) + obj['object'] = oid + obj['type'] = ObjectType.drone + if fields_1 & 0x01: + packet = packet[4:] + if fields_1 & 0x02: + obj['x'], packet = unpack('f*', packet) + if fields_1 & 0x04: + packet = packet[4:] + if fields_1 & 0x08: + obj['z'], packet = unpack('f*', packet) + if fields_1 & 0x10: + packet = packet[4:] + if fields_1 & 0x20: + obj['y'], packet = unpack('f*', packet) + if fields_1 & 0x40: + obj['heading'], packet = unpack('f*', packet) + if fields_1 & 0x80: + packet = packet[4:] + if fields_2: + raise ValueError('Unknown data keys for drone') + else: + raise ValueError('Unknown object type {}'.format(update_type)) + entries.append(obj) + return entries diff --git a/diana/packet.py b/diana/packet.py index c1ba68f..0211d29 100644 --- a/diana/packet.py +++ b/diana/packet.py @@ -6,882 +6,882 @@ from .enumerations import * def pack(fmt, *args): - return base_pack(fmt, args) + return base_pack(fmt, args) class SoftDecodeFailure(RuntimeError): - pass + pass PACKETS = {} def packet(n): - def wrapper(cls): - PACKETS[n] = cls - cls.packet_id = n - return cls - return wrapper + def wrapper(cls): + PACKETS[n] = cls + cls.packet_id = n + return cls + return wrapper class UndecodedPacket: - def __init__(self, packet_id, data): - self.packet_id = packet_id - self.data = data + def __init__(self, packet_id, data): + self.packet_id = packet_id + self.data = data - def encode(self): - return self.data + def encode(self): + return self.data - @classmethod - def decode(cls, data): - return cls(0, data) + @classmethod + def decode(cls, data): + return cls(0, data) - def __str__(self): - return "".format(self.packet_id, self.data) + def __str__(self): + return "".format(self.packet_id, self.data) @packet(0x6d04b3da) class WelcomePacket: - def __init__(self, message=''): - self.message = message + def __init__(self, message=''): + self.message = message - def encode(self): - encoded_message = self.message.encode('ascii') - return struct.pack('".format(self.message) + def __str__(self): + return "".format(self.message) @packet(0xe548e74a) class VersionPacket: - def __init__(self, major, minor, patch): - self.major = major - self.minor = minor - self.patch = patch + def __init__(self, major, minor, patch): + self.major = major + self.minor = minor + self.patch = patch - def encode(self): - return pack('IfIII', 0, - float('{}.{}'.format(self.major, self.minor)), - self.major, self.minor, self.patch) + def encode(self): + return pack('IfIII', 0, + float('{}.{}'.format(self.major, self.minor)), + self.major, self.minor, self.patch) - @classmethod - def decode(cls, packet): - unknown_1, legacy_version, major, minor, patch = unpack('IfIII', packet) - return cls(major, minor, patch) + @classmethod + def decode(cls, packet): + unknown_1, legacy_version, major, minor, patch = unpack('IfIII', packet) + return cls(major, minor, patch) - def __str__(self): - return "".format(self.major, self.minor, self.patch) + def __str__(self): + return "".format(self.major, self.minor, self.patch) @packet(0x3de66711) class DifficultyPacket: - def __init__(self, difficulty, game_type): - self.difficulty = difficulty - self.game_type = game_type + def __init__(self, difficulty, game_type): + self.difficulty = difficulty + self.game_type = game_type - def encode(self): - return pack('II', self.difficulty, self.game_type.value) + def encode(self): + return pack('II', self.difficulty, self.game_type.value) - @classmethod - def decode(cls, packet): - difficulty, game_type_raw = unpack('II', packet) - return cls(difficulty, GameType(game_type_raw)) + @classmethod + def decode(cls, packet): + difficulty, game_type_raw = unpack('II', packet) + return cls(difficulty, GameType(game_type_raw)) - def __str__(self): - return "".format(self.difficulty, self.game_type) + def __str__(self): + return "".format(self.difficulty, self.game_type) @packet(0x19c6e2d4) class ConsoleStatusPacket: - def __init__(self, ship, consoles): - self.consoles = {key: consoles.get(key, ConsoleStatus.available) for key in Console} - self.ship = ship - - def encode(self): - return pack('I[B]', self.ship, - [(self.consoles[console].value,) for console in Console]) - - @classmethod - def decode(cls, packet): - ship, body = unpack('I[B]', packet) - body = [x[0] for x in body] - if len(body) != len(Console): - raise ValueError("Incorrect console count ({}, should be {})".format(len(body), len(Console))) - consoles = {console: ConsoleStatus(body[console.value]) for console in Console} - return cls(ship, consoles) - - def __str__(self): - return ''.format(self.ship, - {console: status - for console, status in self.consoles.items() - if status != ConsoleStatus.available}) + def __init__(self, ship, consoles): + self.consoles = {key: consoles.get(key, ConsoleStatus.available) for key in Console} + self.ship = ship + + def encode(self): + return pack('I[B]', self.ship, + [(self.consoles[console].value,) for console in Console]) + + @classmethod + def decode(cls, packet): + ship, body = unpack('I[B]', packet) + body = [x[0] for x in body] + if len(body) != len(Console): + raise ValueError("Incorrect console count ({}, should be {})".format(len(body), len(Console))) + consoles = {console: ConsoleStatus(body[console.value]) for console in Console} + return cls(ship, consoles) + + def __str__(self): + return ''.format(self.ship, + {console: status + for console, status in self.consoles.items() + if status != ConsoleStatus.available}) @packet(0xf5821226) class HeartbeatPacket: - def encode(self): - return b'' + def encode(self): + return b'' - @classmethod - def decode(cls, packet): - if packet != b'': - raise ValueError('Payload in heartbeat') - return cls() + @classmethod + def decode(cls, packet): + if packet != b'': + raise ValueError('Payload in heartbeat') + return cls() - def __str__(self): - return "" + def __str__(self): + return "" @packet(0xee665279) class IntelPacket: - def __init__(self, object, intel): - self.object = object - self.intel = intel + def __init__(self, object, intel): + self.object = object + self.intel = intel - def encode(self): - return pack('Ibu', self.object, 3, self.intel) + def encode(self): + return pack('Ibu', self.object, 3, self.intel) - @classmethod - def decode(cls, packet): - object, _unk, intel = unpack('Ibu', packet) - return cls(object, intel) + @classmethod + def decode(cls, packet): + object, _unk, intel = unpack('Ibu', packet) + return cls(object, intel) - def __str__(self): - return ''.format(self.object, self.intel) + def __str__(self): + return ''.format(self.object, self.intel) @packet(0xd672c35f) class CommsIncomingPacket: - def __init__(self, priority, sender, message): - self.priority = priority - self.sender = sender - self.message = message + def __init__(self, priority, sender, message): + self.priority = priority + self.sender = sender + self.message = message - def encode(self): - return pack('Iuu', self.priority, self.sender, self.message.replace('\n', '^')) + def encode(self): + return pack('Suu', self.priority, self.sender, self.message.replace('\n', '^')) - @classmethod - def decode(cls, packet): - prio, sender, message = unpack('Iuu', packet) - return cls(prio, sender, message.replace('^', '\n')) + @classmethod + def decode(cls, packet): + prio, sender, message = unpack('Suu', packet) + return cls(prio, sender, message.replace('^', '\n')) - def __str__(self): - return ''.format(self.priority, self.sender, self.message) + def __str__(self): + return ''.format(self.priority, self.sender, self.message) @packet(0x80803df9) class ObjectUpdatePacket: - def __init__(self, raw_data): - self.raw_data = raw_data - - @property - def _records(self): - return decode_obj_update_packet(self.raw_data) - - @property - def records(self): - try: - return self._records - except Exception: - return [] - - @classmethod - def decode(cls, packet): - if packet == b'\x00\x00\x00\x00': - return NoisePacket() - return cls(packet) - - def encode(self): - return self.raw_data - - def __str__(self): - try: - records = repr(self._records) - return ''.format(records) - except Exception as e: - return ''.format(self.raw_data, e) + def __init__(self, raw_data): + self.raw_data = raw_data + + @property + def _records(self): + return decode_obj_update_packet(self.raw_data) + + @property + def records(self): + try: + return self._records + except Exception: + return [] + + @classmethod + def decode(cls, packet): + if packet == b'\x00\x00\x00\x00': + return NoisePacket() + return cls(packet) + + def encode(self): + return self.raw_data + + def __str__(self): + try: + records = repr(self._records) + return ''.format(records) + except Exception as e: + return ''.format(self.raw_data, e) class NoisePacket: - def __init__(self): - self.packet_id = 0x80803df9 + def __init__(self): + self.packet_id = 0x80803df9 - def encode(self): - return b'\x00\x00\x00\x00' + def encode(self): + return b'\x00\x00\x00\x00' - def __str__(self): - return '' + def __str__(self): + return '' @packet(0xcc5a3e30) class DestroyObjectPacket: - def __init__(self, type, object): - self.type = type - self.object = object + def __init__(self, type, object): + self.type = type + self.object = object - def encode(self): - return pack('BI', self.type.value, self.object) + def encode(self): + return pack('BI', self.type.value, self.object) - @classmethod - def decode(cls, packet): - type, object = unpack('BI', packet) - return cls(type=ObjectType(type), object=object) + @classmethod + def decode(cls, packet): + type, object = unpack('BI', packet) + return cls(type=ObjectType(type), object=object) - def __str__(self): - return ''.format(self.type, self.object) + def __str__(self): + return ''.format(self.type, self.object) @packet(0xf754c8fe) class GameMessagePacket: - @classmethod - def decode(cls, packet): - if not packet: - raise ValueError('No payload in game message') - subtype_index = packet[0] - if subtype_index == 0: - return GameStartPacket.decode(packet) - if subtype_index == 6: - return GameEndPacket.decode(packet) - if subtype_index == 9: - return SkyboxPacket.decode(packet) - if subtype_index == 10: - return PopupPacket.decode(packet) - if subtype_index == 11: - return AutonomousDamconPacket.decode(packet) - if subtype_index == 12: - return JumpStartPacket.decode(packet) - if subtype_index == 13: - return JumpEndPacket.decode(packet) - if subtype_index == 15: - return AllShipSettingsPacket.decode(packet) - if subtype_index == 16: - return DmxPacket.decode(packet) - raise SoftDecodeFailure() + @classmethod + def decode(cls, packet): + if not packet: + raise ValueError('No payload in game message') + subtype_index = packet[0] + if subtype_index == 0: + return GameStartPacket.decode(packet) + if subtype_index == 6: + return GameEndPacket.decode(packet) + if subtype_index == 9: + return SkyboxPacket.decode(packet) + if subtype_index == 10: + return PopupPacket.decode(packet) + if subtype_index == 11: + return AutonomousDamconPacket.decode(packet) + if subtype_index == 12: + return JumpStartPacket.decode(packet) + if subtype_index == 13: + return JumpEndPacket.decode(packet) + if subtype_index == 15: + return AllShipSettingsPacket.decode(packet) + if subtype_index == 16: + return DmxPacket.decode(packet) + raise SoftDecodeFailure() class GameStartPacket(GameMessagePacket): - def encode(self): - return b'\x00\x00\x00\x00\x0a\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x00\x00\x00\x00\x0a\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, packet): - if len(packet) != 12: - raise ValueError('Wrong packet length') - return cls() + @classmethod + def decode(cls, packet): + if len(packet) != 12: + raise ValueError('Wrong packet length') + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class GameEndPacket(GameMessagePacket): - def encode(self): - return b'\x06\x00\x00\x00' + def encode(self): + return b'\x06\x00\x00\x00' - @classmethod - def decode(cls, packet): - if len(packet) != 4: - raise ValueError('Wrong packet length') - return cls() + @classmethod + def decode(cls, packet): + if len(packet) != 4: + raise ValueError('Wrong packet length') + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class AllShipSettingsPacket(GameMessagePacket): - def __init__(self, ships): - self.ships = list(ships) - if len(self.ships) != 8: - raise ValueError('Must be 8 ships, {} given'.format(len(self.ships))) + def __init__(self, ships): + self.ships = list(ships) + if len(self.ships) != 8: + raise ValueError('Must be 8 ships, {} given'.format(len(self.ships))) - def encode(self): - return pack('I[IIIu]', 15, - [(ship.drive.value, ship.type.value, 1, ship.name) - for ship in self.ships]) + def encode(self): + return pack('I[IIIu]', 15, + [(ship.drive.value, ship.type.value, 1, ship.name) + for ship in self.ships]) - @classmethod - def decode(cls, packet): - _id, records = unpack('I[IIIu]', packet) - return cls(ShipSettingsRecord(DriveType(drv), ShipType(typ), name) - for drv, typ, _what, name in records) + @classmethod + def decode(cls, packet): + _id, records = unpack('I[IIfIu]', packet) + return cls(ShipSettingsRecord(DriveType(drv), ShipType(typ), name) + for drv, typ, _accent, _what, name in records) - def __str__(self): - return ''.format(self.ships) + def __str__(self): + return ''.format(self.ships) class JumpStartPacket(GameMessagePacket): - def encode(self): - return b'\x0c\x00\x00\x00' + def encode(self): + return b'\x0c\x00\x00\x00' - @classmethod - def decode(cls, packet): - if len(packet) != 4: - raise ValueError('Wrong packet length') - return cls() + @classmethod + def decode(cls, packet): + if len(packet) != 4: + raise ValueError('Wrong packet length') + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class JumpEndPacket(GameMessagePacket): - def encode(self): - return b'\x0d\x00\x00\x00' + def encode(self): + return b'\x0d\x00\x00\x00' - @classmethod - def decode(cls, packet): - if len(packet) != 4: - raise ValueError('Wrong packet length') - return cls() + @classmethod + def decode(cls, packet): + if len(packet) != 4: + raise ValueError('Wrong packet length') + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class DmxPacket(GameMessagePacket): - def __init__(self, flag, state): - self.flag = flag - self.state = state + def __init__(self, flag, state): + self.flag = flag + self.state = state - def encode(self): - return pack('IuI', 0x10, self.flag, int(self.state)) + def encode(self): + return pack('IuI', 0x10, self.flag, int(self.state)) - @classmethod - def decode(cls, packet): - _id, flag, state = unpack('IuI', packet) - return cls(flag, state) + @classmethod + def decode(cls, packet): + _id, flag, state = unpack('IuI', packet) + return cls(flag, state) - def __str__(self): - return ''.format(self.flag, self.state) + def __str__(self): + return ''.format(self.flag, self.state) class SkyboxPacket(GameMessagePacket): - def __init__(self, skybox): - self.skybox = skybox + def __init__(self, skybox): + self.skybox = skybox - def encode(self): - return pack('II', 9, self.skybox) + def encode(self): + return pack('II', 9, self.skybox) - @classmethod - def decode(cls, packet): - _id, skybox = unpack('II', packet) - return cls(skybox) + @classmethod + def decode(cls, packet): + _id, skybox = unpack('II', packet) + return cls(skybox) - def __str__(self): - return ''.format(self.skybox) + def __str__(self): + return ''.format(self.skybox) class PopupPacket(GameMessagePacket): - def __init__(self, message): - self.message = message + def __init__(self, message): + self.message = message - def encode(self): - return pack('Iu', 0x0a, self.message) + def encode(self): + return pack('Iu', 0x0a, self.message) - @classmethod - def decode(cls, packet): - _id, message = unpack('Iu', packet) - return cls(message) + @classmethod + def decode(cls, packet): + _id, message = unpack('Iu', packet) + return cls(message) - def __str__(self): - return ''.format(self.message) + def __str__(self): + return ''.format(self.message) class AutonomousDamconPacket(GameMessagePacket): - def __init__(self, autonomy): - self.autonomy = autonomy + def __init__(self, autonomy): + self.autonomy = autonomy - def encode(self): - return pack('II', 0x0b, int(self.autonomy)) + def encode(self): + return pack('II', 0x0b, int(self.autonomy)) - @classmethod - def decode(cls, packet): - _id, autonomy = unpack('II', packet) - return cls(bool(autonomy)) + @classmethod + def decode(cls, packet): + _id, autonomy = unpack('II', packet) + return cls(bool(autonomy)) - def __str__(self): - return ''.format(self.autonomy) + def __str__(self): + return ''.format(self.autonomy) @packet(0x4c821d3c) class ShipAction1Packet: - @classmethod - def decode(cls, packet): - if not packet: - raise ValueError('No payload in game message') - subtype_index = packet[0] - if subtype_index == 0: - return HelmSetWarpPacket.decode(packet) - if subtype_index == 1: - return SetMainScreenPacket.decode(packet) - if subtype_index == 2: - return SetWeaponsTargetPacket.decode(packet) - if subtype_index == 3: - return ToggleAutoBeamsPacket.decode(packet) - if subtype_index == 4: - return ToggleShieldsPacket.decode(packet) - if subtype_index == 7: - return HelmRequestDockPacket.decode(packet) - if subtype_index == 10: - return ToggleRedAlertPacket.decode(packet) - if subtype_index == 11: - return SetBeamFreqPacket.decode(packet) - if subtype_index == 13: - return SetShipPacket.decode(packet) - if subtype_index == 14: - return SetConsolePacket.decode(packet) - if subtype_index == 15: - return ReadyPacket.decode(packet) - if subtype_index == 16: - return SciSelectPacket.decode(packet) - if subtype_index == 17: - return CaptainSelectPacket.decode(packet) - if subtype_index == 18: - return GameMasterSelectPacket.decode(packet) - if subtype_index == 19: - return SciScanPacket.decode(packet) - if subtype_index == 22: - return SetShipSettingsPacket.decode(packet) - if subtype_index == 24: - return HelmToggleReversePacket.decode(packet) - if subtype_index == 25: - return Ready2Packet.decode(packet) - if subtype_index == 26: - return TogglePerspectivePacket.decode(packet) - if subtype_index == 27: - return ClimbDivePacket.decode(packet) - raise SoftDecodeFailure() + @classmethod + def decode(cls, packet): + if not packet: + raise ValueError('No payload in game message') + subtype_index = packet[0] + if subtype_index == 0: + return HelmSetWarpPacket.decode(packet) + if subtype_index == 1: + return SetMainScreenPacket.decode(packet) + if subtype_index == 2: + return SetWeaponsTargetPacket.decode(packet) + if subtype_index == 3: + return ToggleAutoBeamsPacket.decode(packet) + if subtype_index == 4: + return ToggleShieldsPacket.decode(packet) + if subtype_index == 7: + return HelmRequestDockPacket.decode(packet) + if subtype_index == 10: + return ToggleRedAlertPacket.decode(packet) + if subtype_index == 11: + return SetBeamFreqPacket.decode(packet) + if subtype_index == 13: + return SetShipPacket.decode(packet) + if subtype_index == 14: + return SetConsolePacket.decode(packet) + if subtype_index == 15: + return ReadyPacket.decode(packet) + if subtype_index == 16: + return SciSelectPacket.decode(packet) + if subtype_index == 17: + return CaptainSelectPacket.decode(packet) + if subtype_index == 18: + return GameMasterSelectPacket.decode(packet) + if subtype_index == 19: + return SciScanPacket.decode(packet) + if subtype_index == 22: + return SetShipSettingsPacket.decode(packet) + if subtype_index == 24: + return HelmToggleReversePacket.decode(packet) + if subtype_index == 25: + return Ready2Packet.decode(packet) + if subtype_index == 26: + return TogglePerspectivePacket.decode(packet) + if subtype_index == 27: + return ClimbDivePacket.decode(packet) + raise SoftDecodeFailure() class SciScanPacket(ShipAction1Packet): - def __init__(self, target): - self.target = target + def __init__(self, target): + self.target = target - def encode(self): - return pack('II', 19, self.target) + def encode(self): + return pack('II', 19, self.target) - @classmethod - def decode(cls, packet): - _idx, tgt = unpack('II', packet) - return cls(tgt) + @classmethod + def decode(cls, packet): + _idx, tgt = unpack('II', packet) + return cls(tgt) - def __str__(self): - return "".format(self.target) + def __str__(self): + return "".format(self.target) class CaptainSelectPacket(ShipAction1Packet): - def __init__(self, object): - self.object = object - - def encode(self): - if self.object is not None: - return pack('II', 17, self.object) - else: - return pack('II', 17, 1) - - @classmethod - def decode(cls, packet): - _idx, tgt = unpack('II', packet) - if tgt != 1: - return cls(tgt) - else: - return cls(None) - - def __str__(self): - return "".format(self.object) + def __init__(self, object): + self.object = object + + def encode(self): + if self.object is not None: + return pack('II', 17, self.object) + else: + return pack('II', 17, 1) + + @classmethod + def decode(cls, packet): + _idx, tgt = unpack('II', packet) + if tgt != 1: + return cls(tgt) + else: + return cls(None) + + def __str__(self): + return "".format(self.object) class GameMasterSelectPacket(ShipAction1Packet): - def __init__(self, object): - self.object = object - - def encode(self): - if self.object is not None: - return pack('II', 18, self.object) - else: - return pack('II', 18, 1) - - @classmethod - def decode(cls, packet): - _idx, tgt = unpack('II', packet) - if tgt != 1: - return cls(tgt) - else: - return cls(None) - - def __str__(self): - return "".format(self.object) + def __init__(self, object): + self.object = object + + def encode(self): + if self.object is not None: + return pack('II', 18, self.object) + else: + return pack('II', 18, 1) + + @classmethod + def decode(cls, packet): + _idx, tgt = unpack('II', packet) + if tgt != 1: + return cls(tgt) + else: + return cls(None) + + def __str__(self): + return "".format(self.object) class SciSelectPacket(ShipAction1Packet): - def __init__(self, object): - self.object = object - - def encode(self): - if self.object is not None: - return pack('II', 16, self.object) - else: - return pack('II', 16, 1) - - @classmethod - def decode(cls, packet): - _idx, tgt = unpack('II', packet) - if tgt != 1: - return cls(tgt) - else: - return cls(None) - - def __str__(self): - return "".format(self.object) + def __init__(self, object): + self.object = object + + def encode(self): + if self.object is not None: + return pack('II', 16, self.object) + else: + return pack('II', 16, 1) + + @classmethod + def decode(cls, packet): + _idx, tgt = unpack('II', packet) + if tgt != 1: + return cls(tgt) + else: + return cls(None) + + def __str__(self): + return "".format(self.object) class SetWeaponsTargetPacket(ShipAction1Packet): - def __init__(self, object): - self.object = object - - def encode(self): - if self.object is not None: - return pack('II', 2, self.object) - else: - return pack('II', 2, 1) - - @classmethod - def decode(cls, packet): - _idx, tgt = unpack('II', packet) - if tgt != 1: - return cls(tgt) - else: - return cls(None) - - def __str__(self): - return "".format(self.object) + def __init__(self, object): + self.object = object + + def encode(self): + if self.object is not None: + return pack('II', 2, self.object) + else: + return pack('II', 2, 1) + + @classmethod + def decode(cls, packet): + _idx, tgt = unpack('II', packet) + if tgt != 1: + return cls(tgt) + else: + return cls(None) + + def __str__(self): + return "".format(self.object) class SetBeamFreqPacket(ShipAction1Packet): - def __init__(self, freq): - self.freq = freq + def __init__(self, freq): + self.freq = freq - def encode(self): - return pack('II', 11, self.freq) + def encode(self): + return pack('II', 11, self.freq) - @classmethod - def decode(cls, packet): - _idx, freq = unpack('II', packet) - return cls(freq) + @classmethod + def decode(cls, packet): + _idx, freq = unpack('II', packet) + return cls(freq) - def __str__(self): - return "".format(self.freq) + def __str__(self): + return "".format(self.freq) class HelmToggleReversePacket(ShipAction1Packet): - def encode(self): - return b'\x18\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x18\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, packet): - if packet != b'\x18\x00\x00\x00\x00\x00\x00\x00': - raise ValueError('Unexpected payload in reverse packet') - return cls() + @classmethod + def decode(cls, packet): + if packet != b'\x18\x00\x00\x00\x00\x00\x00\x00': + raise ValueError('Unexpected payload in reverse packet') + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class ReadyPacket(ShipAction1Packet): - def encode(self): - return b'\x0f\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x0f\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, packet): - if packet != b'\x0f\x00\x00\x00\x00\x00\x00\x00': - raise ValueError('Unexpected payload in ready packet') - return cls() + @classmethod + def decode(cls, packet): + if packet != b'\x0f\x00\x00\x00\x00\x00\x00\x00': + raise ValueError('Unexpected payload in ready packet') + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class Ready2Packet(ShipAction1Packet): - def encode(self): - return b'\x19\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x19\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, packet): - if packet != b'\x19\x00\x00\x00\x00\x00\x00\x00': - raise ValueError('Unexpected payload in ready2 packet') - return cls() + @classmethod + def decode(cls, packet): + if packet != b'\x19\x00\x00\x00\x00\x00\x00\x00': + raise ValueError('Unexpected payload in ready2 packet') + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class SetShipSettingsPacket(ShipAction1Packet): - def __init__(self, drive, type, name): - self.drive = drive - self.type = type - self.name = name + def __init__(self, drive, type, name): + self.drive = drive + self.type = type + self.name = name - def encode(self): - return pack('IIIIu', 0x16, self.drive.value, self.type.value, 1, self.name) + def encode(self): + return pack('IIIIu', 0x16, self.drive.value, self.type.value, 1, self.name) - @classmethod - def decode(cls, packet): - _id, drv, typ, _unk, name = unpack('IIIIu', packet) - return cls(drive=DriveType(drv), - type=ShipType(typ), - name=name) + @classmethod + def decode(cls, packet): + _id, drv, typ, _unk, name = unpack('IIIIu', packet) + return cls(drive=DriveType(drv), + type=ShipType(typ), + name=name) - def __str__(self): - return ''.format(self.drive, self.type, self.name) + def __str__(self): + return ''.format(self.drive, self.type, self.name) class HelmRequestDockPacket(ShipAction1Packet): - def encode(self): - return b'\x07\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x07\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, data): - if data != b'\x07\x00\x00\x00\x00\x00\x00\x00': - raise SoftDecodeFailure() - return cls() + @classmethod + def decode(cls, data): + if data != b'\x07\x00\x00\x00\x00\x00\x00\x00': + raise SoftDecodeFailure() + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class ToggleShieldsPacket(ShipAction1Packet): - def encode(self): - return b'\x04\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x04\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, data): - if data != b'\x04\x00\x00\x00\x00\x00\x00\x00': - raise SoftDecodeFailure() - return cls() + @classmethod + def decode(cls, data): + if data != b'\x04\x00\x00\x00\x00\x00\x00\x00': + raise SoftDecodeFailure() + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class ToggleRedAlertPacket(ShipAction1Packet): - def encode(self): - return b'\x0a\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x0a\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, data): - if data != b'\x0a\x00\x00\x00\x00\x00\x00\x00': - raise SoftDecodeFailure() - return cls() + @classmethod + def decode(cls, data): + if data != b'\x0a\x00\x00\x00\x00\x00\x00\x00': + raise SoftDecodeFailure() + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class ToggleAutoBeamsPacket(ShipAction1Packet): - def encode(self): - return b'\x03\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x03\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, data): - if data != b'\x03\x00\x00\x00\x00\x00\x00\x00': - raise SoftDecodeFailure() - return cls() + @classmethod + def decode(cls, data): + if data != b'\x03\x00\x00\x00\x00\x00\x00\x00': + raise SoftDecodeFailure() + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class TogglePerspectivePacket(ShipAction1Packet): - def encode(self): - return b'\x1a\x00\x00\x00\x00\x00\x00\x00' + def encode(self): + return b'\x1a\x00\x00\x00\x00\x00\x00\x00' - @classmethod - def decode(cls, data): - if data != b'\x1a\x00\x00\x00\x00\x00\x00\x00': - raise SoftDecodeFailure() - return cls() + @classmethod + def decode(cls, data): + if data != b'\x1a\x00\x00\x00\x00\x00\x00\x00': + raise SoftDecodeFailure() + return cls() - def __str__(self): - return '' + def __str__(self): + return '' class ClimbDivePacket(ShipAction1Packet): - def __init__(self, direction): - self.direction = direction + def __init__(self, direction): + self.direction = direction - def encode(self): - return pack('Ii', 27, self.direction) + def encode(self): + return pack('Ii', 27, self.direction) - @classmethod - def decode(cls, packet): - _id, direction = unpack('Ii', packet) - return cls(direction) + @classmethod + def decode(cls, packet): + _id, direction = unpack('Ii', packet) + return cls(direction) - def __str__(self): - return "".format(self.direction) + def __str__(self): + return "".format(self.direction) class SetMainScreenPacket(ShipAction1Packet): - def __init__(self, screen): - self.screen = screen + def __init__(self, screen): + self.screen = screen - def encode(self): - return pack('II', 1, self.screen.value) + def encode(self): + return pack('II', 1, self.screen.value) - @classmethod - def decode(cls, packet): - _idx, screen_id = unpack('II', packet) - return cls(MainView(screen_id)) + @classmethod + def decode(cls, packet): + _idx, screen_id = unpack('II', packet) + return cls(MainView(screen_id)) - def __str__(self): - return "".format(self.screen) + def __str__(self): + return "".format(self.screen) class SetConsolePacket(ShipAction1Packet): - def __init__(self, console, selected): - self.console = console - self.selected = selected + def __init__(self, console, selected): + self.console = console + self.selected = selected - def encode(self): - return pack('III', 0x0e, self.console.value, 1 if self.selected else 0) + def encode(self): + return pack('III', 0x0e, self.console.value, 1 if self.selected else 0) - @classmethod - def decode(cls, packet): - _idx, console_id, selected = unpack('III', packet) - return cls(Console(console_id), bool(selected)) + @classmethod + def decode(cls, packet): + _idx, console_id, selected = unpack('III', packet) + return cls(Console(console_id), bool(selected)) - def __str__(self): - return "".format(self.console, self.selected) + def __str__(self): + return "".format(self.console, self.selected) class HelmSetWarpPacket(ShipAction1Packet): - def __init__(self, warp): - self.warp = warp + def __init__(self, warp): + self.warp = warp - def encode(self): - return pack('II', 0, self.warp) + def encode(self): + return pack('II', 0, self.warp) - @classmethod - def decode(cls, packet): - _idx, warp = unpack('II', packet) - return cls(warp) + @classmethod + def decode(cls, packet): + _idx, warp = unpack('II', packet) + return cls(warp) - def __str__(self): - return "".format(self.warp) + def __str__(self): + return "".format(self.warp) class SetShipPacket(ShipAction1Packet): - def __init__(self, ship): - self.ship = ship + def __init__(self, ship): + self.ship = ship - def encode(self): - return pack('II', 0x0d, self.ship) + def encode(self): + return pack('II', 0x0d, self.ship) - @classmethod - def decode(cls, packet): - _idx, ship = unpack('II', packet) - return cls(ship) + @classmethod + def decode(cls, packet): + _idx, ship = unpack('II', packet) + return cls(ship) - def __str__(self): - return "".format(self.ship) + def __str__(self): + return "".format(self.ship) @packet(0x0351a5ac) class ShipAction3Packet: - @classmethod - def decode(cls, packet): - if not packet: - raise ValueError('No payload in game message') - subtype_index = packet[0] - if subtype_index == 0: - return HelmSetImpulsePacket.decode(packet) - if subtype_index == 1: - return HelmSetSteeringPacket.decode(packet) - if subtype_index == 5: - return HelmJumpPacket.decode(packet) - raise SoftDecodeFailure() + @classmethod + def decode(cls, packet): + if not packet: + raise ValueError('No payload in game message') + subtype_index = packet[0] + if subtype_index == 0: + return HelmSetImpulsePacket.decode(packet) + if subtype_index == 1: + return HelmSetSteeringPacket.decode(packet) + if subtype_index == 5: + return HelmJumpPacket.decode(packet) + raise SoftDecodeFailure() class HelmSetSteeringPacket(ShipAction3Packet): - def __init__(self, rudder): - self.rudder = rudder + def __init__(self, rudder): + self.rudder = rudder - def encode(self): - return pack('If', 1, self.rudder) + def encode(self): + return pack('If', 1, self.rudder) - @classmethod - def decode(cls, packet): - _idx, rudder = unpack('If', packet) - return cls(rudder) + @classmethod + def decode(cls, packet): + _idx, rudder = unpack('If', packet) + return cls(rudder) - def __str__(self): - return ''.format(self.rudder) + def __str__(self): + return ''.format(self.rudder) class HelmSetImpulsePacket(ShipAction3Packet): - def __init__(self, impulse): - self.impulse = impulse + def __init__(self, impulse): + self.impulse = impulse - def encode(self): - return pack('If', 0, self.impulse) + def encode(self): + return pack('If', 0, self.impulse) - @classmethod - def decode(cls, packet): - _idx, impulse = unpack('If', packet) - return cls(impulse) + @classmethod + def decode(cls, packet): + _idx, impulse = unpack('If', packet) + return cls(impulse) - def __str__(self): - return ''.format(self.impulse) + def __str__(self): + return ''.format(self.impulse) class HelmJumpPacket(ShipAction3Packet): - def __init__(self, bearing, distance): - self.bearing = bearing - self.distance = distance + def __init__(self, bearing, distance): + self.bearing = bearing + self.distance = distance - def encode(self): - return pack('Iff', 5, self.bearing / (math.pi * 2), self.distance / 50) + def encode(self): + return pack('Iff', 5, self.bearing / (math.pi * 2), self.distance / 50) - @classmethod - def decode(cls, packet): - _idx, bearing, distance = unpack('Iff', packet) - return cls(bearing * (math.pi * 2), distance * 50) + @classmethod + def decode(cls, packet): + _idx, bearing, distance = unpack('Iff', packet) + return cls(bearing * (math.pi * 2), distance * 50) - def __str__(self): - return ''.format(self.bearing, self.distance) + def __str__(self): + return ''.format(self.bearing, self.distance) @packet(0xb83fd2c4) class BeamFiredPacket: - def __init__(self, object, port, origin, target, x, y, z, auto): - self.object = object - self.port = port - self.origin = origin - self.target = target - self.x = x - self.y = y - self.z = z - self.auto = auto - - def encode(self): - return pack('IIIIIIIIfffI', - self.object, 0, 1200, - self.port, - 1, 1, - self.origin, self.target, - self.x, self.y, self.z, - 0 if self.auto else 1) - - @classmethod - def decode(cls, packet): - object, _unk1, _unk2, port, _unk3, _unk4, origin, target, x, y, z, auto = unpack('IIIIIIIIfffI', packet) - return cls(object, port, origin, target, x, y, z, [True, False][auto]) - - def __str__(self): - return ''.format(**self.__dict__) + def __init__(self, object, port, origin, target, x, y, z, auto): + self.object = object + self.port = port + self.origin = origin + self.target = target + self.x = x + self.y = y + self.z = z + self.auto = auto + + def encode(self): + return pack('IIIIIIIIIfffI', + self.object, 1, 1200, + self.port, + 1, 1, 0, + self.origin, self.target, + self.x, self.y, self.z, + 0 if self.auto else 1) + + @classmethod + def decode(cls, packet): + object, _unk1, _unk2, port, origintype, targettype, _unk3, origin, target, x, y, z, auto = unpack('IIIIIIIIIfffI', packet) + return cls(object, port, origin, target, x, y, z, [True, False][auto]) + + def __str__(self): + return ''.format(**self.__dict__) def encode(packet, provenance=PacketProvenance.client): - encoded_block = packet.encode() - block_len = len(encoded_block) - return (struct.pack(' 0: - sys.stderr.write("WARNING: skipping {} bytes of stream to resync\n".format(de_index)) - sys.stderr.flush() - packet = packet[de_index:] - elif de_index == -1: - # wtf? - return [], b'' - buffer_len = len(packet) - if buffer_len < 24: - return [], packet - header, packet_len, origin, padding, remaining, ptype = struct.unpack(' 0: + sys.stderr.write("WARNING: skipping {} bytes of stream to resync\n".format(de_index)) + sys.stderr.flush() + packet = packet[de_index:] + elif de_index == -1: + # wtf? + return [], b'' + buffer_len = len(packet) + if buffer_len < 24: + return [], packet + header, packet_len, origin, padding, remaining, ptype = struct.unpack(' not scanned? + #"distance":None # calculate on demand + } + else: + try: + self.known_objects[obj_id]['scan-level'] = self._tracker.objects[obj_id]['scan-level?'] + except: + pass + + # print list of known objects + if len(self.known_objects.keys()) > 0: + #print("ScienceAI update:") + #print("\t%d objects known"%len(self.known_objects.keys())) + #print("\t ObjID\t Info") + #for k,v in self.known_objects.items(): + # print("\t",k,"\t",v) + print("\tSCANNING_STATE:\t",self.is_scanning()) + if(self.is_scanning()): + print("\tSCAN_PROGRESS:\t", + "%d"%(100*(self._tracker.player_ship['scanning-progress']),) + "%" + ) + else: + return # nothing to do anyway + + # if not scanning, scan! + if not self.is_scanning(): + unscanned = [oid for oid in self.known_objects.keys() + if self.known_objects[oid]['scan-level'] < 1] + if not len(unscanned): # no more to scan left = just stop + return + self.try_scan(unscanned[0]) + + +if __name__ == "__main__": + + import tkinter as tk + + tk_root = tk.Tk() + tk_w = tk.Canvas(tk_root, width=800, height=600) + tk_w.pack() + + SciAI = ScienceAI(tx,rx,tracker) # instantiate science AI instance + tracker.bind_to_updates(SciAI.tracker_callback) # call this func on obj updates + while True: + for packet in rx: # stream packets from the server + tracker.rx(packet) # Update the tracker with new information + tk_w.delete(tk.ALL) # clear canvas + for oid,odata in tracker.objects.items(): + try: + x = odata['x']/150 + z = odata['z']/150# - tracker.player_ship['z'] + #print("(%d,%d)"%(x,z)) + tk_w.create_rectangle(x,z,x+2,z+2,fill='blue') + except: pass + tk_root.update() + + + + #pp.pprint(packet) + #pp.pprint(tracker.objects) + #print("") + #print("name\t enemy?\t side?") + for oid,odata in tracker.objects.items(): + try: + print(odata["name"],"\t",odata["enemy?"],"\t",odata["side?"]) + except: pass + #for k,v in tracker.player_ship.items(): + # if k in ["science-target","scanning-id","scanning-progress"]: + # print(k,v) + +