Add some locking around peers

This commit is contained in:
Simon 2021-01-04 23:09:48 +00:00
parent 349f89f8fe
commit e4f1a932c0
2 changed files with 25 additions and 3 deletions

View File

@ -897,11 +897,14 @@ def mysql_config_check():
logger.debug('(MYSQL) TG2 ACL changed')
#Preserve peers list
if system in CONFIG['SYSTEMS']:
if system in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS'][system]['ENABLED']:
systems[system]._peer_sema.acquire(blocking=True)
SQLCONFIG[system]['PEERS'] = CONFIG['SYSTEMS'][system]['PEERS']
systems[system]._peer_sema.release()
CONFIG['SYSTEMS'][system].update(SQLCONFIG[system])
#Add MySQL config data to config dict
CONFIG['SYSTEMS'].update(SQLCONFIG)
#CONFIG['SYSTEMS'].update(SQLCONFIG)
SQLCONFIG = {}
sql.close()

View File

@ -35,6 +35,7 @@ from hashlib import sha256, sha1
from hmac import new as hmac_new, compare_digest
from time import time
from collections import deque
from threading import Semaphore
# Twisted is pretty important, so I keep it separate
from twisted.internet.protocol import DatagramProtocol, Factory, Protocol
@ -207,6 +208,8 @@ class HBSYSTEM(DatagramProtocol):
self._report = _report
self._config = self._CONFIG['SYSTEMS'][self._system]
self._laststrid = {1: b'', 2: b''}
self._peer_sema = Semaphore(value=1)
# Define shortcuts and generic function names based on the type of system we are
if self._config['MODE'] == 'MASTER':
@ -244,10 +247,12 @@ class HBSYSTEM(DatagramProtocol):
# Check to see if any of the peers have been quiet (no ping) longer than allowed
if _this_peer['LAST_PING']+(self._CONFIG['GLOBAL']['PING_TIME']*self._CONFIG['GLOBAL']['MAX_MISSED']) < time():
remove_list.append(peer)
self._peer_sema.acquire(blocking=True)
for peer in remove_list:
logger.info('(%s) Peer %s (%s) has timed out and is being removed', self._system, self._peers[peer]['CALLSIGN'], self._peers[peer]['RADIO_ID'])
# Remove any timed out peers from the configuration
del self._CONFIG['SYSTEMS'][self._system]['PEERS'][peer]
self._peer_sema.release()
if not self._peers and self._CONFIG['OPTIONS']:
logger.info('(%s) Deleting HBP Options',self._system)
del self._CONFIG['OPTIONS']
@ -410,6 +415,7 @@ class HBSYSTEM(DatagramProtocol):
# Check for valid Radio ID
if acl_check(_peer_id, self._CONFIG['GLOBAL']['REG_ACL']) and acl_check(_peer_id, self._config['REG_ACL']):
# Build the configuration data strcuture for the peer
self._peer_sema.acquire(blocking=True)
self._peers.update({_peer_id: {
'CONNECTION': 'RPTL-RECEIVED',
'CONNECTED': time(),
@ -435,6 +441,7 @@ class HBSYSTEM(DatagramProtocol):
'SOFTWARE_ID': '',
'PACKAGE_ID': '',
}})
self._peer_sema.release()
logger.info('(%s) Repeater Logging in with Radio ID: %s, %s:%s', self._system, int_id(_peer_id), _sockaddr[0], _sockaddr[1])
_salt_str = bytes_4(self._peers[_peer_id]['SALT'])
self.send_peer(_peer_id, b''.join([RPTACK, _salt_str]))
@ -452,8 +459,10 @@ class HBSYSTEM(DatagramProtocol):
if _peer_id in self._peers \
and self._peers[_peer_id]['CONNECTION'] == 'CHALLENGE_SENT' \
and self._peers[_peer_id]['SOCKADDR'] == _sockaddr:
self._peer_sema.acquire(blocking=True)
_this_peer = self._peers[_peer_id]
_this_peer['LAST_PING'] = time()
self._peer_sema.release()
_sent_hash = _data[8:]
_salt_str = bytes_4(_this_peer['SALT'])
_calc_hash = bhex(sha256(_salt_str+self._config['PASSPHRASE']).hexdigest())
@ -464,7 +473,9 @@ class HBSYSTEM(DatagramProtocol):
else:
logger.info('(%s) Peer %s has FAILED the login exchange successfully', self._system, _this_peer['RADIO_ID'])
self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr)
self._peer_sema.acquire(blocking=True)
del self._peers[_peer_id]
self._peer_sema.release()
else:
self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr)
logger.warning('(%s) Login challenge from Radio ID that has not logged in: %s', self._system, int_id(_peer_id))
@ -477,13 +488,16 @@ class HBSYSTEM(DatagramProtocol):
and self._peers[_peer_id]['SOCKADDR'] == _sockaddr:
logger.info('(%s) Peer is closing down: %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id))
self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr)
self._peer_sema.acquire(blocking=True)
del self._peers[_peer_id]
self._peer_sema.release()
else:
_peer_id = _data[4:8] # Configure Command
if _peer_id in self._peers \
and self._peers[_peer_id]['CONNECTION'] == 'WAITING_CONFIG' \
and self._peers[_peer_id]['SOCKADDR'] == _sockaddr:
self._peer_sema.acquire(blocking=True)
_this_peer = self._peers[_peer_id]
_this_peer['CONNECTION'] = 'YES'
_this_peer['CONNECTED'] = time()
@ -502,6 +516,7 @@ class HBSYSTEM(DatagramProtocol):
_this_peer['URL'] = _data[98:222]
_this_peer['SOFTWARE_ID'] = _data[222:262]
_this_peer['PACKAGE_ID'] = _data[262:302]
self._peer_sema.release()
self.send_peer(_peer_id, b''.join([RPTACK, _peer_id]))
logger.info('(%s) Peer %s (%s) has sent repeater configuration', self._system, _this_peer['CALLSIGN'], _this_peer['RADIO_ID'])
@ -513,7 +528,9 @@ class HBSYSTEM(DatagramProtocol):
_peer_id = _data[4:8]
if _peer_id in self._peers and self._peers[_peer_id]['SOCKADDR'] == _sockaddr:
_this_peer = self._peers[_peer_id]
self._peer_sema.aquire(blocking-True)
_this_peer['OPTIONS'] = _data[8:]
self._peer_sema.release()
self.send_peer(_peer_id, b''.join([RPTACK, _peer_id]))
logger.info('(%s) Peer %s has sent options %s', self._system, _this_peer['CALLSIGN'], _this_peer['OPTIONS'])
self._CONFIG['SYSTEMS'][self._system]['OPTIONS'] = _this_peer['OPTIONS'].decode()
@ -523,8 +540,10 @@ class HBSYSTEM(DatagramProtocol):
if _peer_id in self._peers \
and self._peers[_peer_id]['CONNECTION'] == "YES" \
and self._peers[_peer_id]['SOCKADDR'] == _sockaddr:
self._peer_sema.acquire(blocking=True)
self._peers[_peer_id]['PINGS_RECEIVED'] += 1
self._peers[_peer_id]['LAST_PING'] = time()
self._peer_sema.release()
self.send_peer(_peer_id, b''.join([MSTPONG, _peer_id]))
#logger.debug('(%s) Received and answered RPTPING from peer %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id))
else: