diff --git a/opendis/dis7.py b/opendis/dis7.py index 1a7e9bb..765f20d 100644 --- a/opendis/dis7.py +++ b/opendis/dis7.py @@ -2,6 +2,16 @@ #This code is licensed under the BSD software license # +from typing import Sequence + +from .record import ( + AntennaPatternRecord, + ModulationType, + ModulationParametersRecord, + UnknownRadio, + UnknownAntennaPattern, +) +from .stream import DataInputStream, DataOutputStream from .types import ( enum8, enum16, @@ -17,7 +27,6 @@ struct16, struct32, ) -from .record import SpreadSpectrum class DataQueryDatumSpecification: @@ -669,22 +678,6 @@ def parse(self, inputStream): self.stationNumber = inputStream.read_unsigned_short() -class ModulationParameters: - """Section 6.2.58 - - Modulation parameters associated with a specific radio system. INCOMPLETE. - """ - - def __init__(self): - pass - - def serialize(self, outputStream): - """serialize the class""" - - def parse(self, inputStream): - """Parse a message. This may recursively call embedded objects.""" - - class EulerAngles: """Section 6.2.33 @@ -951,21 +944,28 @@ class VariableTransmitterParameters: Relates to radios. NOT COMPLETE. """ - def __init__(self, recordType: enum32 = 0, recordLength: uint16 = 4): + def __init__(self, recordType: enum32 = 0, data: bytes = b""): self.recordType = recordType # [UID 66] Variable Parameter Record Type - """Type of VTP. Enumeration from EBV""" - self.recordLength = recordLength - """Length, in bytes""" + self.data = data + + def marshalledSize(self) -> int: + return 6 + len(self.data) + + @property + def recordLength(self) -> uint16: + return self.marshalledSize() - def serialize(self, outputStream): + def serialize(self, outputStream: DataOutputStream) -> None: """serialize the class""" - outputStream.write_unsigned_int(self.recordType) - outputStream.write_unsigned_int(self.recordLength) + outputStream.write_uint32(self.recordType) + outputStream.write_uint16(self.recordLength) + outputStream.write_bytes(self.data) - def parse(self, inputStream): + def parse(self, inputStream: DataInputStream) -> None: """Parse a message. This may recursively call embedded objects.""" - self.recordType = inputStream.read_unsigned_int() - self.recordLength = inputStream.read_unsigned_int() + self.recordType = inputStream.read_uint32() + recordLength = inputStream.read_uint16() + self.data = inputStream.read_bytes(recordLength) class Attribute: @@ -2017,41 +2017,6 @@ def parse(self, inputStream): self.communicationsNodeID.parse(inputStream) -class ModulationType: - """Section 6.2.59 - - Information about the type of modulation used for radio transmission. - """ - - def __init__(self, - spreadSpectrum: SpreadSpectrum | None = None, # See RPR Enumerations - majorModulation: enum16 = 0, # [UID 155] - detail: enum16 = 0, # [UID 156-162] - radioSystem: enum16 = 0): # [UID 163] - self.spreadSpectrum = spreadSpectrum or SpreadSpectrum() - """This field shall indicate the spread spectrum technique or combination of spread spectrum techniques in use. Bit field. 0=freq hopping, 1=psuedo noise, time hopping=2, reamining bits unused""" - self.majorModulation = majorModulation - """the major classification of the modulation type.""" - self.detail = detail - """provide certain detailed information depending upon the major modulation type""" - self.radioSystem = radioSystem - """the radio system associated with this Transmitter PDU and shall be used as the basis to interpret other fields whose values depend on a specific radio system.""" - - def serialize(self, outputStream): - """serialize the class""" - outputStream.write_unsigned_short(self.spreadSpectrum) - outputStream.write_unsigned_short(self.majorModulation) - outputStream.write_unsigned_short(self.detail) - outputStream.write_unsigned_short(self.radioSystem) - - def parse(self, inputStream): - """Parse a message. This may recursively call embedded objects.""" - self.spreadSpectrum = inputStream.read_unsigned_short() - self.majorModulation = inputStream.read_unsigned_short() - self.detail = inputStream.read_unsigned_short() - self.radioSystem = inputStream.read_unsigned_short() - - class LinearSegmentParameter: """Section 6.2.52 @@ -5460,20 +5425,18 @@ def __init__(self, radioEntityType: "EntityType | None" = None, transmitState: enum8 = 0, # [UID 164] inputSource: enum8 = 0, # [UID 165] - variableTransmitterParameterCount: uint16 = 0, antennaLocation: "Vector3Double | None" = None, relativeAntennaLocation: "Vector3Float | None" = None, antennaPatternType: enum16 = 0, # [UID 167] - antennaPatternCount: uint16 = 0, # in bytes frequency: uint64 = 0, # in Hz transmitFrequencyBandwidth: float32 = 0.0, # in Hz power: float32 = 0.0, # in decibel-milliwatts modulationType: "ModulationType | None" = None, cryptoSystem: enum16 = 0, # [UID 166] cryptoKeyId: struct16 = 0, # See Table 175 - modulationParameterCount: uint8 = 0, # in bytes - modulationParametersList=None, - antennaPatternList=None): + modulationParameters: ModulationParametersRecord | None = None, + antennaPattern: AntennaPatternRecord | None = None, + variableTransmitterParameters: Sequence[VariableTransmitterParameters] | None = None): super(TransmitterPdu, self).__init__() self.radioReferenceID = radioReferenceID or EntityID() """ID of the entity that is the source of the communication""" @@ -5486,107 +5449,116 @@ def __init__(self, self.relativeAntennaLocation = relativeAntennaLocation or Vector3Float( ) self.antennaPatternType = antennaPatternType - self.antennaPatternCount = antennaPatternCount self.frequency = frequency self.transmitFrequencyBandwidth = transmitFrequencyBandwidth self.power = power self.modulationType = modulationType or ModulationType() self.cryptoSystem = cryptoSystem self.cryptoKeyId = cryptoKeyId - # FIXME: Refactor modulation parameters into its own record class - self.modulationParameterCount = modulationParameterCount self.padding2 = 0 self.padding3 = 0 - self.modulationParametersList = modulationParametersList or [] - self.antennaPatternList = antennaPatternList or [] - # TODO: zero or more Variable Transmitter Parameters records (see 6.2.95) + self.modulationParameters = modulationParameters + self.antennaPattern = antennaPattern + self.variableTransmitterParameters = variableTransmitterParameters or [] + + @property + def antennaPatternLength(self) -> uint16: + if self.antennaPattern: + return self.antennaPattern.marshalledSize() + else: + return 0 + + @property + def modulationParametersLength(self) -> uint8: + if self.modulationParameters: + return self.modulationParameters.marshalledSize() + else: + return 0 @property def variableTransmitterParameterCount(self) -> uint16: - """How many variable transmitter parameters are in the variable length list. - In earlier versions of DIS these were known as articulation parameters. - """ - return len(self.modulationParametersList) + return len(self.variableTransmitterParameters) - def serialize(self, outputStream): - """serialize the class""" + def serialize(self, outputStream: DataOutputStream) -> None: super(TransmitterPdu, self).serialize(outputStream) self.radioReferenceID.serialize(outputStream) - outputStream.write_unsigned_short(self.radioNumber) + outputStream.write_uint16(self.radioNumber) self.radioEntityType.serialize(outputStream) - outputStream.write_unsigned_byte(self.transmitState) - outputStream.write_unsigned_byte(self.inputSource) - outputStream.write_unsigned_short( - self.variableTransmitterParameterCount) + outputStream.write_uint8(self.transmitState) + outputStream.write_uint8(self.inputSource) + outputStream.write_uint16(self.variableTransmitterParameterCount) self.antennaLocation.serialize(outputStream) self.relativeAntennaLocation.serialize(outputStream) - outputStream.write_unsigned_short(self.antennaPatternType) - outputStream.write_unsigned_short(len(self.antennaPatternList)) - outputStream.write_long(self.frequency) - outputStream.write_float(self.transmitFrequencyBandwidth) - outputStream.write_float(self.power) + outputStream.write_uint16(self.antennaPatternType) + outputStream.write_uint16(self.antennaPatternLength) + outputStream.write_uint64(self.frequency) + outputStream.write_float32(self.transmitFrequencyBandwidth) + outputStream.write_float32(self.power) self.modulationType.serialize(outputStream) - outputStream.write_unsigned_short(self.cryptoSystem) - outputStream.write_unsigned_short(self.cryptoKeyId) - outputStream.write_unsigned_byte(len(self.modulationParametersList)) - outputStream.write_unsigned_short(self.padding2) - outputStream.write_unsigned_byte(self.padding3) - for anObj in self.modulationParametersList: - anObj.serialize(outputStream) + outputStream.write_uint16(self.cryptoSystem) + outputStream.write_uint16(self.cryptoKeyId) + outputStream.write_uint8(self.modulationParametersLength) + outputStream.write_uint16(self.padding2) + outputStream.write_uint8(self.padding3) - for anObj in self.antennaPatternList: - anObj.serialize(outputStream) + # Serialize parameter records - def parse(self, inputStream): + ## Modulation Parameters + if self.modulationParameters: + self.modulationParameters.serialize(outputStream) + + ## Antenna Pattern + if self.antennaPattern: + self.antennaPattern.serialize(outputStream) + + ## Variable Transmitter Parameters + for vtp in self.variableTransmitterParameters: + vtp.serialize(outputStream) + + def parse(self, inputStream: DataInputStream) -> None: """Parse a message. This may recursively call embedded objects.""" super(TransmitterPdu, self).parse(inputStream) self.radioReferenceID.parse(inputStream) - self.radioNumber = inputStream.read_unsigned_short() + self.radioNumber = inputStream.read_uint16() self.radioEntityType.parse(inputStream) - self.transmitState = inputStream.read_unsigned_byte() - self.inputSource = inputStream.read_unsigned_byte() - variableTransmitterParameterCount = inputStream.read_unsigned_short( - ) + self.transmitState = inputStream.read_uint8() + self.inputSource = inputStream.read_uint8() + variableTransmitterParameterCount = inputStream.read_uint16() self.antennaLocation.parse(inputStream) self.relativeAntennaLocation.parse(inputStream) - self.antennaPatternType = inputStream.read_unsigned_short() - self.antennaPatternCount = inputStream.read_unsigned_short() - self.frequency = inputStream.read_long() - self.transmitFrequencyBandwidth = inputStream.read_float() - self.power = inputStream.read_float() + self.antennaPatternType = inputStream.read_uint16() + antennaPatternLength = inputStream.read_uint16() + self.frequency = inputStream.read_uint64() + self.transmitFrequencyBandwidth = inputStream.read_float32() + self.power = inputStream.read_float32() self.modulationType.parse(inputStream) - self.cryptoSystem = inputStream.read_unsigned_short() - self.cryptoKeyId = inputStream.read_unsigned_short() - self.modulationParameterCount = inputStream.read_unsigned_byte() - self.padding2 = inputStream.read_unsigned_short() - self.padding3 = inputStream.read_unsigned_byte() - """Vendor product MACE from BattleSpace Inc, only uses 1 byte per modulation param""" - """SISO Spec dictates it should be 2 bytes""" - """Instead of dumping the packet we can make an assumption that some vendors use 1 byte per param""" - """Although we will still send out 2 bytes per param as per spec""" - endsize = self.antennaPatternCount * 39 - mod_bytes = 2 - - if (self.modulationParameterCount > 0): - curr = inputStream.stream.tell() - remaining = inputStream.stream.read(None) - mod_bytes = (len(remaining) - - endsize) / self.modulationParameterCount - inputStream.stream.seek(curr, 0) - - if (mod_bytes > 2): - print("Malformed Packet") + self.cryptoSystem = inputStream.read_uint16() + self.cryptoKeyId = inputStream.read_uint16() + modulationParametersLength = inputStream.read_uint8() + self.padding2 = inputStream.read_uint16() + self.padding3 = inputStream.read_uint8() + + # Parse parameter records + + ## Modulation Parameters + if modulationParametersLength > 0: + radio = UnknownRadio() + radio.parse(inputStream, bytelength=modulationParametersLength) + self.modulationParameters = radio else: - for idx in range(0, self.modulationParameterCount): - if mod_bytes == 2: - element = inputStream.read_unsigned_short() - else: - element = inputStream.read_unsigned_byte() - self.modulationParametersList.append(element) - for idx in range(0, self.antennaPatternCount): - element = BeamAntennaPattern() - element.parse(inputStream) - self.antennaPatternList.append(element) + self.modulationParameters = None + + ## Antenna Pattern + if antennaPatternLength > 0: + self.antennaPattern = UnknownAntennaPattern() + self.antennaPattern.parse( + inputStream, + bytelength=antennaPatternLength + ) + else: + self.antennaPattern = None + + class ElectromagneticEmissionsPdu(DistributedEmissionsFamilyPdu): diff --git a/opendis/record.py b/opendis/record.py index 211cd8a..4631b2b 100644 --- a/opendis/record.py +++ b/opendis/record.py @@ -3,6 +3,7 @@ This module defines classes for various record types used in DIS PDUs. """ +from abc import abstractmethod from collections.abc import Sequence from ctypes import ( _SimpleCData, @@ -16,9 +17,13 @@ from .stream import DataInputStream, DataOutputStream from .types import ( + enum16, bf_enum, bf_int, bf_uint, + uint8, + uint16, + uint32, ) # Type definitions for bitfield field descriptors @@ -103,6 +108,44 @@ def parse(cls, inputStream: DataInputStream) -> "Bitfield": return Bitfield +class ModulationType: + """Section 6.2.59 + + Information about the type of modulation used for radio transmission. + """ + + def __init__(self, + spreadSpectrum: "SpreadSpectrum | None" = None, # See RPR Enumerations + majorModulation: enum16 = 0, # [UID 155] + detail: enum16 = 0, # [UID 156-162] + radioSystem: enum16 = 0): # [UID 163] + self.spreadSpectrum = spreadSpectrum or SpreadSpectrum() + """This field shall indicate the spread spectrum technique or combination of spread spectrum techniques in use. Bit field. 0=freq hopping, 1=psuedo noise, time hopping=2, remaining bits unused""" + self.majorModulation = majorModulation + self.detail = detail + self.radioSystem = radioSystem + + def marshalledSize(self) -> int: + size = 0 + size += self.spreadSpectrum.marshalledSize() + size += 2 # majorModulation + size += 2 # detail + size += 2 # radioSystem + return size + + def serialize(self, outputStream: DataOutputStream) -> None: + self.spreadSpectrum.serialize(outputStream) + outputStream.write_uint16(self.majorModulation) + outputStream.write_uint16(self.detail) + outputStream.write_uint16(self.radioSystem) + + def parse(self, inputStream: DataInputStream) -> None: + self.spreadSpectrum.parse(inputStream) + self.majorModulation = inputStream.read_uint16() + self.detail = inputStream.read_uint16() + self.radioSystem = inputStream.read_uint16() + + class NetId: """Annex C, Table C.5 @@ -198,3 +241,192 @@ def parse(self, inputStream: DataInputStream) -> None: self.frequencyHopping = bool(record_bitfield.frequencyHopping) self.pseudoNoise = bool(record_bitfield.pseudoNoise) self.timeHopping = bool(record_bitfield.timeHopping) + + +class ModulationParametersRecord: + """Base class for modulation parameters records, as defined in Annex C.""" + + @abstractmethod + def marshalledSize(self) -> int: + """Return the size of the record when serialized.""" + raise NotImplementedError() + + @abstractmethod + def serialize(self, outputStream: DataOutputStream) -> None: + """Serialize the record to the output stream.""" + raise NotImplementedError() + + @abstractmethod + def parse(self, inputStream: DataInputStream) -> None: + """Parse the record from the input stream.""" + raise NotImplementedError() + + +class UnknownRadio(ModulationParametersRecord): + """Placeholder for unknown or unimplemented radio types.""" + + def __init__(self, data: bytes = b''): + self.data = data + + def marshalledSize(self) -> int: + return len(self.data) + + def serialize(self, outputStream: DataOutputStream) -> None: + outputStream.write_bytes(self.data) + + def parse(self, inputStream: DataInputStream, bytelength: int = 0) -> None: + self.data = inputStream.read_bytes(bytelength) + + +class GenericRadio(ModulationParametersRecord): + """Annex C.2 Generic Radio record + + There are no other specific Transmitter, Signal, or Receiver PDU + requirements unique to a generic radio. + """ + + def marshalledSize(self) -> int: + return 0 + + def serialize(self, outputStream: DataOutputStream) -> None: + pass + + def parse(self, inputStream: DataInputStream) -> None: + pass + + +class SimpleIntercomRadio(ModulationParametersRecord): + """Annex C.3 Simple Intercom Radio + + A Simple Intercom shall be identified by both the Transmitter PDU + Modulation Type record—Radio System field indicating a system type of + Generic Radio or Simple Intercom (1) and by the Modulation Type + record—Major Modulation field set to No Statement (0). + + This class has specific field requirements for the TransmitterPdu. + """ + + def marshalledSize(self) -> int: + return 0 + + def serialize(self, outputStream: DataOutputStream) -> None: + pass + + def parse(self, inputStream: DataInputStream) -> None: + pass + + +# C.4 HAVE QUICK Radios + +class BasicHaveQuickMP(ModulationParametersRecord): + """Annex C 4.2.2, Table C.3 — Basic HAVE QUICK MP record""" + + def __init__(self, + net_id: NetId | None = None, + mwod_index: uint16 = 1, + reserved16: uint16 = 0, + reserved8_1: uint8 = 0, + reserved8_2: uint8 = 0, + time_of_day: uint32 = 0, + padding: uint32 = 0): + self.net_id = net_id or NetId() + self.mwod_index = mwod_index + self.reserved16 = reserved16 + self.reserved8_1 = reserved8_1 + self.reserved8_2 = reserved8_2 + self.time_of_day = time_of_day + self.padding = padding + + def marshalledSize(self) -> int: + return 16 # bytes + + def serialize(self, outputStream: DataOutputStream) -> None: + self.net_id.serialize(outputStream) + outputStream.write_uint16(self.mwod_index) + outputStream.write_uint16(self.reserved16) + outputStream.write_uint8(self.reserved8_1) + outputStream.write_uint8(self.reserved8_2) + outputStream.write_uint32(self.time_of_day) + outputStream.write_uint32(self.padding) + + def parse(self, inputStream: DataInputStream) -> None: + self.net_id.parse(inputStream) + self.mwod_index = inputStream.read_uint16() + self.reserved16 = inputStream.read_uint16() + self.reserved8_1 = inputStream.read_uint8() + self.reserved8_2 = inputStream.read_uint8() + self.time_of_day = inputStream.read_uint32() + self.padding = inputStream.read_uint32() + + +class ModulationParameters: + """Section 6.2.58 + + Modulation parameters associated with a specific radio system. + + This class is not designed to be instantiated directly with no arguments. + """ + + def __init__(self, + # record must be provided as there is no default value. + record: ModulationParametersRecord): + self.record = record + # ModulationParameters requires padding to 64-bit (8-byte) boundary + self.padding = 8 - (self.record.marshalledSize() % 8) % 8 + + def marshalledSize(self) -> int: + return self.record.marshalledSize() + self.padding + + def serialize(self, outputStream: DataOutputStream) -> None: + self.record.serialize(outputStream) + outputStream.write_bytes(b'\x00' * self.padding) + + def parse(self, inputStream: DataInputStream) -> None: + self.record.parse(inputStream) + bytes_to_read = 8 - (self.record.marshalledSize() % 8) % 8 + if bytes_to_read > 0: + self.padding = int.from_bytes( + inputStream.read_bytes(bytes_to_read), + byteorder='big' + ) + else: + self.padding = 0 + + +class AntennaPatternRecord: + """Section 6.2.8 + + The total length of each record shall be a multiple of 64 bits. + """ + + @abstractmethod + def marshalledSize(self) -> int: + """Return the size of the record when serialized.""" + raise NotImplementedError() + + @abstractmethod + def serialize(self, outputStream: DataOutputStream) -> None: + """Serialize the record to the output stream.""" + raise NotImplementedError() + + @abstractmethod + def parse(self, inputStream: DataInputStream) -> None: + """Parse the record from the input stream.""" + raise NotImplementedError() + + +class UnknownAntennaPattern(AntennaPatternRecord): + """Placeholder for unknown or unimplemented antenna pattern types.""" + + def __init__(self, data: bytes = b''): + self.data = data + + def marshalledSize(self) -> int: + return len(self.data) + + def serialize(self, outputStream: DataOutputStream) -> None: + outputStream.write_bytes(self.data) + + def parse(self, inputStream: DataInputStream, bytelength: int = 0) -> None: + """Parse a message. This may recursively call embedded objects.""" + self.data = inputStream.read_bytes(bytelength)