Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 112 additions & 140 deletions opendis/dis7.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
#This code is licensed under the BSD software license
#

from typing import Sequence

from .record import (
AntennaPatternRecord,
ModulationType,
ModulationParametersRecord,
UnknownRadio,
UnknownAntennaPattern,
)
from .stream import DataInputStream, DataOutputStream
from .types import (
enum8,
enum16,
Expand All @@ -17,7 +27,6 @@
struct16,
struct32,
)
from .record import SpreadSpectrum


class DataQueryDatumSpecification:
Expand Down Expand Up @@ -669,22 +678,6 @@ def parse(self, inputStream):
self.stationNumber = inputStream.read_unsigned_short()


class ModulationParameters:
"""Section 6.2.58

Modulation parameters associated with a specific radio system. INCOMPLETE.
"""

def __init__(self):
pass

def serialize(self, outputStream):
"""serialize the class"""

def parse(self, inputStream):
"""Parse a message. This may recursively call embedded objects."""


class EulerAngles:
"""Section 6.2.33

Expand Down Expand Up @@ -951,21 +944,28 @@ class VariableTransmitterParameters:
Relates to radios. NOT COMPLETE.
"""

def __init__(self, recordType: enum32 = 0, recordLength: uint16 = 4):
def __init__(self, recordType: enum32 = 0, data: bytes = b""):
self.recordType = recordType # [UID 66] Variable Parameter Record Type
"""Type of VTP. Enumeration from EBV"""
self.recordLength = recordLength
"""Length, in bytes"""
self.data = data

def marshalledSize(self) -> int:
return 6 + len(self.data)

@property
def recordLength(self) -> uint16:
return self.marshalledSize()

def serialize(self, outputStream):
def serialize(self, outputStream: DataOutputStream) -> None:
"""serialize the class"""
outputStream.write_unsigned_int(self.recordType)
outputStream.write_unsigned_int(self.recordLength)
outputStream.write_uint32(self.recordType)
outputStream.write_uint16(self.recordLength)
outputStream.write_bytes(self.data)

def parse(self, inputStream):
def parse(self, inputStream: DataInputStream) -> None:
"""Parse a message. This may recursively call embedded objects."""
self.recordType = inputStream.read_unsigned_int()
self.recordLength = inputStream.read_unsigned_int()
self.recordType = inputStream.read_uint32()
recordLength = inputStream.read_uint16()
self.data = inputStream.read_bytes(recordLength)


class Attribute:
Expand Down Expand Up @@ -2017,41 +2017,6 @@ def parse(self, inputStream):
self.communicationsNodeID.parse(inputStream)


class ModulationType:
"""Section 6.2.59

Information about the type of modulation used for radio transmission.
"""

def __init__(self,
spreadSpectrum: SpreadSpectrum | None = None, # See RPR Enumerations
majorModulation: enum16 = 0, # [UID 155]
detail: enum16 = 0, # [UID 156-162]
radioSystem: enum16 = 0): # [UID 163]
self.spreadSpectrum = spreadSpectrum or SpreadSpectrum()
"""This field shall indicate the spread spectrum technique or combination of spread spectrum techniques in use. Bit field. 0=freq hopping, 1=psuedo noise, time hopping=2, reamining bits unused"""
self.majorModulation = majorModulation
"""the major classification of the modulation type."""
self.detail = detail
"""provide certain detailed information depending upon the major modulation type"""
self.radioSystem = radioSystem
"""the radio system associated with this Transmitter PDU and shall be used as the basis to interpret other fields whose values depend on a specific radio system."""

def serialize(self, outputStream):
"""serialize the class"""
outputStream.write_unsigned_short(self.spreadSpectrum)
outputStream.write_unsigned_short(self.majorModulation)
outputStream.write_unsigned_short(self.detail)
outputStream.write_unsigned_short(self.radioSystem)

def parse(self, inputStream):
"""Parse a message. This may recursively call embedded objects."""
self.spreadSpectrum = inputStream.read_unsigned_short()
self.majorModulation = inputStream.read_unsigned_short()
self.detail = inputStream.read_unsigned_short()
self.radioSystem = inputStream.read_unsigned_short()


class LinearSegmentParameter:
"""Section 6.2.52

Expand Down Expand Up @@ -5460,20 +5425,18 @@ def __init__(self,
radioEntityType: "EntityType | None" = None,
transmitState: enum8 = 0, # [UID 164]
inputSource: enum8 = 0, # [UID 165]
variableTransmitterParameterCount: uint16 = 0,
antennaLocation: "Vector3Double | None" = None,
relativeAntennaLocation: "Vector3Float | None" = None,
antennaPatternType: enum16 = 0, # [UID 167]
antennaPatternCount: uint16 = 0, # in bytes
frequency: uint64 = 0, # in Hz
transmitFrequencyBandwidth: float32 = 0.0, # in Hz
power: float32 = 0.0, # in decibel-milliwatts
modulationType: "ModulationType | None" = None,
cryptoSystem: enum16 = 0, # [UID 166]
cryptoKeyId: struct16 = 0, # See Table 175
modulationParameterCount: uint8 = 0, # in bytes
modulationParametersList=None,
antennaPatternList=None):
modulationParameters: ModulationParametersRecord | None = None,
antennaPattern: AntennaPatternRecord | None = None,
variableTransmitterParameters: Sequence[VariableTransmitterParameters] | None = None):
super(TransmitterPdu, self).__init__()
self.radioReferenceID = radioReferenceID or EntityID()
"""ID of the entity that is the source of the communication"""
Expand All @@ -5486,107 +5449,116 @@ def __init__(self,
self.relativeAntennaLocation = relativeAntennaLocation or Vector3Float(
)
self.antennaPatternType = antennaPatternType
self.antennaPatternCount = antennaPatternCount
self.frequency = frequency
self.transmitFrequencyBandwidth = transmitFrequencyBandwidth
self.power = power
self.modulationType = modulationType or ModulationType()
self.cryptoSystem = cryptoSystem
self.cryptoKeyId = cryptoKeyId
# FIXME: Refactor modulation parameters into its own record class
self.modulationParameterCount = modulationParameterCount
self.padding2 = 0
self.padding3 = 0
self.modulationParametersList = modulationParametersList or []
self.antennaPatternList = antennaPatternList or []
# TODO: zero or more Variable Transmitter Parameters records (see 6.2.95)
self.modulationParameters = modulationParameters
self.antennaPattern = antennaPattern
self.variableTransmitterParameters = variableTransmitterParameters or []

@property
def antennaPatternLength(self) -> uint16:
if self.antennaPattern:
return self.antennaPattern.marshalledSize()
else:
return 0

@property
def modulationParametersLength(self) -> uint8:
if self.modulationParameters:
return self.modulationParameters.marshalledSize()
else:
return 0

@property
def variableTransmitterParameterCount(self) -> uint16:
"""How many variable transmitter parameters are in the variable length list.
In earlier versions of DIS these were known as articulation parameters.
"""
return len(self.modulationParametersList)
return len(self.variableTransmitterParameters)

def serialize(self, outputStream):
"""serialize the class"""
def serialize(self, outputStream: DataOutputStream) -> None:
super(TransmitterPdu, self).serialize(outputStream)
self.radioReferenceID.serialize(outputStream)
outputStream.write_unsigned_short(self.radioNumber)
outputStream.write_uint16(self.radioNumber)
self.radioEntityType.serialize(outputStream)
outputStream.write_unsigned_byte(self.transmitState)
outputStream.write_unsigned_byte(self.inputSource)
outputStream.write_unsigned_short(
self.variableTransmitterParameterCount)
outputStream.write_uint8(self.transmitState)
outputStream.write_uint8(self.inputSource)
outputStream.write_uint16(self.variableTransmitterParameterCount)
self.antennaLocation.serialize(outputStream)
self.relativeAntennaLocation.serialize(outputStream)
outputStream.write_unsigned_short(self.antennaPatternType)
outputStream.write_unsigned_short(len(self.antennaPatternList))
outputStream.write_long(self.frequency)
outputStream.write_float(self.transmitFrequencyBandwidth)
outputStream.write_float(self.power)
outputStream.write_uint16(self.antennaPatternType)
outputStream.write_uint16(self.antennaPatternLength)
outputStream.write_uint64(self.frequency)
outputStream.write_float32(self.transmitFrequencyBandwidth)
outputStream.write_float32(self.power)
self.modulationType.serialize(outputStream)
outputStream.write_unsigned_short(self.cryptoSystem)
outputStream.write_unsigned_short(self.cryptoKeyId)
outputStream.write_unsigned_byte(len(self.modulationParametersList))
outputStream.write_unsigned_short(self.padding2)
outputStream.write_unsigned_byte(self.padding3)
for anObj in self.modulationParametersList:
anObj.serialize(outputStream)
outputStream.write_uint16(self.cryptoSystem)
outputStream.write_uint16(self.cryptoKeyId)
outputStream.write_uint8(self.modulationParametersLength)
outputStream.write_uint16(self.padding2)
outputStream.write_uint8(self.padding3)

for anObj in self.antennaPatternList:
anObj.serialize(outputStream)
# Serialize parameter records

def parse(self, inputStream):
## Modulation Parameters
if self.modulationParameters:
self.modulationParameters.serialize(outputStream)

## Antenna Pattern
if self.antennaPattern:
self.antennaPattern.serialize(outputStream)

## Variable Transmitter Parameters
for vtp in self.variableTransmitterParameters:
vtp.serialize(outputStream)

def parse(self, inputStream: DataInputStream) -> None:
"""Parse a message. This may recursively call embedded objects."""
super(TransmitterPdu, self).parse(inputStream)
self.radioReferenceID.parse(inputStream)
self.radioNumber = inputStream.read_unsigned_short()
self.radioNumber = inputStream.read_uint16()
self.radioEntityType.parse(inputStream)
self.transmitState = inputStream.read_unsigned_byte()
self.inputSource = inputStream.read_unsigned_byte()
variableTransmitterParameterCount = inputStream.read_unsigned_short(
)
self.transmitState = inputStream.read_uint8()
self.inputSource = inputStream.read_uint8()
variableTransmitterParameterCount = inputStream.read_uint16()
self.antennaLocation.parse(inputStream)
self.relativeAntennaLocation.parse(inputStream)
self.antennaPatternType = inputStream.read_unsigned_short()
self.antennaPatternCount = inputStream.read_unsigned_short()
self.frequency = inputStream.read_long()
self.transmitFrequencyBandwidth = inputStream.read_float()
self.power = inputStream.read_float()
self.antennaPatternType = inputStream.read_uint16()
antennaPatternLength = inputStream.read_uint16()
self.frequency = inputStream.read_uint64()
self.transmitFrequencyBandwidth = inputStream.read_float32()
self.power = inputStream.read_float32()
self.modulationType.parse(inputStream)
self.cryptoSystem = inputStream.read_unsigned_short()
self.cryptoKeyId = inputStream.read_unsigned_short()
self.modulationParameterCount = inputStream.read_unsigned_byte()
self.padding2 = inputStream.read_unsigned_short()
self.padding3 = inputStream.read_unsigned_byte()
"""Vendor product MACE from BattleSpace Inc, only uses 1 byte per modulation param"""
"""SISO Spec dictates it should be 2 bytes"""
"""Instead of dumping the packet we can make an assumption that some vendors use 1 byte per param"""
"""Although we will still send out 2 bytes per param as per spec"""
endsize = self.antennaPatternCount * 39
mod_bytes = 2

if (self.modulationParameterCount > 0):
curr = inputStream.stream.tell()
remaining = inputStream.stream.read(None)
mod_bytes = (len(remaining) -
endsize) / self.modulationParameterCount
inputStream.stream.seek(curr, 0)

if (mod_bytes > 2):
print("Malformed Packet")
self.cryptoSystem = inputStream.read_uint16()
self.cryptoKeyId = inputStream.read_uint16()
modulationParametersLength = inputStream.read_uint8()
self.padding2 = inputStream.read_uint16()
self.padding3 = inputStream.read_uint8()

# Parse parameter records

## Modulation Parameters
if modulationParametersLength > 0:
radio = UnknownRadio()
radio.parse(inputStream, bytelength=modulationParametersLength)
self.modulationParameters = radio
else:
for idx in range(0, self.modulationParameterCount):
if mod_bytes == 2:
element = inputStream.read_unsigned_short()
else:
element = inputStream.read_unsigned_byte()
self.modulationParametersList.append(element)
for idx in range(0, self.antennaPatternCount):
element = BeamAntennaPattern()
element.parse(inputStream)
self.antennaPatternList.append(element)
self.modulationParameters = None

## Antenna Pattern
if antennaPatternLength > 0:
self.antennaPattern = UnknownAntennaPattern()
self.antennaPattern.parse(
inputStream,
bytelength=antennaPatternLength
)
else:
self.antennaPattern = None




class ElectromagneticEmissionsPdu(DistributedEmissionsFamilyPdu):
Expand Down
Loading