GRX1090 Python API
To simplify access to the device's gRPC interface, we provide a high-level Python API that encapsulates the gRPC bindings and offers some useful functions for working with the provided data.
Installation
Before using the Python bindings, you have to install Python's Protocol Buffers and gRPC libraries. This can be done, e.g., with pip as follows:
python -m pip install -U pip
python -m pip install -U grpcio
The GRX1090 Python bindings (including pre-generated Protocol Buffer/gRPC bindings) can be downloaded here. The archive contains a Python package called grx which contains the following files:
grx
├── __init__.py - package initialization script
├── GRX1090.py - high-level API to the gRPC interface
├── *_pb2.py - generated Python Protocol Buffers bindings
├── *_pb2_grpc.py - generated Python gRPC bindings
└── *_pb2_grpc.py - dependencies of the library (see above)
To use the package, make sure the grx package is on your PYTHONPATH. The simplest way to achieve that is to put the grx folder into your working directory. Once done, you can use the GRX1090 API as follows.
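If copying the grx folder into the working directory is not practical, the folder that contains it can also be added to the module search path at runtime. The following is a minimal sketch of that idea; the helper name add_to_module_path and the example path are hypothetical and not part of the GRX1090 bindings.

```python
import os
import sys


def add_to_module_path(parent_dir):
    """Put parent_dir (the folder that contains grx/) on the module search path."""
    parent_dir = os.path.abspath(parent_dir)
    if parent_dir not in sys.path:
        # prepend so that this copy of the package is found first
        sys.path.insert(0, parent_dir)
    return parent_dir


# hypothetical location of the unpacked archive; replace with your own
add_to_module_path("/opt/grx1090-python")
```

After this call, "from grx import GRX1090" should resolve as long as the grx folder really is located inside the given directory.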
Usage
from grx import GRX1090
grx = GRX1090("192.168.3.241") # replace IP address
If you connect to your device via another network, you have to change the IP address in the constructor accordingly. Once you have created a GRX1090 object, retrieving data (received Mode S downlink transmissions) from the device is straightforward:
# subscribe to all Mode S messages (without I/Q samples)
subscription, stream = grx.subscribe_modes()
# print all incoming Mode S signals (including metadata)
for reply in stream:
    print(reply)
Once you are done with processing or if you want to change the subscription, close the subscription using this method:
# close the stream
subscription.cancel()
For more complex examples, see Examples.
Note
The low-level gRPC API, which provides access to all methods and information described in the Protocol Buffer schema files, can be accessed through the gRPC stubs returned by the get_low_level_api method of the GRX1090 class.
API Doc
The simplified API for retrieving data from the GRX1090 receiver is encapsulated in the grx.GRX1090 class. Filters for I/Q samples can be specified using the grx.SampleFilter class.
- class grx.GRX1090(addr, receiverd_port=5303, monitord_port=5305, spectrumd_port=5306)
This class provides a simple wrapper for the device’s protobuf-based gRPC interface.
- __init__(addr, receiverd_port=5303, monitord_port=5305, spectrumd_port=5306)
Constructor
- Parameters:
addr – network address (IP or hostname) of the device
receiverd_port – port for the receiverd gRPC interface (default: 5303)
monitord_port – port for the monitord gRPC interface (default: 5305)
spectrumd_port – port for the spectrumd gRPC interface (default: 5306)
- close()
Use this to cancel the current subscription. Call this if you want to change the subscription, then re-subscribe with the modified subscription settings.
- get_fft_parameters()
Request the parameters used to generate the aggregated FFT streams used for the waterfall plots. Parameters include center frequency, sample rate, FFT size, and more. See the Spectrumd.proto file for more details.
- Returns:
dictionary containing the parameters needed to interpret the FFT data streams
- get_gnss_info()
Use this function to retrieve the device’s location, fix status and quality, and GNSS hardware status.
- Returns:
dictionary containing the current status of the GNSS (GPS) synchronization
- get_low_level_api()
Use this to access the underlying stubs with direct access to the RPC calls defined in the Receiverd.proto, Monitord.proto, and Spectrumd.proto files.
- Return (receiverd, monitord, spectrumd):
the three stubs that provide all API calls as defined in the respective .proto definitions.
- get_radio_status()
The Mode S receiver’s internal status consists of internal gain values, gain control settings, and DC offset calibration status and outcome.
- Returns:
dictionary containing information about the Mode S receiver’s current internal status
- get_tracked_aircraft()
Use this to get a convenient list of all aircraft that are currently tracked by the receiver, including the most important tracking information (location, velocity, etc.) and meta information such as average frame rate and signal strength.
- Returns:
list containing tracked aircraft states
- subscribe_fft()
Subscribe to a stream of aggregated FFT blocks. This can be used to monitor and view the RF band around 1090 MHz. Use get_fft_parameters() to retrieve all information needed to interpret the data stream. See the Spectrumd.proto file for more information on how the individual FFT results are aggregated (average and peak) into one block.
- Returns:
(subscription, stream): call subscription.cancel() to stop message streaming. The stream is an iterator of FFT blocks aggregated according to the parameters returned by get_fft_parameters().
- subscribe_modes(downlink_formats=None, sample_subscriptions=None)
Subscribe to a stream of incoming Mode S messages/squitters. The method returns a tuple of (subscription, stream). The subscription object has a “cancel()” method which will stop message streaming. The stream is an iterator with messages. It will print warnings if the network bandwidth is exhausted and the device drops data.
- Parameters:
downlink_formats – A list of downlink formats to subscribe to. Only messages matching one of the provided DFs will be provided in the stream. If the argument is None (default), all DFs will be subscribed.
sample_subscriptions – A list of SampleFilter objects to retrieve I/Q samples. The default (None or missing) behavior is to provide no samples. See the documentation of the SampleFilter class for more info. Important: I/Q samples have a high data volume. Subscribing to too many samples may cause data loss.
- Return (subscription, stream):
call subscription.cancel() to stop message streaming. The stream is an iterator of messages according to the specified filters.
- static timestamp_to_datetime(timestamp, gps_offset=0)
This function converts the GPS nanosecond-of-the-week timestamps provided by the device to Python’s datetime objects. It assumes that the message was just received and determines the start of the week based on the current system timestamp. Note: timestamps are provided in GPS time, i.e., you have to account for leap seconds (see gps_offset) to obtain UTC.
- Parameters:
timestamp – GPS nanosecond of the week timestamp
gps_offset – offset between GPS time and UTC
- Returns:
datetime object
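To illustrate the idea behind a nanosecond-of-the-week conversion, here is a minimal standalone sketch. It is not the library's implementation. It assumes that GPS weeks start on Sunday 00:00:00 (counted from the GPS epoch, 1980-01-06), that GPS time runs ahead of UTC by gps_offset seconds, and that the message was received less than half a week ago. The function name ns_of_week_to_utc is hypothetical, and unlike the examples further down, it returns a timezone-aware UTC datetime.

```python
from datetime import datetime, timedelta, timezone

GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)  # start of GPS week 0
WEEK = timedelta(weeks=1)


def ns_of_week_to_utc(ns_of_week, now_utc, gps_offset=18):
    """Convert a GPS nanosecond-of-week value to UTC, using now_utc as reference."""
    # express the reference time on the GPS time scale
    now_gps = now_utc + timedelta(seconds=gps_offset)
    # start of the GPS week that contains the reference time
    week_start = GPS_EPOCH + ((now_gps - GPS_EPOCH) // WEEK) * WEEK
    # datetime only resolves microseconds, so the last three digits are dropped
    t_gps = week_start + timedelta(microseconds=ns_of_week // 1000)
    # a timestamp far in the future belongs to the week that just ended
    if t_gps - now_gps > WEEK / 2:
        t_gps -= WEEK
    return t_gps - timedelta(seconds=gps_offset)


reference = datetime(2020, 1, 8, 12, 0, 0, tzinfo=timezone.utc)
# 3 days, 12 hours and 18 seconds into the GPS week
print(ns_of_week_to_utc((3 * 86400 + 43200 + 18) * 10**9, reference))
```

The half-week check handles the case where a message was stamped just before the week rollover but is processed just after it.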
- class grx.SampleFilter(address=None, downlink_formats=None)
Filter to specify for which aircraft and downlink formats I/Q samples should be delivered. Use ICAO ID 0xffffffff to enable I/Q collection for all transponder IDs. In addition, you must specify a list of downlink formats for the matching transponders. The filter applies a logical AND to transponder ID and downlink formats, i.e., I/Q samples are only added for messages which match both rules.
- __init__(address=None, downlink_formats=None)
This class holds filter settings for I/Q sample subscriptions.
- Parameters:
address – transponder address to subscribe to; None (default) will subscribe to all addresses
downlink_formats – list of downlink formats (decimal) to subscribe to; None (default) will subscribe to all downlink formats
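The matching rule described above can be sketched in a few lines of plain Python. This only illustrates the documented semantics and is not how the device or the library implements filtering. FilterRule and wants_samples are hypothetical names, None is treated as a wildcard as in the parameter description, and combining several filters with a logical OR is an assumption.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class FilterRule:
    """Hypothetical stand-in that mirrors the two SampleFilter settings."""
    address: Optional[int] = None
    downlink_formats: Optional[Sequence[int]] = None

    def matches(self, address, downlink_format):
        # None acts as a wildcard for the respective rule
        address_ok = self.address is None or self.address == address
        df_ok = (self.downlink_formats is None
                 or downlink_format in self.downlink_formats)
        # logical AND: both rules have to match
        return address_ok and df_ok


def wants_samples(rules, address, downlink_format):
    """Assumption: samples are delivered if at least one filter matches."""
    return any(rule.matches(address, downlink_format) for rule in rules)


rules = [FilterRule(None, [17]), FilterRule(0xA835AF, [11])]
print(wants_samples(rules, 0x4840D6, 17))  # DF17 of any aircraft
print(wants_samples(rules, 0x4840D6, 11))  # DF11 of a different aircraft
```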
Examples
In addition to the grpcio dependencies mentioned above, the following examples make use of a few external libraries to decode or show the data provided by the GRX1090. To install these dependencies using pip, run the following:
# Mode S and ADS-B decoding library for Python
python -m pip install -U pyModeS
# extensive scientific computing library
python -m pip install -U numpy
# extensive plotting library
python -m pip install -U matplotlib
# python image library
python -m pip install -U Pillow
Using the filters
import binascii
import datetime
import sys
import pyModeS as pms
from grx import GRX1090
# create device object
if len(sys.argv) == 2:
    print("Connecting to {}.".format(sys.argv[1]))
    grx = GRX1090(sys.argv[1])
else:
    grx = GRX1090("192.168.3.241")
# we only want to receive DF 11 and 17
subscription, stream = grx.subscribe_modes([11, 17])
# retrieve data
for reply in stream:
    payload = binascii.hexlify(reply.frame.payload).decode('UTF-8')
    timestamp = GRX1090.timestamp_to_datetime(reply.frame.timestamp, 18) # 18 leap seconds as of 2020
    print("""
Received Mode S downlink transmission:
Timestamp: {} ({} ns)
Delay: {}
Payload (hex): {}
DF/ICAO 24-bit ID: {}/0x{}
Signal level: {} dBm
Noise level: {} dBm
Carrier offset: {:.0f} Hz (error: {:.4f})
No of I/Q Samples: {:.0f}
Dropped: {} Frames
""".format(timestamp, # timestamp in local time
           reply.frame.timestamp, # raw timestamp
           datetime.datetime.now() - timestamp, # measure delay to local system time
           payload,
           pms.df(payload), pms.icao(payload),
           reply.frame.level_signal,
           reply.frame.level_noise,
           reply.frame.carrier_frequency_offset,
           reply.frame.carrier_frequency_estimation_error,
           len(reply.frame.samples.samples) / 4,
           reply.frames_dropped))
Note
If you want to do more than just displaying the received data, we recommend using Junzi Sun’s Python Mode S decoder available here: https://github.com/junzis/pyModeS. It can process the received Mode S downlink transmissions in the hex format printed in this example.
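For readers who only need the two fields printed above, the following sketch shows where they sit in the hex payload. It relies on the Mode S frame layout, in which the downlink format occupies the first five bits and DF 11, 17, and 18 carry the 24-bit address unencrypted in bytes 2 to 4. The function names are hypothetical, and this is a simplified illustration rather than a replacement for pyModeS.

```python
def downlink_format(payload_hex):
    """Return the downlink format encoded in the first five bits."""
    first_byte = int(payload_hex[:2], 16)
    # values 24 to 31 all denote Comm-D (DF 24), identified by its first two bits
    return min(first_byte >> 3, 24)


def icao_address(payload_hex):
    """Return the 24-bit address as hex, or None if it is not sent in the clear."""
    if downlink_format(payload_hex) not in (11, 17, 18):
        # other formats overlay the address with the parity field
        return None
    return payload_hex[2:8].lower()


frame = "8D4840D6202CC371C32CE0576098"  # a frequently cited ADS-B sample frame
print(downlink_format(frame), icao_address(frame))
```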
Retrieve GPS information
Note
There is information on the accuracy of the provided GPS information available, too. Have a look at the Protocol Buffers Definitions for more details.
import signal
import sys
import time
import grpc
from google.protobuf.empty_pb2 import Empty
from google.protobuf.json_format import MessageToJson
from grx import Monitord_pb2_grpc
#logger = logging.getLogger('GRX1090')
# create device object
if len(sys.argv) == 2:
    #print("Connecting to {}.".format(sys.argv[1]))
    host = sys.argv[1]
else:
    host = "192.168.3.241"
receiverd_port=5303
monitord_port=5305
spectrumd_port=5306
# create monitord stub
monitord_channel = grpc.insecure_channel("{}:{}".format(host, monitord_port))
monitord_stub = Monitord_pb2_grpc.MonitordStub(monitord_channel)
# close the JSON array and exit on Ctrl+C
def signal_handler(sig, frame):
    print(']\n')
    sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
# poll the GNSS information once per second and print it as a JSON array
print('[')
notFirst = False
while True:
    info = monitord_stub.GetGNSSInformation(Empty())
    if notFirst:
        print(',\n')
    print(MessageToJson(info))
    notFirst = True
    time.sleep(1)
Retrieving I/Q samples
See Retrieving I/Q Samples for more information on how I/Q sample collection works.
import binascii
import datetime
import sys
import pyModeS as pms
from grx import GRX1090, SampleFilter
# create device object
if len(sys.argv) == 2:
    print("Connecting to {}.".format(sys.argv[1]))
    grx = GRX1090(sys.argv[1])
else:
    grx = GRX1090("192.168.3.241")
# subscribe to I/Q samples for all DF17 messages of all aircraft and
# additionally DF11 for aircraft with ICAO 24-bit transponder address 0xa835af
sample_subs = [SampleFilter(None, [17]),
               SampleFilter(0xa835af, [11])]
# subscribe to DF11 and DF17 Mode S frames, samples according to the above filters
subscription, stream = grx.subscribe_modes([11, 17], sample_subs)
# retrieve 10 frames including samples for DF11 and DF17
cnt = 10
while cnt > 0:
    # retrieve next reply
    reply = next(stream)
    payload = binascii.hexlify(reply.frame.payload).decode('UTF-8')
    timestamp = GRX1090.timestamp_to_datetime(reply.frame.timestamp, 18) # 18 leap seconds as of 2020
    print("""
Received Mode S downlink transmission:
Timestamp: {} ({} ns)
Delay: {}
Payload (hex): {}
DF/ICAO 24-bit ID: {}/0x{}
Signal level: {} dBm
Noise level: {} dBm
Carrier offset: {:.0f} Hz (error: {:.4f})
No of I/Q Samples: {:.0f}
Dropped: {} Frames
""".format(timestamp, # timestamp in local time
           reply.frame.timestamp, # raw timestamp
           datetime.datetime.now() - timestamp, # measure delay to local system time
           payload,
           pms.df(payload), pms.icao(payload),
           reply.frame.level_signal,
           reply.frame.level_noise,
           reply.frame.carrier_frequency_offset,
           reply.frame.carrier_frequency_estimation_error,
           # samples are provided as bytes but are actually encoded as two shorts
           len(reply.frame.samples.samples) / 4,
           reply.frames_dropped))
    cnt -= 1
# close stream
subscription.cancel()
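The example above only counts the samples. To work with them, the raw bytes have to be unpacked first. The sketch below does this with the standard struct module. The comment in the example states that each sample consists of two shorts (4 bytes); the little-endian byte order, the signedness, and the I-before-Q ordering used here are assumptions that should be checked against the Protocol Buffers definitions. The function name decode_iq is hypothetical.

```python
import struct


def decode_iq(raw):
    """Unpack raw sample bytes into a list of complex I/Q values.

    Assumes interleaved signed 16-bit little-endian values, I first, then Q.
    """
    if len(raw) % 4 != 0:
        raise ValueError("expected a multiple of 4 bytes (two shorts per sample)")
    count = len(raw) // 4
    values = struct.unpack("<{}h".format(2 * count), raw)
    # even positions hold the in-phase part, odd positions the quadrature part
    return [complex(i, q) for i, q in zip(values[0::2], values[1::2])]


# two synthetic samples: (1, -2) and (3, 4)
demo = struct.pack("<4h", 1, -2, 3, 4)
print(decode_iq(demo))
```

With a real reply, the input would be reply.frame.samples.samples.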
Handling network limits
Note
It is generally a good idea to turn on logging as in the example below because only then will our library warn you about network buffer overruns on the GRX1090 automatically.
import binascii
import datetime
import logging
import sys
import pyModeS as pms
from grx import GRX1090, SampleFilter
# create device object
if len(sys.argv) == 2:
    print("Connecting to {}.".format(sys.argv[1]))
    grx = GRX1090(sys.argv[1])
else:
    grx = GRX1090("192.168.3.241")
# subscribe to I/Q samples for all formats of all aircraft
sample_subs = [SampleFilter(None, list(range(25)))]
# subscribe and get stream
subscription, stream = grx.subscribe_modes(sample_subscriptions=sample_subs)
logging.basicConfig(level=logging.INFO)
# retrieve 10 frames with samples
i = 0
for reply in stream:
    payload = binascii.hexlify(reply.frame.payload).decode('UTF-8')
    timestamp = GRX1090.timestamp_to_datetime(reply.frame.timestamp, 18) # 18 leap seconds as of 2020
    print("""
Received Mode S downlink transmission:
Timestamp: {} ({} ns)
Delay: {}
Payload (hex): {}
DF/ICAO 24-bit ID: {}/0x{}
Signal level: {} dBm
Noise level: {} dBm
Carrier offset: {:.0f} Hz (error: {:.4f})
No of I/Q Samples: {:.0f}
Dropped: {} Frames
""".format(timestamp, # timestamp in local time
           reply.frame.timestamp, # raw timestamp
           datetime.datetime.now() - timestamp, # measure delay to local system time
           payload,
           pms.df(payload), pms.icao(payload),
           reply.frame.level_signal,
           reply.frame.level_noise,
           reply.frame.carrier_frequency_offset,
           reply.frame.carrier_frequency_estimation_error,
           len(reply.frame.samples.samples) / 4,
           reply.frames_dropped))
    # abort after 10 frames
    i += 1
    if i >= 10:
        subscription.cancel()
        # stop iterating; the stream is no longer valid after cancelling
        break
print("Done. Subscription has ended: " + repr(subscription.done()))
Spectrum Monitoring
The following example shows the output of the subscribe_fft call to the Python wrapper:

import sys
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
from grx import GRX1090
# create device object
if len(sys.argv) == 2:
    print("Connecting to {}.".format(sys.argv[1]))
    grx = GRX1090(sys.argv[1])
else:
    grx = GRX1090("192.168.3.241")
# retrieve and print the FFT parameters
parms = grx.get_fft_parameters()
print("""
FFTs are currently aggregated with the following parameters:
Center frequency: {} MHz
Sample rate: {} MSamples/s
FFT size: {} samples
Aggregation Factor: {}
""".format(parms.center_frequency/1e6, parms.sample_rate/1e6,
parms.fft_size, parms.aggregation_factor))
# subscribe to FFT stream
subscription, stream = grx.subscribe_fft()
# calculate frequencies based on the parameters retrieved above
stepw = parms.sample_rate/parms.fft_size
lfreq = - 0.5*parms.sample_rate
ufreq = 0.5*parms.sample_rate - stepw
freqs = (lfreq + np.array(range(parms.fft_size))*stepw)/1e6
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
# retrieve first block
block = next(stream)
# Draw x and y lists
avgpoints = ax.plot(freqs.astype(float), block.bins_avg, label="Average")[0]
peakpoints = ax.plot(freqs.astype(float), block.bins_peak, label="Peak")[0]
# Format plot
plt.title('1090 MHz RF Band')
plt.ylabel('Amplitude (dBm)')
plt.xlabel('Frequency Offset (MHz)')
ax.legend()
# rescale axis
ax.set_ylim(-180, -50)
def update(i):
    global stream, freqs
    b = next(stream)
    avgpoints.set_data(freqs, b.bins_avg)
    peakpoints.set_data(freqs, b.bins_peak)
# start animation
ani = animation.FuncAnimation(fig, update, interval=0.01)
plt.show()
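The frequency axis in the example above follows directly from the sample rate and the FFT size. As a worked illustration without NumPy, the sketch below computes the same bin offsets for made-up parameter values. The function name fft_bin_offsets_mhz and the numbers are hypothetical; like the example, it assumes that the bins are ordered from the lowest to the highest frequency.

```python
def fft_bin_offsets_mhz(sample_rate, fft_size):
    """Return the frequency offset of every FFT bin from the center, in MHz."""
    step = sample_rate / fft_size   # width of one bin in Hz
    lowest = -0.5 * sample_rate     # offset of the first bin in Hz
    return [(lowest + k * step) / 1e6 for k in range(fft_size)]


# made-up values for illustration: 8 MSamples/s and only 4 bins
print(fft_bin_offsets_mhz(8e6, 4))  # [-4.0, -2.0, 0.0, 2.0]
```

Adding the center frequency (in MHz) to each offset yields absolute frequencies.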
Waterfall Chart
import datetime
import io
import sys
import PIL.Image
from grx import GRX1090, Spectrumd_pb2
# create device object
if len(sys.argv) == 2:
    print("Connecting to {}.".format(sys.argv[1]))
    grx = GRX1090(sys.argv[1])
else:
    grx = GRX1090("192.168.3.241")
# retrieve and print the FFT parameters
parms = grx.get_fft_parameters()
print("""
FFTs are currently aggregated with the following parameters:
Center frequency: {} MHz
Sample rate: {} MSamples/s
FFT size: {} samples
Aggregation Factor: {}
""".format(parms.center_frequency/1e6, parms.sample_rate/1e6,
parms.fft_size, parms.aggregation_factor))
# access call via underlying stub
_, _, spectrumd = grx.get_low_level_api()
duration = 10 # seconds
dline = parms.aggregation_factor * parms.fft_size / parms.sample_rate
nlines = int(duration / dline)
print("Will collect data for {:.2f} seconds.".format(nlines * dline))
# some default which look good for many setups
req = Spectrumd_pb2.GetWaterfallJPEGRequest(
num_lines=nlines, min_level=-180, max_level=-90, jpeg_quality=90,
aggregation_type=Spectrumd_pb2.GetWaterfallJPEGRequest.AVERAGE)
# retrieve SpecShot
dat = spectrumd.GetWaterfallJPEG(req)
# show it
tmp = io.BytesIO(dat.image)
picture = PIL.Image.open(tmp)
picture.show(title="GRX1090 SpecShot {} - {}".format(
datetime.datetime.fromtimestamp(dat.timestamp-nlines*dline),
datetime.datetime.fromtimestamp(dat.timestamp)))
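The number of waterfall lines requested above follows from the time covered by one aggregated FFT block. The sketch below repeats that arithmetic with made-up parameter values so that the relationship can be checked by hand. The function name waterfall_lines and the numbers are hypothetical; real values come from get_fft_parameters().

```python
def waterfall_lines(duration, aggregation_factor, fft_size, sample_rate):
    """Return (number of lines, seconds per line, seconds actually covered)."""
    # one line aggregates `aggregation_factor` FFTs of `fft_size` samples each
    line_duration = aggregation_factor * fft_size / sample_rate
    # only whole lines can be requested, so the result is rounded down
    num_lines = int(duration / line_duration)
    return num_lines, line_duration, num_lines * line_duration


# made-up values: 100 FFTs of 1024 samples at 10.24 MSamples/s give 10 ms per line
lines, per_line, covered = waterfall_lines(10, 100, 1024, 10.24e6)
print(lines, per_line, covered)
```

Because the line count is rounded down, the covered time span can be slightly shorter than the requested duration, which is why the example prints nlines * dline instead of duration.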