SecureTrack Python Streaming API
Note
The API is publicly available via TCP at api.secureadsb.com:4201.
Installation
Before using the SecureTrack API Python bindings, you have to install Python’s Protocol Buffers and gRPC libraries. This can be done, e.g., with pip as follows:
python -m pip install -U grpcio
More details and troubleshooting tips can be found in the official gRPC Python implementation on GitHub.
The SecureTrack API Python bindings (the Protocol Buffers and gRPC modules) can be generated from the SeRoAPI.proto file using this command:
python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. SeRoAPI.proto
Note that you have to install the grpcio-tools package for this first:
python -m pip install grpcio-tools
Alternatively, you can download the pre-generated bindings here:
The archive contains the following files, which are also the output of the above protoc call:
SeRoAPI_pb2_grpc.py - generated Python gRPC method stubs
SeRoAPI_pb2.py - generated Python Protocol Buffers bindings (data structures)
In order to use the module, you have to make sure these two files are in your PYTHONPATH. You can achieve that by simply putting them into your working directory. Once done, you can use the SecureTrack API as follows.
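To check that the two generated modules can be found before running a longer script, a small sanity check along the following lines may help. This is only a sketch: the helper name missing_modules is made up for illustration, and it relies solely on importlib.util.find_spec from the standard library.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of `names` that cannot be found on the import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# The two module names are the files produced by the protoc call above.
missing = missing_modules(["SeRoAPI_pb2", "SeRoAPI_pb2_grpc"])
if missing:
    print("Not found on the import path:", ", ".join(missing))
else:
    print("Both generated modules are importable.")
```

If a module is reported as missing, copying the file into the working directory or adding its directory to PYTHONPATH should resolve it.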
Quick Start
Start by importing the required modules into your script:
import grpc # basic gRPC functions
import SeRoAPI_pb2 as pb # SecureTrack API SerDes
import SeRoAPI_pb2_grpc as st # SecureTrack API stubs
Then create a TLS channel and a stub that allows you to make calls to the API.
Note that the connection is secured with Transport Layer Security (TLS), which requires a certificate (ca.crt).
address = 'api.secureadsb.com:4201' # public API endpoint
cert = open('ca.crt', 'rb').read() # point this to the 'ca.crt' file
credentials = grpc.ssl_channel_credentials(cert)
ch = grpc.secure_channel(address, credentials)
stub = st.SeRoAPIStub(ch)
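Since grpc.ssl_channel_credentials expects the root certificate as a PEM-encoded byte string, it can be useful to fail early when the file is missing or is not PEM. The following is a minimal sketch under that assumption; the helper name load_root_certificate is hypothetical and not part of the SecureTrack bindings.

```python
from pathlib import Path


def load_root_certificate(path):
    """Read a certificate file and return its bytes.

    Raises ValueError if the content does not look PEM-encoded.
    """
    data = Path(path).read_bytes()  # opens and closes the file for us
    if b"-----BEGIN CERTIFICATE-----" not in data:
        raise ValueError(f"{path} does not look like a PEM-encoded certificate")
    return data


# Intended use (requires grpc and the real ca.crt, so it is only shown as a comment):
#   credentials = grpc.ssl_channel_credentials(load_root_certificate('ca.crt'))
```

A missing file surfaces as FileNotFoundError from read_bytes, which is usually easier to diagnose than a failed TLS handshake later on.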
Using this stub, we will now retrieve information about all sensors. Note that we use Token Authentication to authenticate API users:
# create request for retrieving my sensors
req = pb.SensorInfoRequest(token='INSERT YOUR TOKEN HERE')
# call the API method
res = stub.GetSensorInfo(req)
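The snippets on this page place the token directly in the source code. One common alternative is to read it from an environment variable so that it does not end up in version control. The sketch below assumes a variable named SECURETRACK_TOKEN; both that name and the helper read_token are made up for illustration.

```python
import os


def read_token(env_var="SECURETRACK_TOKEN"):
    """Return the API token from the environment, or raise if it is not set."""
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"Set the {env_var} environment variable to your API token")
    return token


# Intended use with the request shown above (needs the generated bindings):
#   req = pb.SensorInfoRequest(token=read_token())
```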
To get a detailed overview of the information provided by the call (and stored in res), have a look at the call and data definitions under Protocol Buffers Definition. For example, to access a sensor’s location, you would access the gnss field of each SensorInformation object in res:
for info in res.sensorInfo:
    print("\t{} ({}): located at {:.4f},{:.4f}".format(
        info.sensor.serial, pb.Sensor.Type.Name(info.sensor.type),
        info.gnss.position.latitude, info.gnss.position.longitude))
While the GetSensorInfo gRPC call returns a single response object, the other two calls (GetTargetReports and GetModeSDownlinkFrames) return streams of objects. These streams can be iterated and are in theory infinite. Note, however, that in practice streams are sometimes interrupted by network problems or by a restart of the server or client software.
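One way to cope with such interruptions is to re-open the stream after a failure, waiting a little longer after each consecutive error. The generator below is a generic sketch, not part of the SecureTrack bindings: resilient_stream and its parameters are hypothetical names, and the stream is simulated so the snippet runs without a network connection. With gRPC you would typically pass retry_on=grpc.RpcError and a factory such as lambda: stub.GetTargetReports(req). Data sent while the client was disconnected is not recovered by this approach.

```python
import time


def resilient_stream(open_stream, retry_on=Exception, max_retries=5,
                     base_delay=1.0, sleep=time.sleep):
    """Yield items from open_stream(), re-opening it after an interruption.

    Gives up (re-raises) after more than max_retries consecutive failures.
    """
    failures = 0
    while True:
        try:
            for item in open_stream():
                failures = 0  # data is flowing again, reset the failure counter
                yield item
            return  # the stream ended normally
        except retry_on:
            failures += 1
            if failures > max_retries:
                raise
            sleep(base_delay * 2 ** (failures - 1))  # exponential backoff: 1 s, 2 s, 4 s, ...


# Simulated stream that breaks once and then recovers.
attempts = []

def flaky():
    attempts.append(1)
    if len(attempts) == 1:
        yield "a"
        raise ConnectionError("simulated interruption")
    yield "b"

received = list(resilient_stream(flaky, retry_on=ConnectionError,
                                 sleep=lambda seconds: None))
print(received)
```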
Warning
Depending on the number of sensors associated with your token and their coverage, the data volume of the stream can be very high. We recommend applying as many filters as possible to narrow down the data stream to what’s really important to you.
For example, a sensor located in a high traffic density area with a 450 km range in all directions can produce up to about 2500 Mode S replies per second. If you are only interested in ADS-B data, add a filter for downlink format 17 to enable server-side filtering.
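For orientation, the downlink format (DF) of a Mode S reply is encoded in the first five bits of the frame. The sketch below shows how that value could be read from raw frame bytes on the client side. It does not show the server-side filter itself, whose request fields are described in the Protocol Buffers definition, and it assumes you already have the raw frame as a bytes object; the function name downlink_format is made up for illustration.

```python
def downlink_format(frame: bytes) -> int:
    """Return the Mode S downlink format, i.e. the top five bits of the first byte."""
    if not frame:
        raise ValueError("empty frame")
    return frame[0] >> 3


# A commonly used sample ADS-B frame (112 bits, first byte 0x8D).
adsb_frame = bytes.fromhex("8D4840D6202CC371C32CE0576098")

# Synthetic 56-bit frame whose first five bits are set to 11 (payload zeroed).
short_frame = bytes([11 << 3]) + bytes(6)

print(downlink_format(adsb_frame))   # 17
print(downlink_format(short_frame))  # 11
```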
Here is an example for how to work with a stream in Python:
# subscribe to target reports
it = stub.GetTargetReports(pb.TargetReportsRequest(token='INSERT YOUR TOKEN HERE'))
for tgt in it:
    if tgt.HasField('mlat'):
        # target is tracked by MLAT
        ...
    if tgt.HasField('adsb'):
        # target is tracked via ADS-B
        ...
Warning
It is important to shut down streams gracefully if your program keeps running after reading from a stream or if you want to change the subscription (filters). Otherwise the stream will continue receiving and buffering data in the background, which occupies memory and network resources unnecessarily.
Be kind and cancel the subscription to let the server know it can stop sending data:
# close the stream (the call object returned by GetTargetReports above)
it.cancel()
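To make sure cancel() is called even when the processing loop exits through an exception, the stream can be wrapped in a small context manager. This is a sketch only: cancelling is a hypothetical helper, and FakeStream stands in for the object returned by a streaming call so the snippet runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def cancelling(stream):
    """Yield `stream` and call its cancel() method when the with block is left."""
    try:
        yield stream
    finally:
        stream.cancel()


class FakeStream:
    """Stand-in for a streaming call object; only used for this demonstration."""

    def __init__(self, items):
        self._items = iter(items)
        self.cancelled = False

    def __iter__(self):
        return self._items

    def cancel(self):
        self.cancelled = True


stream = FakeStream([1, 2, 3])
with cancelling(stream) as it:
    for item in it:
        if item == 2:
            break  # leaving the block early still triggers cancel()
print(stream.cancelled)
```

With the real API, the argument would be the result of a call such as stub.GetTargetReports(...).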
For more detailed examples, see Examples below.
Further Reading
Examples
Synchronous (Blocking)
#!/usr/bin/python
"""
This example shows how to use the public SecureTrack gRPC API.
"""
# API-specific imports
import grpc # basic gRPC functions
# some basic imports
import time
import SeRoAPI_pb2 as pb # SecureTrack API SerDes
import SeRoAPI_pb2_grpc as st # SecureTrack API stubs
# some configuration
token = 'INSERT YOUR TOKEN HERE'
address = 'api.secureadsb.com:4201'
ca_cert = 'ca.crt' # point this to the ca.crt file
info_timeout = 15 # seconds
# create channel and stub
cert = open(ca_cert, 'rb').read()
credentials = grpc.ssl_channel_credentials(cert)
ch = grpc.secure_channel(address, credentials)
stub = st.SeRoAPIStub(ch)
# create request for retrieving my sensors
req = pb.SensorInfoRequest(token=token)
# call the API method
res = stub.GetSensorInfo(req)
# print result
print(f'API returned {len(res.sensorInfo)} sensors for this token:')
for info in res.sensorInfo:
    print("\t{} ({}, alias {}): located at {:.4f},{:.4f} (current fix: {})".format(
        info.sensor.serial, pb.Sensor.Type.Name(info.sensor.type), info.alias,
        info.gnss.position.latitude, info.gnss.position.longitude,
        pb.GNSSInformation.Position.FixType.Name(info.gnss.position.fix_type)))
# subscribe to target reports for 10 seconds
res = stub.GetTargetReports(pb.TargetReportsRequest(token=token))
print("Retrieving target reports for 10 seconds.")
start = time.time()
aircraft = dict()
cnt = 0
dropped = 0
for tgt in res:
    if time.time() - start > 10:
        break
    # count and store
    cnt += 1
    aircraft[tgt.target.address] = tgt
    if tgt.dropped_reports - dropped > 0:
        print(f'Reports were dropped: {tgt.dropped_reports}')
        dropped = tgt.dropped_reports
# graceful shutdown
res.cancel()
print("Done! We received {} reports from {} targets.".format(cnt, len(aircraft)))
# print some stats
adsb_cnt = 0
mlat_cnt = 0
for report in aircraft.values():
    if report.HasField('mlat') and time.time() - report.mlat.tx_timestamp < info_timeout:
        mlat_cnt += 1
    if report.HasField('adsb') and time.time() - report.adsb.position_last_seen < info_timeout:
        adsb_cnt += 1
    print(report)
print("Out of {} targets, {} had an ADS-B position and {} were also tracked by MLAT.".format(
    len(aircraft), adsb_cnt, mlat_cnt))
Asynchronous (Non-blocking)
For asynchronous (non-blocking) interaction with gRPC APIs, the gRPC project provides the gRPC AsyncIO API. It can be used in conjunction with Python’s asyncio library. Documentation and examples for that library are available in the official asyncio documentation.
#!/usr/bin/python
"""
This example shows how to use the public SecureTrack gRPC API in an asynchronous
fashion using asyncio.
"""
# Python's asynchronous I/O library
import asyncio
# API-specific imports
import grpc # basic gRPC functions
# some basic imports
import time
import SeRoAPI_pb2 as pb # SecureTrack API SerDes
import SeRoAPI_pb2_grpc as st # SecureTrack API stubs
# some configuration
token = 'INSERT YOUR TOKEN HERE'
address = 'api.secureadsb.com:4201'
ca_cert = 'ca.crt' # point this to the ca.crt file
# subscribes to target reports for 10 seconds
async def stream_handler():
    cert = open(ca_cert, 'rb').read()
    credentials = grpc.ssl_channel_credentials(cert)
    # Note: channel will be closed automatically when we leave the with statement
    async with grpc.aio.secure_channel(address, credentials) as channel:
        print("Starting to retrieve data for 10 seconds.")
        stub = st.SeRoAPIStub(channel)
        it = stub.GetTargetReports(pb.TargetReportsRequest(token=token))
        tgts = dict()
        start = time.time()
        try:
            async for tgt in it:
                if time.time() - start > 10:
                    break
                tgts[tgt.target.address] = tgt
        except grpc.RpcError as e:
            print(e)
        # graceful shutdown
        it.cancel()
    print('I am also done retrieving data.')
    print(f'We have seen {len(tgts)} different aircraft.')
# sleeps for 5 seconds
async def chiller():
    print("In the meantime, I am going to chill for 5 seconds.")
    await asyncio.sleep(5)
    print("Feeling pretty relaxed now. Returning to main!")
# run the two above methods in parallel
async def main():
    await asyncio.gather(stream_handler(), chiller())
print("Calling async main method.")
asyncio.run(main())
print("Exiting, bye bye.")
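In the asynchronous example above, the elapsed-time check is only evaluated when a report arrives, so a quiet stream would keep the loop waiting beyond the intended 10 seconds. One possible alternative is to bound the whole consumption with asyncio.wait_for. The sketch below uses a simulated async generator in place of the gRPC call, and collect_for is a hypothetical helper name; with the real API you would still call cancel() on the call object afterwards.

```python
import asyncio


async def collect_for(stream, seconds):
    """Collect items from an async iterator for at most `seconds` seconds."""
    items = []

    async def consume():
        async for item in stream:
            items.append(item)

    try:
        await asyncio.wait_for(consume(), timeout=seconds)
    except asyncio.TimeoutError:
        pass  # time budget used up; keep whatever was collected so far
    return items


async def fake_stream():
    """Simulated stream: three items, then a long silence."""
    for i in range(3):
        yield i
    await asyncio.sleep(3600)


result = asyncio.run(collect_for(fake_stream(), 0.2))
print(result)
```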
Raw Data (Mode S downlink frames)
#!/usr/bin/python
"""
This example shows how to use the public SecureTrack gRPC API.
"""
# API-specific imports
import grpc # basic gRPC functions
# some basic imports
import time
import SeRoAPI_pb2 as pb # SecureTrack API SerDes
import SeRoAPI_pb2_grpc as st # SecureTrack API stubs
# some configuration
token = 'INSERT YOUR TOKEN HERE'
address = 'api.secureadsb.com:4201'
ca_cert = 'ca.crt' # point this to the ca.crt file
duration = 100
# create channel and stub
cert = open(ca_cert, 'rb').read()
credentials = grpc.ssl_channel_credentials(cert)
ch = grpc.secure_channel(address, credentials)
stub = st.SeRoAPIStub(ch)
# create request for retrieving Mode S downlink frames
req = pb.ModeSDownlinkFramesRequest(token=token)
print(f'Retrieving Mode S replies for {duration} seconds.')
res = stub.GetModeSDownlinkFrames(req)
start = time.time()
aircraft = dict()
cnt = 0
dropped = 0
for rply in res:
    if rply.receptions[0].sensor_timestamp/1000 - start > duration:
        break
    # count and store
    cnt += 1
    if rply.target.address in aircraft:
        aircraft[rply.target.address] += 1
    else:
        aircraft[rply.target.address] = 1
    if rply.dropped_frames - dropped > 0:
        print(f'Frames were dropped: {rply.dropped_frames}')
        dropped = rply.dropped_frames
# graceful shutdown
res.cancel()
print("Done! We received {} replies from {} targets.".format(cnt, len(aircraft)))
print(f'We dropped {dropped} replies.')
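The per-aircraft counting in the example above can also be written with collections.Counter from the standard library, which removes the need for the explicit membership test. The following sketch uses a hand-written list of addresses instead of a live stream; the values are illustrative only, and the assumption that addresses are integers is mine.

```python
from collections import Counter

# Illustrative target addresses, standing in for rply.target.address values.
addresses = [0x4840D6, 0x3C6444, 0x4840D6, 0x4840D6, 0x3C6444, 0xA1B2C3]

frames_per_aircraft = Counter()
for address in addresses:
    frames_per_aircraft[address] += 1  # missing keys start at 0

# Print the aircraft with the most frames first.
for address, count in frames_per_aircraft.most_common():
    print(f"{address:06X}: {count} frames")
```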