SecureTrack Python Streaming API

Note

The API is publicly available via TCP under the address api.secureadsb.com:4201.

Installation

Before using the SecureTrack API Python bindings, you have to install Python’s Protocol Buffers and gRPC libraries. This can be done, e.g., with pip as follows:

python -m pip install -U grpcio protobuf

More details and troubleshooting tips can be found in the official gRPC Python implementation on GitHub:

The SecureTrack API Python bindings (Protocol Buffers data structures and gRPC method stubs) can be generated from the SeRoAPI.proto file using this command:

python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. SeRoAPI.proto

Note that you have to install the grpcio-tools package for this first:

python -m pip install grpcio-tools

Alternatively, you can download the pre-generated bindings here:

The archive contains the following files, which are the same files the protoc call above produces:

SeRoAPI_pb2_grpc.py  - generated Python gRPC method stubs
SeRoAPI_pb2.py       - generated Python Protocol Buffers bindings (data structures)

In order to use the module, you have to make sure these two files are in your PYTHONPATH. You can achieve that by simply putting them into your working directory. Once done, you can use the SecureTrack API as follows.
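To confirm that the generated files and the gRPC runtime can actually be found from your environment, a small check like the following sketch may help. The helper name missing_modules is made up for this illustration; it only relies on importlib.util.find_spec from the standard library and does not import the modules themselves.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found on the import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# the two generated files plus the gRPC runtime they depend on
required = ['grpc', 'SeRoAPI_pb2', 'SeRoAPI_pb2_grpc']
missing = missing_modules(required)
if missing:
    print('Not importable: ' + ', '.join(missing))
else:
    print('All SecureTrack API modules were found.')
```

If a module is reported as not importable, check that the file is in your working directory or that its directory is listed in PYTHONPATH.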

Quick Start

Start by importing the required modules into your script:

import grpc                    # basic gRPC functions
import SeRoAPI_pb2 as pb       # SecureTrack API SerDes
import SeRoAPI_pb2_grpc as st  # SecureTrack API stubs

Then create a TLS channel and a stub that allows you to make calls to the API. Note that the connection is secured with Transport Layer Security (TLS), which requires a certificate (ca.crt).

address = 'api.secureadsb.com:4201'
with open('ca.crt', 'rb') as f:  # point this to the 'ca.crt' file
    cert = f.read()
credentials = grpc.ssl_channel_credentials(cert)
ch = grpc.secure_channel(address, credentials)
stub = st.SeRoAPIStub(ch)
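grpc.ssl_channel_credentials expects the root certificate as PEM-encoded bytes. As a minimal sketch, a loader such as the one below fails early with a readable message if the file does not look like a PEM certificate. The name load_pem_certificate is a hypothetical helper for this illustration and not part of the API bindings.

```python
def load_pem_certificate(path):
    """Read a certificate file and check that it looks PEM-encoded."""
    with open(path, 'rb') as f:
        data = f.read()
    # PEM certificates contain this marker line; DER files do not
    if b'-----BEGIN CERTIFICATE-----' not in data:
        raise ValueError(f'{path} does not look like a PEM-encoded certificate')
    return data
```

The returned bytes can be passed on directly, for example as grpc.ssl_channel_credentials(load_pem_certificate('ca.crt')).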

Using this stub, we will now retrieve information about all sensors. Note that API users are authenticated with a token, which is passed in the request:

# create request for retrieving my sensors
req = pb.SensorInfoRequest(token='INSERT YOUR TOKEN HERE')

# call the API method
res = stub.GetSensorInfo(req)
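Since the token is a credential, you may prefer not to hard-code it in your scripts. The following sketch reads it from an environment variable instead. Both the helper load_token and the variable name SECURETRACK_TOKEN are assumptions made for this example; the API itself does not prescribe them.

```python
import os


def load_token(env_var='SECURETRACK_TOKEN'):
    """Return the API token stored in the given environment variable."""
    # env_var is a hypothetical name; pick whatever fits your deployment
    token = os.environ.get(env_var, '').strip()
    if not token:
        raise RuntimeError(f'Set the {env_var} environment variable to your API token')
    return token
```

The result can then be used wherever a token is expected, for example pb.SensorInfoRequest(token=load_token()).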

For a detailed overview of the information provided by the call (and stored in res), have a look at the call and data definitions under Protocol Buffers Definition. For example, to read a sensor’s location, access the gnss field of each SensorInformation object in res:

for info in res.sensorInfo:
    print("\t{} ({}): located at {:.4f},{:.4f}".format(
        info.sensor.serial, pb.Sensor.Type.Name(info.sensor.type),
        info.gnss.position.latitude, info.gnss.position.longitude))

While the GetSensorInfo gRPC call returns a single response object, the other two calls (GetTargetReports and GetModeSDownlinkFrames) return streams of objects. These streams can be iterated over and are, in theory, infinite. In practice, however, streams are sometimes interrupted by network outages or by a restart of the server or client software.
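One way to cope with such interruptions is to re-open the stream after a short, growing pause. The helper below is a minimal sketch of that idea and not part of the API bindings: the name consume_with_retry and all of its parameters are assumptions for this illustration. It is written against plain Python iterators so that it does not depend on gRPC itself; with gRPC you would typically pass retry_on=(grpc.RpcError,).

```python
import time


def consume_with_retry(open_stream, handle, retry_on=(Exception,),
                       max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Iterate a stream and re-open it after an interruption.

    open_stream: zero-argument callable returning a fresh iterator
    handle:      callable invoked with every received message
    retry_on:    exception types that are treated as interruptions
    Returns the number of messages handled once the stream ends normally.
    """
    received = 0
    failures = 0
    while True:
        try:
            for msg in open_stream():
                handle(msg)
                received += 1
                failures = 0  # data arrived, so reset the backoff
            return received
        except retry_on:
            failures += 1
            if failures > max_retries:
                raise  # give up after too many consecutive failures
            # exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (failures - 1))
```

With a stub and request object at hand, a call could look like consume_with_retry(lambda: stub.GetTargetReports(req), handle_report, retry_on=(grpc.RpcError,)), where handle_report is your own callback.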

Warning

Depending on the number of sensors associated with your token and their coverage, the data volume of the stream can be very high. We recommend applying as many filters as possible to narrow the data stream down to what you actually need.

For example, a sensor located in an area with high traffic density and a 450 km range in all directions can produce up to about 2500 Mode S replies per second. If you are only interested in ADS-B data, add a filter for downlink format 17 to enable server-side filtering.
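As background, the downlink format (DF) is encoded in the first five bits of every Mode S reply, and DF 17 identifies ADS-B extended squitters. The sketch below only illustrates this encoding on raw frame bytes. It is not the server-side filter itself, whose request fields are described in the Protocol Buffers definition. The function name and the sample frame are chosen for this illustration.

```python
def downlink_format(frame: bytes) -> int:
    """Return the downlink format of a raw Mode S reply."""
    if not frame:
        raise ValueError('empty frame')
    df = frame[0] >> 3  # upper five bits of the first byte
    # replies whose first two bits are '11' are all treated as DF 24
    return 24 if df >= 24 else df


# a sample 112-bit extended squitter, given as a hex string
frame = bytes.fromhex('8D4840D6202CC371C32CE0576098')
print(downlink_format(frame))  # prints 17
```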

Here is an example of how to work with a stream in Python:

# subscribe to target reports
it = stub.GetTargetReports(pb.TargetReportsRequest(token='INSERT YOUR TOKEN HERE'))
for tgt in it:
    if tgt.HasField('mlat'):
        # target is tracked by MLAT
        ...

    if tgt.HasField('adsb'):
        # target is tracked via ADS-B
        ...

Warning

It is important to shut down streams gracefully if your program keeps running after reading from a stream or if you want to change the subscription (filters). Otherwise, the stream will continue receiving and buffering data in the background, which wastes memory and network resources.

Be kind and cancel the subscription to let the server know it can stop sending data:

# close the stream ('it' is the iterator returned by GetTargetReports above)
it.cancel()
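To make sure the cancellation is not forgotten when the loop exits early or a handler raises an exception, the call to cancel() can be placed in a finally block. The helper below is a sketch under that assumption; read_for is a hypothetical name, and the function works with any iterable object that offers a cancel() method, as the streams returned by the stub do.

```python
import time


def read_for(stream, seconds, handle, clock=time.monotonic):
    """Pass messages to handle() for at most `seconds`, then cancel the stream.

    Like the loop-based examples in this document, the time limit is only
    checked when a message arrives. Returns the number of handled messages.
    """
    deadline = clock() + seconds
    count = 0
    try:
        for msg in stream:
            if clock() >= deadline:
                break
            handle(msg)
            count += 1
    finally:
        # always tell the server to stop sending, even after an error
        stream.cancel()
    return count
```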

For more detailed examples, see Examples below.

Further Reading

Examples

Synchronous (Blocking)

#!/usr/bin/env python3

"""
This example shows how to use the public SecureTrack gRPC API.
"""

# some basic imports
import time

# API-specific imports
import grpc  # basic gRPC functions
import SeRoAPI_pb2 as pb  # SecureTrack API SerDes
import SeRoAPI_pb2_grpc as st  # SecureTrack API stubs

# some configuration
token = 'INSERT YOUR TOKEN HERE'
address = 'api.secureadsb.com:4201'
ca_cert = 'ca.crt' # point this to the ca.crt file
info_timeout = 15  # seconds

# create channel and stub
with open(ca_cert, 'rb') as f:
    cert = f.read()
credentials = grpc.ssl_channel_credentials(cert)
ch = grpc.secure_channel(address, credentials)
stub = st.SeRoAPIStub(ch)

# create request for retrieving my sensors
req = pb.SensorInfoRequest(token=token)

# call the API method
res = stub.GetSensorInfo(req)

# print result
print(f'API returned {len(res.sensorInfo)} sensors for this token:')
for info in res.sensorInfo:
    print("\t{} ({}, alias {}): located at {:.4f},{:.4f} (current fix: {})".format(
        info.sensor.serial, pb.Sensor.Type.Name(info.sensor.type), info.alias,
        info.gnss.position.latitude, info.gnss.position.longitude,
        pb.GNSSInformation.Position.FixType.Name(info.gnss.position.fix_type)))

# subscribe to target reports for 10 seconds
res = stub.GetTargetReports(pb.TargetReportsRequest(token=token))

print("Retrieving target reports for 10 seconds.")

start = time.time()
aircraft = dict()
cnt = 0
dropped = 0
for tgt in res:
    if time.time() - start > 10:
        break

    # count and store
    cnt += 1
    aircraft[tgt.target.address] = tgt

    if tgt.dropped_reports - dropped > 0:
        print(f'Reports were dropped: {tgt.dropped_reports}')
        dropped = tgt.dropped_reports

# graceful shutdown
res.cancel()

print("Done! We received {} reports from {} targets.".format(cnt, len(aircraft)))

# print some stats
adsb_cnt = 0
mlat_cnt = 0
for report in aircraft.values():
    if report.HasField('mlat') and time.time() - report.mlat.tx_timestamp < info_timeout:
        mlat_cnt += 1

    if report.HasField('adsb') and time.time() - report.adsb.position_last_seen < info_timeout:
        adsb_cnt += 1

# print the last stored report as a sample (only if any were received)
if aircraft:
    print(report)

print("Out of {} targets, {} had an ADS-B position and {} were also tracked by MLAT.".format(
    len(aircraft), adsb_cnt, mlat_cnt))

Asynchronous (Non-blocking)

For asynchronous (non-blocking) interactions with gRPC APIs, the gRPC folks created the gRPC AsyncIO API:

It can be used in conjunction with Python’s asyncio library. Documentation and examples for it are available in the official asyncio documentation:

#!/usr/bin/env python3

"""
This example shows how to use the public SecureTrack gRPC API in an asynchronous
fashion using asyncio.
"""

# Python's asynchronous I/O library
import asyncio
# some basic imports
import time

# API-specific imports
import grpc  # basic gRPC functions
import SeRoAPI_pb2 as pb  # SecureTrack API SerDes
import SeRoAPI_pb2_grpc as st  # SecureTrack API stubs

# some configuration
token = 'INSERT YOUR TOKEN HERE'
address = 'api.secureadsb.com:4201'
ca_cert = 'ca.crt' # point this to the ca.crt file


# subscribes to target reports for 10 seconds
async def stream_handler():
    with open(ca_cert, 'rb') as f:
        cert = f.read()
    credentials = grpc.ssl_channel_credentials(cert)
    # Note: channel will be closed automatically when we leave the with statement
    async with grpc.aio.secure_channel(address, credentials) as channel:
        print("Starting to retrieve data for 10 seconds.")
        stub = st.SeRoAPIStub(channel)
        it = stub.GetTargetReports(pb.TargetReportsRequest(token=token))
        tgts = dict()
        start = time.time()
        try:
            async for tgt in it:
                if time.time() - start > 10:
                    break

                tgts[tgt.target.address] = tgt
        except grpc.RpcError as e:
            print(e)

        it.cancel()

        print('I am also done retrieving data.')
        print(f'We have seen {len(tgts)} different aircraft.')


# sleeps for 5 seconds
async def chiller():
    print("In the meantime, I am going to chill for 5 seconds.")
    await asyncio.sleep(5)
    print("Feeling pretty relaxed now. Returning to main!")


# run the two coroutines above concurrently
async def main():
    await asyncio.gather(stream_handler(), chiller())


print("Calling async main method.")
asyncio.run(main())
print("Exiting, bye bye.")
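Note that the time check in the loop above only runs when a message arrives, so the loop can wait longer than 10 seconds if no data is coming in. As a possible alternative, the following sketch enforces a hard deadline with asyncio.wait_for. The helper collect_for is a hypothetical name for this illustration; it works with any asynchronous iterator and is therefore shown here without gRPC.

```python
import asyncio


async def collect_for(stream, seconds):
    """Collect messages from an asynchronous iterator for at most `seconds`."""
    items = []

    async def consume():
        async for msg in stream:
            items.append(msg)

    try:
        # cancels consume() once the deadline has passed
        await asyncio.wait_for(consume(), timeout=seconds)
    except asyncio.TimeoutError:
        pass  # deadline reached, keep what was received so far
    return items
```

Inside stream_handler, this could be used as reports = await collect_for(it, 10), followed by it.cancel() to let the server know it can stop sending data.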