Source code for grx.GRX1090

import datetime
import logging
import math
import time

import grpc
from google.protobuf.empty_pb2 import Empty

from . import Monitord_pb2_grpc
from . import Receiverd_pb2, Receiverd_pb2_grpc
from . import Spectrumd_pb2_grpc

logger = logging.getLogger('GRX1090')


[docs] class GRX1090: """ This class provides a simple wrapper for the device's protobuf-based GRPC interface. """
[docs] def __init__(self, addr, receiverd_port=5303, monitord_port=5305, spectrumd_port=5306): """ Constructor :param addr: network address (IP or hostname) of the device :param receiverd_port: port for the receiverd GRPC interface (default: 5303) :param monitord_port: port for the monitord GRPC interface (default: 5305) :param spectrumd_port: port for the spectrumd GRPC interface (default: 5306) """ # connect to device logger.debug("Connecting to device at {}:{} and {}:{}.".format(addr, monitord_port, addr, receiverd_port)) # create monitord stub self._monitord_channel = grpc.insecure_channel("{}:{}".format(addr, monitord_port)) self._monitord_stub = Monitord_pb2_grpc.MonitordStub(self._monitord_channel) # create receiverd stub self._receiverd_channel = grpc.insecure_channel("{}:{}".format(addr, receiverd_port)) self._receiverd_stub = Receiverd_pb2_grpc.ReceiverdStub(self._receiverd_channel) # create spectrum stub self._spectrumd_channel = grpc.insecure_channel("{}:{}".format(addr, spectrumd_port)) self._spectrumd_stub = Spectrumd_pb2_grpc.SpectrumdStub(self._spectrumd_channel)
[docs] def get_gnss_info(self): """ Use this function to retrieve the device's location, fix status and quality, and GNSS hardware status. :return: dictionary containing the current status of the GNSS (GPS) synchronization """ logger.debug("Retrieving GPS status.") return self._monitord_stub.GetGNSSInformation(Empty())
[docs] def get_radio_status(self): """ The Mode S receivers internal status consists of internal gain values, gain control settings and DC offset calibration status and outcome. :return: dictionary containing information about the Mode S receivers current internal status """ logger.debug("Retrieving radio front-end status.") return self._receiverd_stub.GetRadioFrontEndStatus(Empty())
[docs] def get_tracked_aircraft(self): """ Use this to get a convenient list of all aircraft hat are currently tracked by the receiver including most important tracking information (location, velocity, etc) and meta information such as average frame rate and signal strength. :return: list containing tracked aircraft states """ logger.debug("Retrieving tracked aircraft") return self._receiverd_stub.GetStateVectors(Empty())
[docs] def subscribe_modes(self, downlink_formats=None, sample_subscriptions=None): """ Subscribe to a stream of incoming Mode S messages/squitters. The method returns a tuple of (subscription, stream). The subscription object has a "cancel()" method which will stop message streaming. The stream is an iterator with messages. It will print warnings if the network bandwidth is exhausted and the device drops data. :param downlink_formats: A list of downlink formats to subscribe to. Only messages matching one of the provided DFs will be provided in the stream. If the argument is None (default), all DFs will be subscribed. :param sample_subscriptions: A list of SampleFilter objects to retrieve IQ samples. The default (None or missing) behavior is to provide no samples. See documentation of SampleFilter class for more info.<br><br> <b>Important:</b> I/Q samples have a high data volume. Subscribing to too many samples may cause data loss. :return (subscription, stream): call subscription.cancel() to stop message streaming. The stream is an iterator of messages according to the specified filters. """ downlink_formats = list(range(25)) if downlink_formats is None else downlink_formats sample_subscriptions = [] if sample_subscriptions is None else sample_subscriptions req = Receiverd_pb2.GetModeSDownlinkFramesRequest() req.downlink_formats.extend(downlink_formats) for sub in sample_subscriptions: rule = req.sample_enable_rules.add() if sub.address is None: rule.address.aq = Receiverd_pb2.QualifiedAddressModeS.AddressQualifier.MATCH_ANY_ADDRESS else: rule.address.aq = Receiverd_pb2.QualifiedAddressModeS.AddressQualifier.MATCH_ANY_QUALIFIER rule.address.address = sub.address rule.downlink_formats.extend(sub.downlink_formats) reply_stream = self._receiverd_stub.GetModeSDownlinkFrames(req) def generator(): last_dropped_frames = 0 while reply_stream.is_active(): try: reply = reply_stream.next() except Exception as e: # done logger.debug("Stopped iterating replies due to %s: %s", e.__class__.__name__, e) return if reply.frames_dropped > last_dropped_frames: logger.warning("Network overload! Receiver dropped {} frames." .format(reply.frames_dropped-last_dropped_frames)) last_dropped_frames = reply.frames_dropped yield reply return reply_stream, generator()
[docs] def get_fft_parameters(self): """ Request the parameters used to generate the aggregated FFT streams used for the waterfall plots. Parameters include center frequency, samples rate and fft size and more. See the Spectrumd.proto file for more details. :return: dictionary containing the parameters needed to interpret the FFT data steams """ logger.debug("Retrieving FFT parameters.") return self._spectrumd_stub.GetAggregatedFFTProperties(Empty())
[docs] def subscribe_fft(self): """ Subscribe to a stream of aggregated FFT blocks. This can be used to monitor and view the RF band around 1090 MHz. Use get_fft_parameters() to retrieve all information needed to interpret the data stream. See the Spectrumd.proto file for more information how the single FFT results are aggregated (average and peak) into one block. :return: (subscription, stream): call subscription.cancel() to stop message streaming. The stream is an iterator of FFT blocks aggregated according to the parameters returned by get_fft_parameters(). """ logger.debug("Subscribing to aggregated FFT stream.") fft_stream = self._spectrumd_stub.GetAggregatedFFTBlockStream(Empty()) def generator(): while fft_stream.is_active(): try: block = fft_stream.next() except Exception as e: # done logger.debug("Stopped receiving FFT blocks due to %s: %s", e.__class__.__name__, e) return yield block return fft_stream, generator()
[docs] def get_low_level_api (self): """ Use this to access the underlying stubs with direct access to the RPC calls defined in the Receiverd.proto, Monitord.proto, and Spectrumd.proto files. :return (receiverd, monitord, spectrumd): the three stubs that provide all API calls as defined in the respective .proto definitions. """ return self._receiverd_stub, self._monitord_stub, self._spectrumd_stub
[docs] def close(self): """ Use this to cancel the respective subscription. Call this if you want to change the current subscription and then just re-subscribe with the modified subscription settings. """ self._receiverd_channel.close() self._monitord_channel.close() self._spectrumd_channel.close()
[docs] @staticmethod def timestamp_to_datetime(timestamp, gps_offset=0): """ This function converts GPS nanosecond of the week timestamps provided by the device to Python's datetime objects. It assumes that the message was just received and determines the start of week based on the current system timestamp.<br><br> <b>Note:</b> timestamps are provided in GPS time, i.e., you'll have to add leap seconds to make it UTC. :param timestamp: GPS nanosecond of the week timestamp :param gps_offset: offset between GPS time and UTC :return: datetime object """ # we have to add 345600s because unix timestamp 0 was a Thursday ref = time.time() + 345600 start_of_week = math.floor(ref / 604800) * 604800 - 345600 - gps_offset return datetime.datetime.fromtimestamp(start_of_week + timestamp / 1e9)
[docs] class SampleFilter: """ Filter to specify for which aircraft and downlink formats, IQ samples should be delivered Use icao ID 0xffffffff to enable I/Q collection for all transponder IDs. In addition, you must specify a list of downlink formats for the matching transponders. The filter applies logical AND on transponder ID and downlink formats, i.e., IQ samples are only added for messages which match both rules. """
[docs] def __init__(self, address=None, downlink_formats=None): """ This class holds filter settings for I/Q samples subscriptions :param icao: transponder address to subscribe to; None (default) will subscribe to all addresses :param downlink_formats: list of downlink formats (decimal) to subscribe to; None (default) will subscribe to all downlink formats """ if downlink_formats is None: downlink_formats = list(range(32)) self.address = address self.downlink_formats = downlink_formats