1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2025-11-22 08:03:27 -05:00

Scripts API: SuperScanner: select shortest distance between hotspot and used channel

This commit is contained in:
f4exb 2020-05-27 16:35:53 +02:00
parent 71862d5b3d
commit 981c7129a3
4 changed files with 25 additions and 6 deletions

View File

@ -252,3 +252,9 @@ This file drives how channels in the connected SDRangel instance are managed.
<h2>sdrangel.py</h2>
Holds constants related to SDRangel software required by other scripts
<h2>Unit tests</h2>
Run as `python <file>` in the virtual environment
- `test_superscanner.py` is testing `superscanner.py`

View File

@ -2,4 +2,5 @@ requests
Flask
numpy
websocket
websocket-client
websocket-client
mock

View File

@ -293,12 +293,18 @@ def process_hotspots(scanned_hotspots):
# calculate hotspot distances for each used channel and reuse the channel for the closest hotspot
channels = CONFIG['channel_info']
used_channels = [channel for channel in channels if channel['usage'] == 1]
consolidated_distances = []
for channel in used_channels: # loop on used channels
distances = [[abs(channel['frequency'] - get_hotspot_frequency(channel, hotspot)), hotspot] for hotspot in hotspots]
distances = sorted(distances, key=operator.itemgetter(0))
print(f'channel {channel["index"]} distances: {distances}')
if distances:
hotspot = distances[0][1]
consolidated_distances.append([distances[0][0], channel, distances[0][1]]) # [distance, channel, hotspot]
consolidated_distances = sorted(consolidated_distances, key=operator.itemgetter(0)) # get (channel, hotspot) pair with shortest distance first
# reallocate used channels on their closest hotspot
for distance in consolidated_distances:
channel = distance[1]
hotspot = distance[2]
if hotspot in hotspots: # hotspot is not processed yet
channel['usage'] = 2 # mark channel used on this pass
channel['frequency'] = get_hotspot_frequency(channel, hotspot)
set_channel_frequency(channel)

View File

@ -2,9 +2,11 @@ import unittest
import mock
import superscanner
# ======================================================================
def print_hex(bytestring):
print('\\x' + '\\x'.join('{:02x}'.format(x) for x in bytestring))
# ======================================================================
def get_deviceset_info(deviceset_index):
return {
"channelcount": 4,
@ -56,12 +58,15 @@ def get_deviceset_info(deviceset_index):
}
}
# ======================================================================
def set_channel_frequency(channel):
pass
# ======================================================================
def set_channel_mute(channel):
pass
# ======================================================================
class TestStringMethods(unittest.TestCase):
def test_upper(self):
@ -78,12 +83,14 @@ class TestStringMethods(unittest.TestCase):
with self.assertRaises(TypeError):
s.split(2)
# ======================================================================
class TestSuperScannerOptions(unittest.TestCase):
def test_options_minimal(self):
options = superscanner.get_input_options(["-ctoto"])
self.assertEqual(options.config_file, 'toto')
# ======================================================================
class TestSuperScannerDecode(unittest.TestCase):
def test_decode_bytes(self):
@ -98,6 +105,7 @@ class TestSuperScannerDecode(unittest.TestCase):
msg_struct = superscanner.decode_message(msg_bytes)
self.assertEqual(msg_struct['fft_size'], 1024)
# ======================================================================
class TestSuperScannerProcessHotspots(unittest.TestCase):
@mock.patch('superscanner.get_deviceset_info', side_effect=get_deviceset_info)
@ -159,7 +167,6 @@ class TestSuperScannerProcessHotspots(unittest.TestCase):
]
superscanner.process_hotspots(hotspots1)
channel_info = superscanner.CONFIG['channel_info']
print(channel_info)
self.assertEqual(channel_info[0]['usage'], 1)
self.assertEqual(channel_info[1]['usage'], 0)
self.assertEqual(channel_info[2]['usage'], 0)
@ -178,7 +185,6 @@ class TestSuperScannerProcessHotspots(unittest.TestCase):
]
superscanner.process_hotspots(hotspots2)
channel_info = superscanner.CONFIG['channel_info']
print(channel_info)
self.assertEqual(channel_info[0]['usage'], 1)
self.assertEqual(channel_info[1]['usage'], 1)
self.assertEqual(channel_info[2]['usage'], 0)
@ -193,13 +199,13 @@ class TestSuperScannerProcessHotspots(unittest.TestCase):
]
superscanner.process_hotspots(hotspots3)
channel_info = superscanner.CONFIG['channel_info']
print(channel_info)
self.assertEqual(channel_info[0]['usage'], 0)
self.assertEqual(channel_info[1]['usage'], 1)
self.assertEqual(channel_info[2]['usage'], 0)
self.assertEqual(channel_info[1]['frequency'], 145200000)
# ======================================================================
if __name__ == '__main__':
unittest.main()