2020-11-12 13:46:18 -05:00
#!/usr/bin/env python3
"""
Connects to spectrum server to monitor PSD and detect local increase to pilot channel ( s )
"""
import requests , traceback , sys , json , time
import struct , operator
import math
import numpy as np
import websocket
try :
import thread
except ImportError :
import _thread as thread
import time
2024-07-21 08:20:48 -04:00
from datetime import datetime , timezone
2020-11-12 13:46:18 -05:00
from optparse import OptionParser
import sdrangel
OPTIONS = None
API_URI = None
WS_URI = None
PASS_INDEX = 0
PSD_FLOOR = [ ]
CONFIG = { }
UNSERVED_FREQUENCIES = [ ]
# ======================================================================
class SuperScannerError ( Exception ) :
def __init__ ( self , message ) :
self . message = message
# ======================================================================
class SuperScannerWebsocketError ( SuperScannerError ) :
pass
# ======================================================================
class SuperScannerWebsocketClosed ( SuperScannerError ) :
pass
# ======================================================================
class SuperScannerOptionsError ( SuperScannerError ) :
pass
# ======================================================================
class SuperScannerAPIError ( SuperScannerError ) :
pass
# ======================================================================
def log_with_timestamp ( message ) :
2024-07-21 08:20:48 -04:00
t = datetime . now ( timezone . utc )
2020-11-12 13:46:18 -05:00
print ( f ' { t . isoformat ( ) } { message } ' )
# ======================================================================
def get_input_options ( args = None ) :
if args is None :
args = sys . argv [ 1 : ]
parser = OptionParser ( usage = " usage: %% prog [-t] \n " )
parser . add_option ( " -a " , " --address " , dest = " address " , help = " SDRangel web base address. Default: 127.0.0.1 " , metavar = " ADDRESS " , type = " string " )
parser . add_option ( " -p " , " --api-port " , dest = " api_port " , help = " SDRangel API port. Default: 8091 " , metavar = " PORT " , type = " int " )
parser . add_option ( " -w " , " --ws-port " , dest = " ws_port " , help = " SDRangel websocket spectrum server port. Default: 8887 " , metavar = " PORT " , type = " int " )
parser . add_option ( " -c " , " --config-file " , dest = " config_file " , help = " JSON configuration file. Mandatory " , metavar = " FILE " , type = " string " )
parser . add_option ( " -j " , " --psd-in " , dest = " psd_input_file " , help = " JSON file containing PSD floor information. " , metavar = " FILE " , type = " string " )
parser . add_option ( " -J " , " --psd-out " , dest = " psd_output_file " , help = " Write PSD floor information to JSON file. " , metavar = " FILE " , type = " string " )
parser . add_option ( " -n " , " --nb-passes " , dest = " passes " , help = " Number of passes for PSD floor estimation. Default: 10 " , metavar = " NUM " , type = " int " )
parser . add_option ( " -m " , " --margin " , dest = " margin " , help = " Margin in dB above PSD floor to detect acivity. Default: 3 " , metavar = " DB " , type = " int " )
parser . add_option ( " -f " , " --psd-level " , dest = " psd_fixed " , help = " Use a fixed PSD floor value. " , metavar = " DB " , type = " float " )
parser . add_option ( " -X " , " --psd-exclude-higher " , dest = " psd_exclude_higher " , help = " Level above which to exclude bin scan. " , metavar = " DB " , type = " float " )
parser . add_option ( " -x " , " --psd-exclude-lower " , dest = " psd_exclude_lower " , help = " Level below which to exclude bin scan. " , metavar = " DB " , type = " float " )
parser . add_option ( " -N " , " --hotspots-noise " , dest = " hotspots_noise " , help = " Number of hotspots above which detection is considered as noise. Default 8 " , metavar = " NUM " , type = " int " )
parser . add_option ( " -G " , " --psd-graph " , dest = " psd_graph " , help = " Show PSD floor graphs. Requires matplotlib " , action = " store_true " )
parser . add_option ( " -g " , " --group-tolerance " , dest = " group_tolerance " , help = " Radius (1D) tolerance in points (bins) for hotspots grouping. Default 1. " , metavar = " NUM " , type = " int " )
parser . add_option ( " -r " , " --freq-round " , dest = " freq_round " , help = " Frequency rounding value in Hz. Default: 1 (no rounding) " , metavar = " NUM " , type = " int " )
parser . add_option ( " -o " , " --freq-offset " , dest = " freq_offset " , help = " Frequency rounding offset in Hz. Default: 0 (no offset) " , metavar = " NUM " , type = " int " )
( options , args ) = parser . parse_args ( args )
if ( options . config_file == None ) :
raise SuperScannerOptionsError ( ' A configuration file is required. Option -c or --config-file ' )
if ( options . address == None ) :
options . address = " 127.0.0.1 "
if ( options . api_port == None ) :
options . api_port = 8091
if ( options . ws_port == None ) :
options . ws_port = 8887
if ( options . passes == None ) :
options . passes = 10
elif options . passes < 1 :
options . passes = 1
if ( options . margin == None ) :
options . margin = 3
if ( options . hotspots_noise == None ) :
options . hotspots_noise = 8
if ( options . group_tolerance == None ) :
options . group_tolerance = 1
if ( options . freq_round == None ) :
options . freq_round = 1
if ( options . freq_offset == None ) :
options . freq_offset = 0
return options
# ======================================================================
def on_ws_message ( ws , message ) :
global PASS_INDEX
try :
struct_message = decode_message ( message )
if OPTIONS . psd_fixed is not None and OPTIONS . passes > 0 :
compute_fixed_floor ( struct_message )
OPTIONS . passes = 0 # done
elif OPTIONS . psd_input_file is not None and OPTIONS . passes > 0 :
global PSD_FLOOR
with open ( OPTIONS . psd_input_file ) as json_file :
PSD_FLOOR = json . load ( json_file )
OPTIONS . passes = 0 # done
elif OPTIONS . passes > 0 :
compute_floor ( struct_message )
OPTIONS . passes - = 1
PASS_INDEX + = 1
print ( f ' PSD floor pass no { PASS_INDEX } ' )
elif OPTIONS . passes == 0 :
OPTIONS . passes - = 1
if OPTIONS . psd_output_file :
with open ( OPTIONS . psd_output_file , ' w ' ) as outfile :
json . dump ( PSD_FLOOR , outfile )
if OPTIONS . psd_graph :
show_floor ( )
else :
scan ( struct_message )
except Exception as ex :
tb = traceback . format_exc ( )
print ( tb , file = sys . stderr )
# ======================================================================
def on_ws_error ( ws , error ) :
raise SuperScannerWebsocketError ( f ' { error } ' )
# ======================================================================
def on_ws_close ( ws ) :
raise SuperScannerWebsocketClosed ( ' websocket closed ' )
# ======================================================================
def on_ws_open ( ws ) :
log_with_timestamp ( ' Web socket opened starting... ' )
def run ( * args ) :
pass
thread . start_new_thread ( run , ( ) )
# ======================================================================
def decode_message ( byte_message ) :
struct_message = { }
struct_message [ ' cf ' ] = int . from_bytes ( byte_message [ 0 : 8 ] , byteorder = ' little ' , signed = False )
struct_message [ ' elasped ' ] = int . from_bytes ( byte_message [ 8 : 16 ] , byteorder = ' little ' , signed = False )
struct_message [ ' ts ' ] = int . from_bytes ( byte_message [ 16 : 24 ] , byteorder = ' little ' , signed = False )
struct_message [ ' fft_size ' ] = int . from_bytes ( byte_message [ 24 : 28 ] , byteorder = ' little ' , signed = False )
struct_message [ ' fft_bw ' ] = int . from_bytes ( byte_message [ 28 : 32 ] , byteorder = ' little ' , signed = False )
indicators = int . from_bytes ( byte_message [ 32 : 36 ] , byteorder = ' little ' , signed = False )
struct_message [ ' linear ' ] = ( indicators & 1 ) == 1
struct_message [ ' ssb ' ] = ( ( indicators & 2 ) >> 1 ) == 1
struct_message [ ' usb ' ] = ( ( indicators & 4 ) >> 2 ) == 1
struct_message [ ' samples ' ] = [ ]
for sample_index in range ( struct_message [ ' fft_size ' ] ) :
psd = struct . unpack ( ' f ' , byte_message [ 36 + 4 * sample_index : 40 + 4 * sample_index ] ) [ 0 ]
struct_message [ ' samples ' ] . append ( psd )
return struct_message
# ======================================================================
def compute_fixed_floor ( struct_message ) :
global PSD_FLOOR
nb_samples = len ( struct_message [ ' samples ' ] )
PSD_FLOOR = [ ( OPTIONS . psd_fixed , False ) ] * nb_samples
# ======================================================================
def compute_floor ( struct_message ) :
global PSD_FLOOR
fft_size = struct_message [ ' fft_size ' ]
psd_samples = struct_message [ ' samples ' ]
for psd_index , psd in enumerate ( psd_samples ) :
exclude = False
if OPTIONS . psd_exclude_higher :
exclude = psd > OPTIONS . psd_exclude_higher
if OPTIONS . psd_exclude_lower :
exclude = psd < OPTIONS . psd_exclude_lower
if psd_index < len ( PSD_FLOOR ) :
PSD_FLOOR [ psd_index ] [ 1 ] = exclude or PSD_FLOOR [ psd_index ] [ 1 ]
if psd > PSD_FLOOR [ psd_index ] [ 0 ] :
PSD_FLOOR [ psd_index ] [ 0 ] = psd
else :
PSD_FLOOR . append ( [ ] )
PSD_FLOOR [ psd_index ] . append ( psd )
PSD_FLOOR [ psd_index ] . append ( exclude )
# ======================================================================
def show_floor ( ) :
import matplotlib
import matplotlib . pyplot as plt
print ( ' show_floor ' )
plt . figure ( 1 )
plt . subplot ( 211 )
plt . plot ( [ x [ 1 ] for x in PSD_FLOOR ] )
plt . ylabel ( ' PSD exclusion ' )
plt . subplot ( 212 )
plt . plot ( [ x [ 0 ] for x in PSD_FLOOR ] )
plt . ylabel ( ' PSD floor ' )
plt . show ( )
# ======================================================================
def freq_rounding ( freq , round_freq , round_offset ) :
shifted_freq = freq - round_offset
return round ( shifted_freq / round_freq ) * round_freq + round_offset
# ======================================================================
def scan ( struct_message ) :
ts = struct_message [ ' ts ' ]
freq_density = struct_message [ ' fft_bw ' ] / struct_message [ ' fft_size ' ]
hotspots = [ ]
hotspot = { }
last_hotspot_index = 0
if struct_message [ ' ssb ' ] :
freq_start = struct_message [ ' cf ' ]
freq_stop = struct_message [ ' cf ' ] + struct_message [ ' fft_bw ' ]
else :
freq_start = struct_message [ ' cf ' ] - ( struct_message [ ' fft_bw ' ] / 2 )
freq_stop = struct_message [ ' cf ' ] + ( struct_message [ ' fft_bw ' ] / 2 )
psd_samples = struct_message [ ' samples ' ]
psd_sum = 0
psd_count = 1
for psd_index , psd in enumerate ( psd_samples ) :
freq = freq_start + psd_index * freq_density
if PSD_FLOOR [ psd_index ] [ 1 ] : # exclusion zone
continue
if psd > PSD_FLOOR [ psd_index ] [ 0 ] + OPTIONS . margin : # detection
psd_sum + = 10 * * ( psd / 10 )
psd_count + = 1
if psd_index > last_hotspot_index + OPTIONS . group_tolerance : # new hotspot
if hotspot . get ( " begin " ) : # finalize previous hotspot
hotspot [ " end " ] = hotspot_end
hotspot [ " power " ] = psd_sum / psd_count
hotspots . append ( hotspot )
hotspot = { " begin " : freq }
psd_sum = 10 * * ( psd / 10 )
psd_count = 1
hotspot_end = freq
last_hotspot_index = psd_index
if hotspot . get ( " begin " ) : # finalize last hotspot
hotspot [ " end " ] = hotspot_end
hotspot [ " power " ] = psd_sum / psd_count
hotspots . append ( hotspot )
process_hotspots ( hotspots )
# ======================================================================
def allocate_channel ( ) :
channels = CONFIG [ ' channel_info ' ]
for channel in channels :
if channel [ ' usage ' ] == 0 :
return channel
return None
# ======================================================================
def freq_in_ranges_check ( freq ) :
freqrange_inclusions = CONFIG . get ( ' freqrange_inclusions ' , [ ] )
freqrange_exclusions = CONFIG . get ( ' freqrange_exclusions ' , [ ] )
for freqrange in freqrange_exclusions :
if freqrange [ 0 ] < = freq < = freqrange [ 1 ] :
return False
for freqrange in freqrange_inclusions :
if freqrange [ 0 ] < = freq < = freqrange [ 1 ] :
return True
return False
# ======================================================================
def get_hotspot_frequency ( channel , hotspot ) :
fc_pos = channel . get ( ' fc_pos ' , ' center ' )
if fc_pos == ' lsb ' :
channel_frequency = freq_rounding ( hotspot [ ' end ' ] , OPTIONS . freq_round , OPTIONS . freq_offset )
elif fc_pos == ' usb ' :
channel_frequency = freq_rounding ( hotspot [ ' begin ' ] , OPTIONS . freq_round , OPTIONS . freq_offset )
else :
channel_frequency = freq_rounding ( hotspot [ ' fc ' ] , OPTIONS . freq_round , OPTIONS . freq_offset )
fc_shift = channel . get ( ' fc_shift ' , 0 )
return channel_frequency + fc_shift
# ======================================================================
def process_hotspots ( scanned_hotspots ) :
global CONFIG
global UNSERVED_FREQUENCIES
if len ( scanned_hotspots ) > OPTIONS . hotspots_noise :
return
# calculate frequency for each hotspot and create list of valid hotspots
hotspots = [ ]
for hotspot in scanned_hotspots :
width = hotspot [ ' end ' ] - hotspot [ ' begin ' ]
fc = hotspot [ ' begin ' ] + width / 2
if not freq_in_ranges_check ( fc ) :
continue
hotspot [ ' fc ' ] = fc
hotspot [ ' begin ' ] = fc - ( width / 2 ) # re-center around fc
hotspot [ ' end ' ] = fc + ( width / 2 )
hotspots . append ( hotspot )
# calculate hotspot distances for each used channel and reuse the channel for the closest hotspot
channels = CONFIG [ ' channel_info ' ]
used_channels = [ channel for channel in channels if channel [ ' usage ' ] == 1 ]
consolidated_distances = [ ]
for channel in used_channels : # loop on used channels
distances = [ [ abs ( channel [ ' frequency ' ] - get_hotspot_frequency ( channel , hotspot ) ) , hotspot ] for hotspot in hotspots ]
distances = sorted ( distances , key = operator . itemgetter ( 0 ) )
if distances :
consolidated_distances . append ( [ distances [ 0 ] [ 0 ] , channel , distances [ 0 ] [ 1 ] ] ) # [distance, channel, hotspot]
consolidated_distances = sorted ( consolidated_distances , key = operator . itemgetter ( 0 ) ) # get (channel, hotspot) pair with shortest distance first
# reallocate used channels on their closest hotspot
for distance in consolidated_distances :
channel = distance [ 1 ]
hotspot = distance [ 2 ]
if hotspot in hotspots : # hotspot is not processed yet
channel_frequency = get_hotspot_frequency ( channel , hotspot )
channel [ ' usage ' ] = 2 # mark channel used on this pass
if channel [ ' frequency ' ] != channel_frequency : # optimization: do not move to same frequency
channel [ ' frequency ' ] = channel_frequency
channel_index = channel [ ' index ' ]
set_channel_frequency ( channel )
log_with_timestamp ( f ' Moved channel { channel_index } to frequency { channel_frequency } Hz ' )
hotspots . remove ( hotspot ) # done with this hotspot
# for remaining hotspots we need to allocate new channels
for hotspot in hotspots :
channel = allocate_channel ( )
if channel :
channel_index = channel [ ' index ' ]
channel_frequency = get_hotspot_frequency ( channel , hotspot )
channel [ ' usage ' ] = 2 # mark channel used on this pass
channel [ ' frequency ' ] = channel_frequency
set_channel_frequency ( channel )
log_with_timestamp ( f ' Allocated channel { channel_index } on frequency { channel_frequency } Hz ' )
else :
fc = hotspot [ ' fc ' ]
if fc not in UNSERVED_FREQUENCIES :
UNSERVED_FREQUENCIES . append ( fc )
log_with_timestamp ( f ' All channels allocated. Cannot process signal at { fc } Hz ' )
# cleanup
for channel in CONFIG [ ' channel_info ' ] :
if channel [ ' usage ' ] == 1 : # channel unused on this pass
channel [ ' usage ' ] = 0 # release it
channel_index = channel [ ' index ' ]
fc = channel [ ' frequency ' ]
set_channel_mute ( channel )
UNSERVED_FREQUENCIES . clear ( ) # at least one channel is able to serve next time
log_with_timestamp ( f ' Released channel { channel_index } on frequency { fc } Hz ' )
elif channel [ ' usage ' ] == 2 : # channel used on this pass
channel [ ' usage ' ] = 1 # reset usage for next pass
# ======================================================================
def set_channel_frequency ( channel ) :
deviceset_index = CONFIG [ ' deviceset_index ' ]
channel_index = channel [ ' index ' ]
channel_id = channel [ ' id ' ]
df = channel [ ' frequency ' ] - CONFIG [ ' device_frequency ' ]
url = f ' { API_URI } /sdrangel/deviceset/ { deviceset_index } /channel/ { channel_index } /settings '
payload = {
sdrangel . CHANNEL_TYPES [ channel_id ] [ ' settings ' ] : {
sdrangel . CHANNEL_TYPES [ channel_id ] [ ' df_key ' ] : df ,
sdrangel . CHANNEL_TYPES [ channel_id ] [ ' mute_key ' ] : 0
} ,
' channelType ' : channel_id ,
' direction ' : 0
}
r = requests . patch ( url = url , json = payload )
if r . status_code / / 100 != 2 :
raise SuperScannerAPIError ( f ' Set channel { channel_index } frequency failed ' )
# ======================================================================
def set_channel_mute ( channel ) :
deviceset_index = CONFIG [ ' deviceset_index ' ]
channel_index = channel [ ' index ' ]
channel_id = channel [ ' id ' ]
url = f ' { API_URI } /sdrangel/deviceset/ { deviceset_index } /channel/ { channel_index } /settings '
payload = {
sdrangel . CHANNEL_TYPES [ channel_id ] [ ' settings ' ] : {
sdrangel . CHANNEL_TYPES [ channel_id ] [ ' mute_key ' ] : 1
} ,
' channelType ' : channel_id ,
' direction ' : 0
}
r = requests . patch ( url = url , json = payload )
if r . status_code / / 100 != 2 :
raise SuperScannerAPIError ( f ' Set channel { channel_index } mute failed ' )
# ======================================================================
def get_deviceset_info ( deviceset_index ) :
url = f ' { API_URI } /sdrangel/deviceset/ { deviceset_index } '
r = requests . get ( url = url )
if r . status_code / / 100 != 2 :
raise SuperScannerAPIError ( f ' Get deviceset { deviceset_index } info failed ' )
return r . json ( )
# ======================================================================
def make_config ( ) :
global CONFIG
deviceset_index = CONFIG [ ' deviceset_index ' ]
deviceset_info = get_deviceset_info ( deviceset_index )
device_frequency = deviceset_info [ " samplingDevice " ] [ " centerFrequency " ]
CONFIG [ ' device_frequency ' ] = device_frequency
for channel_info in CONFIG [ ' channel_info ' ] :
channel_index = channel_info [ ' index ' ]
if channel_index < deviceset_info [ ' channelcount ' ] :
channel_offset = deviceset_info [ ' channels ' ] [ channel_index ] [ ' deltaFrequency ' ]
channel_id = deviceset_info [ ' channels ' ] [ channel_index ] [ ' id ' ]
channel_info [ ' id ' ] = channel_id
channel_info [ ' usage ' ] = 0 # 0: unused 1: used 2: reused in current allocation step (temporary state)
channel_info [ ' frequency ' ] = device_frequency + channel_offset
else :
raise SuperScannerAPIError ( f ' There is no channel with index { channel_index } in deviceset { deviceset_index } ' )
# ======================================================================
def main ( ) :
try :
global OPTIONS
global CONFIG
global API_URI
global WS_URI
OPTIONS = get_input_options ( )
log_with_timestamp ( f ' Start with options: { OPTIONS } ' )
with open ( OPTIONS . config_file ) as json_file : # get base config
CONFIG = json . load ( json_file )
log_with_timestamp ( f ' Initial configuration: { CONFIG } ' )
API_URI = f ' http:// { OPTIONS . address } : { OPTIONS . api_port } '
WS_URI = f ' ws:// { OPTIONS . address } : { OPTIONS . ws_port } '
make_config ( ) # complete config with device set information from SDRangel
ws = websocket . WebSocketApp ( WS_URI ,
on_message = on_ws_message ,
on_error = on_ws_error ,
on_close = on_ws_close )
ws . on_open = on_ws_open
ws . run_forever ( )
except SuperScannerWebsocketError as ex :
print ( ex . message )
except SuperScannerWebsocketClosed :
print ( " Spectrum websocket closed " )
except Exception as ex :
tb = traceback . format_exc ( )
print ( tb , file = sys . stderr )
# ======================================================================
if __name__ == " __main__ " :
main ( )