Source code for eaarl.io.waveforms.tld

# -*- coding: utf-8 -*-
# vim: set fileencoding=utf-8 :
# pylint: disable = no-member
'''Handling for TLD data'''

# Boilerplate for cross-compatibility of Python 2/3
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from future.builtins import * # pylint: disable=wildcard-import
import future.standard_library
future.standard_library.install_aliases()

import copy
import struct

from ._types import TLDRecordHeader, TLDRasterHeader, TLDPulseHeader

[docs]def read(f, offset, count, progress=None): '''Read records from file Parameters f : filehandle Open readable filehandle offset : int Offset into file to start reading at count : int Number of records to read progress : tqdm.tqdm or compatible or None or False If provided, then progress.update() will be called after each raster is loaded. This is intended to support display of a progress bar via tqdm. ''' f.seek(offset) records = [] for _ in range(count): records.append(_read_record(f)) if progress: progress.update() return records
def _read_record(f): '''Read and return a raster record''' raw = f.read(TLDRecordHeader._size) record_header = TLDRecordHeader._unpack(raw)._asdict() data_length = record_header['record_length'] - TLDRecordHeader._size raw = f.read(data_length) if record_header['record_type'] != 5: return None raster = TLDRasterHeader._unpack_from(raw, 0)._asdict() raster['pulse'] = [] offset = TLDRasterHeader._size for _ in range(raster['pulse_count']): pulse, offset = _read_pulse(raw, offset) pulse['time'] = raster['time'] + pulse['time_offset'] del pulse['time_offset'] raster['pulse'].append(pulse) return raster def _read_pulse(raw, offset): '''read and return a pulse record''' pulse = TLDPulseHeader._unpack_from(raw, offset)._asdict() offset += TLDPulseHeader._size data_length = struct.unpack_from('<H', raw, offset)[0] offset += 2 _read_waveforms(raw[offset:offset+data_length], pulse) offset += data_length return (pulse, offset) def _read_waveforms(raw, pulse): '''Read and return a set of waveforms''' offset = 0 length = struct.unpack_from('<B', raw, offset)[0] offset += 1 pulse['tx'] = _wf(raw[offset:offset+length]) offset += length pulse['rx'] = [] for _ in range(pulse['waveform_count']): length = struct.unpack_from('<H', raw, offset)[0] offset += 2 pulse['rx'].append(_wf(raw[offset:offset+length])) offset += length def _wf(raw): '''Converts a buffer into an integer sequence''' return [_ for _ in bytes(raw)]
[docs]def write(f, rasters): '''Writes a series of rasters to an open filehandle''' for raster in rasters: f.write(_encode_record(raster))
def _encode_record(raster): '''Encode a raster into a buffer''' # Build up a bytestring with the data, then write it # Can't write as its generated since the record length comes first # Avoiding random-access in case f is write-only # Some modifications will be made later, ensure they don't alter the # originals raster = copy.deepcopy(raster) # Allocate the space for the record header, but wait to populate it buffer = bytearray(TLDRecordHeader._size) # Add the raster header raster['pulse_count'] = len(raster['pulse']) buffer += TLDRasterHeader._fromdict(copy.deepcopy(raster))._pack() for pulse in raster['pulse']: pulse['time_offset'] = pulse['time'] - raster['time'] buffer += _encode_pulse(pulse) header = {'record_type': 5, 'record_length': len(buffer)} TLDRecordHeader._fromdict(header)._pack_into(buffer, 0) return buffer def _encode_pulse(pulse): '''Encode a pulse into a buffer''' pulse['waveform_count'] = min(4, len(pulse['rx'])) header = TLDPulseHeader._fromdict(pulse)._pack() waveforms = _encode_waveforms(pulse) return header + struct.pack('<H', len(waveforms)) + waveforms def _encode_waveforms(pulse): '''Encode a set of waveforms into a buffer''' buffer = bytearray() buffer += struct.pack('<B', len(pulse['tx'])) buffer += bytearray(pulse['tx']) for rx in pulse['rx']: buffer += struct.pack('<H', len(rx)) buffer += bytearray(rx) return buffer