import datetime
import logging
import math
import time
import grpc
from google.protobuf.empty_pb2 import Empty
from . import Monitord_pb2_grpc
from . import Receiverd_pb2, Receiverd_pb2_grpc
from . import Spectrumd_pb2_grpc
# Module-level logger registered under the fixed name 'GRX1090'; the GRX1090 class
# emits its debug and warning messages through it.
logger = logging.getLogger('GRX1090')
[docs]
class GRX1090:
    """
    This class provides a simple wrapper for the device's protobuf-based GRPC interface.

    On construction, three insecure GRPC channels are opened, one each for the receiverd, monitord and
    spectrumd services. Call close() when you are done, or use the object as a context manager so that
    the channels are closed automatically::

        with GRX1090("192.168.1.10") as grx:
            print(grx.get_gnss_info())
    """

    def __init__(self, addr, receiverd_port=5303, monitord_port=5305, spectrumd_port=5306):
        """
        Constructor

        :param addr: network address (IP or hostname) of the device
        :param receiverd_port: port for the receiverd GRPC interface (default: 5303)
        :param monitord_port: port for the monitord GRPC interface (default: 5305)
        :param spectrumd_port: port for the spectrumd GRPC interface (default: 5306)
        """
        # connect to device (all three service endpoints are reported, not just two of them)
        logger.debug("Connecting to device at %s (receiverd port %s, monitord port %s, spectrumd port %s).",
                     addr, receiverd_port, monitord_port, spectrumd_port)

        # create monitord stub
        self._monitord_channel = grpc.insecure_channel("{}:{}".format(addr, monitord_port))
        self._monitord_stub = Monitord_pb2_grpc.MonitordStub(self._monitord_channel)

        # create receiverd stub
        self._receiverd_channel = grpc.insecure_channel("{}:{}".format(addr, receiverd_port))
        self._receiverd_stub = Receiverd_pb2_grpc.ReceiverdStub(self._receiverd_channel)

        # create spectrum stub
        self._spectrumd_channel = grpc.insecure_channel("{}:{}".format(addr, spectrumd_port))
        self._spectrumd_stub = Spectrumd_pb2_grpc.SpectrumdStub(self._spectrumd_channel)

    def __enter__(self):
        """
        Enter a ``with`` block.

        :return: this GRX1090 instance
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Leave a ``with`` block and close all channels. Exceptions raised inside the block are not suppressed.
        """
        self.close()
        return False

    def get_gnss_info(self):
        """
        Use this function to retrieve the device's location, fix status and quality, and GNSS hardware status.

        :return: reply of the monitord GetGNSSInformation call, containing the current status of the GNSS (GPS)
                 synchronization
        """
        logger.debug("Retrieving GPS status.")
        return self._monitord_stub.GetGNSSInformation(Empty())

    def get_radio_status(self):
        """
        The Mode S receiver's internal status consists of internal gain values, gain control settings and
        DC offset calibration status and outcome.

        :return: reply of the receiverd GetRadioFrontEndStatus call, containing information about the Mode S
                 receiver's current internal status
        """
        logger.debug("Retrieving radio front-end status.")
        return self._receiverd_stub.GetRadioFrontEndStatus(Empty())

    def get_tracked_aircraft(self):
        """
        Use this to get a convenient list of all aircraft that are currently tracked by the receiver
        including most important tracking information (location, velocity, etc) and meta information
        such as average frame rate and signal strength.

        :return: reply of the receiverd GetStateVectors call, containing the tracked aircraft states
        """
        logger.debug("Retrieving tracked aircraft")
        return self._receiverd_stub.GetStateVectors(Empty())

    @staticmethod
    def _iter_stream(stream, stop_message):
        """
        Yield the items of a GRPC response stream until it ends, is cancelled, or fails.

        :param stream: GRPC response stream providing is_active() and the iterator protocol
        :param stop_message: logging format string with two %s placeholders (exception class name, exception)
                             that is logged at debug level when reading from the stream stops
        """
        while stream.is_active():
            try:
                item = next(stream)
            except Exception as e:  # done
                # Deliberately best-effort: whatever ends the stream (exhaustion, cancellation, transport
                # error) simply terminates the iteration and is only reported at debug level.
                logger.debug(stop_message, e.__class__.__name__, e)
                return
            yield item

    def subscribe_modes(self, downlink_formats=None, sample_subscriptions=None):
        """
        Subscribe to a stream of incoming Mode S messages/squitters. The method returns a tuple of
        (subscription, stream). The subscription object has a "cancel()" method which will stop message streaming.
        The stream is an iterator with messages. It will print warnings if the network bandwidth is
        exhausted and the device drops data.

        :param downlink_formats: A list of downlink formats to subscribe to. Only messages matching one of the provided
                                 DFs will be provided in the stream. If the argument is None (default), all DFs
                                 (0 to 24) will be subscribed.
        :param sample_subscriptions: A list of SampleFilter objects to retrieve IQ samples. The default (None or missing)
                                     behavior is to provide no samples. See documentation of SampleFilter class for more
                                     info.<br><br>
                                     <b>Important:</b> I/Q samples have a high data volume. Subscribing to
                                     too many samples may cause data loss.
        :return (subscription, stream): call subscription.cancel() to stop message streaming. The stream is an iterator
                                        of messages according to the specified filters. It ends silently (debug log
                                        only) when the stream terminates for any reason.
        """
        logger.debug("Subscribing to Mode S downlink frames.")

        downlink_formats = list(range(25)) if downlink_formats is None else downlink_formats
        sample_subscriptions = [] if sample_subscriptions is None else sample_subscriptions

        req = Receiverd_pb2.GetModeSDownlinkFramesRequest()
        req.downlink_formats.extend(downlink_formats)

        qualifier = Receiverd_pb2.QualifiedAddressModeS.AddressQualifier
        for sub in sample_subscriptions:
            rule = req.sample_enable_rules.add()
            if sub.address is None:
                # no address given: enable samples for every transponder address
                rule.address.aq = qualifier.MATCH_ANY_ADDRESS
            else:
                # NOTE(review): a concrete address is sent with MATCH_ANY_QUALIFIER - presumably "this address,
                # whatever its qualifier"; confirm against Receiverd.proto.
                rule.address.aq = qualifier.MATCH_ANY_QUALIFIER
                rule.address.address = sub.address
            rule.downlink_formats.extend(sub.downlink_formats)

        reply_stream = self._receiverd_stub.GetModeSDownlinkFrames(req)

        def generator():
            # frames_dropped is compared against the highest value seen so far, so each warning
            # reports only the frames lost since the previous warning
            last_dropped_frames = 0
            for reply in self._iter_stream(reply_stream, "Stopped iterating replies due to %s: %s"):
                if reply.frames_dropped > last_dropped_frames:
                    logger.warning("Network overload! Receiver dropped %s frames.",
                                   reply.frames_dropped - last_dropped_frames)
                    last_dropped_frames = reply.frames_dropped
                yield reply

        return reply_stream, generator()

    def get_fft_parameters(self):
        """
        Request the parameters used to generate the aggregated FFT streams used for the
        waterfall plots. Parameters include center frequency, sample rate and fft size
        and more. See the Spectrumd.proto file for more details.

        :return: reply of the spectrumd GetAggregatedFFTProperties call, containing the parameters needed to
                 interpret the FFT data streams
        """
        logger.debug("Retrieving FFT parameters.")
        return self._spectrumd_stub.GetAggregatedFFTProperties(Empty())

    def subscribe_fft(self):
        """
        Subscribe to a stream of aggregated FFT blocks. This can be used to monitor and view
        the RF band around 1090 MHz. Use get_fft_parameters() to retrieve all information
        needed to interpret the data stream. See the Spectrumd.proto file for more
        information how the single FFT results are aggregated (average and peak) into one block.

        :return: (subscription, stream): call subscription.cancel() to stop message streaming.
                                         The stream is an iterator of FFT blocks aggregated according
                                         to the parameters returned by get_fft_parameters(). It ends silently
                                         (debug log only) when the stream terminates for any reason.
        """
        logger.debug("Subscribing to aggregated FFT stream.")
        fft_stream = self._spectrumd_stub.GetAggregatedFFTBlockStream(Empty())
        return fft_stream, self._iter_stream(fft_stream, "Stopped receiving FFT blocks due to %s: %s")

    def get_low_level_api(self):
        """
        Use this to access the underlying stubs with direct access to the RPC calls defined
        in the Receiverd.proto, Monitord.proto, and Spectrumd.proto files.

        :return (receiverd, monitord, spectrumd): the three stubs that provide all API calls
                                                  as defined in the respective .proto definitions.
        """
        return self._receiverd_stub, self._monitord_stub, self._spectrumd_stub

    def close(self):
        """
        Close the GRPC channels to the receiverd, monitord and spectrumd services (in that order).

        To stop a single subscription while keeping the connection, call cancel() on the subscription object
        returned by subscribe_modes() or subscribe_fft() instead.
        """
        for channel in (self._receiverd_channel, self._monitord_channel, self._spectrumd_channel):
            channel.close()

    @staticmethod
    def timestamp_to_datetime(timestamp, gps_offset=0, now=None):
        """
        This function converts GPS nanosecond of the week timestamps provided by the device to Python's datetime
        objects. It assumes that the message was just received and determines the start of week based on the
        reference time (by default the current system timestamp). If the converted time would lie more than half
        a week after the reference time, the timestamp is taken to belong to the previous week; this covers
        messages that were stamped shortly before a week rollover and converted shortly after it.<br><br>
        <b>Note:</b> timestamps are provided in GPS time. gps_offset is subtracted from the result, i.e., pass
        the number of seconds GPS time is ahead of UTC (the accumulated leap seconds) to obtain UTC.

        :param timestamp: GPS nanosecond of the week timestamp
        :param gps_offset: offset between GPS time and UTC in seconds; it is subtracted from the GPS time
        :param now: reference Unix timestamp in seconds used to determine the week; None (default) uses time.time()
        :return: naive datetime object in the system's local time zone (as produced by datetime.fromtimestamp)
        """
        seconds_per_week = 604800
        # we have to add 345600s (4 days) because unix timestamp 0 was a Thursday
        epoch_shift = 345600

        ref = time.time() if now is None else now
        start_of_week = math.floor((ref + epoch_shift) / seconds_per_week) * seconds_per_week \
            - epoch_shift - gps_offset
        seconds = start_of_week + timestamp / 1e9

        # A just-received message cannot be (far) in the future: such a result means the
        # timestamp was taken in the week before the one the reference time falls into.
        if seconds - ref > seconds_per_week / 2:
            seconds -= seconds_per_week

        return datetime.datetime.fromtimestamp(seconds)
[docs]
class SampleFilter:
    """
    Filter describing for which aircraft and downlink formats IQ samples should be delivered.

    A filter combines a transponder address with a list of downlink formats. Leave the address at None to
    enable I/Q collection for all transponder addresses. The filter applies logical AND on transponder address
    and downlink formats, i.e., IQ samples are only added for messages which match both rules.

    NOTE(review): earlier documentation named ICAO ID 0xffffffff as the wildcard; GRX1090.subscribe_modes only
    treats address=None as "any address" - confirm whether 0xffffffff is also honoured by the device.
    """

    def __init__(self, address=None, downlink_formats=None):
        """
        Store the filter settings for an I/Q sample subscription.

        :param address: transponder address to subscribe to; None (default) will subscribe to all addresses
        :param downlink_formats: list of downlink formats (decimal) to subscribe to; None (default) will subscribe to
                                 all downlink formats (0 to 31). A provided list is stored as-is, not copied.
        """
        self.address = address
        self.downlink_formats = list(range(32)) if downlink_formats is None else downlink_formats