Various changes to use threads and make code thread safe

This commit is contained in:
Simon 2020-10-02 20:47:04 +01:00
parent 0392790dd4
commit 935bcba271
2 changed files with 46 additions and 15 deletions

View File

@ -42,6 +42,11 @@ import re
from twisted.internet.protocol import Factory, Protocol from twisted.internet.protocol import Factory, Protocol
from twisted.protocols.basic import NetstringReceiver from twisted.protocols.basic import NetstringReceiver
from twisted.internet import reactor, task from twisted.internet import reactor, task
#We're going to *try* and be thread safe
from twisted.python import threadable
threadable.init(1)
from threading import Semaphore
# Things we import from the main hblink module # Things we import from the main hblink module
from hblink import HBSYSTEM, OPENBRIDGE, systems, hblink_handler, reportFactory, REPORT_OPCODES, mk_aliases from hblink import HBSYSTEM, OPENBRIDGE, systems, hblink_handler, reportFactory, REPORT_OPCODES, mk_aliases
@ -264,9 +269,21 @@ def stream_trimmer_loop():
else: else:
logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS])
def threadIdent():
logger.debug('(IDENT) starting ident thread')
reactor.callInThread(ident)
def threadedMysql():
logger.debug('(MYSQL) Starting MySQL thread')
if not mysql_sema.acquire(blocking = False):
logger.debug('(MYSQL) Previous thread is still running (can\'t acquire semaphore). Try next iteration')
return
reactor.callInThread(mysql_config_check)
mysql_sema.release()
def ident(): def ident():
for system in systems: for system in systems:
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': if CONFIG['SYSTEMS'][system]['MODE'] != 'MASTER':
continue continue
if CONFIG['SYSTEMS'][system]['VOICE_IDENT'] == True: if CONFIG['SYSTEMS'][system]['VOICE_IDENT'] == True:
#We only care about slot 2 - idents go out on slot 2 #We only care about slot 2 - idents go out on slot 2
@ -281,6 +298,8 @@ def ident():
for character in _systemcs: for character in _systemcs:
_say.append(words[character]) _say.append(words[character])
_say.append(words['silence']) _say.append(words['silence'])
#test
#_say.append(AMBEobj.readSingleFile('44xx.ambe'))
speech = pkt_gen(bytes_3(16777215), bytes_3(16777215), bytes_4(16777215), 1, _say) speech = pkt_gen(bytes_3(16777215), bytes_3(16777215), bytes_4(16777215), 1, _say)
sleep(1) sleep(1)
@ -291,7 +310,8 @@ def ident():
break break
#Packet every 60ms #Packet every 60ms
sleep(0.058) sleep(0.058)
systems[system].send_system(pkt) #Twisted is not thread safe. We need to call this in the reactor main thread
reactor.callFromThread(systems[system].send_system,pkt)
def mysql_config_check(): def mysql_config_check():
logger.debug('(MYSQL) Periodic config check') logger.debug('(MYSQL) Periodic config check')
@ -766,16 +786,23 @@ class routerHBP(HBSYSTEM):
speech = pkt_gen(bytes_3(9), bytes_3(9), bytes_4(9), 1, _say) speech = pkt_gen(bytes_3(9), bytes_3(9), bytes_4(9), 1, _say)
sleep(1) #Nested function - see below
while True: def sendSpeech(self,speech):
try: sleep(1)
pkt = next(speech) while True:
except StopIteration: try:
break pkt = next(speech)
#Packet every 60ms except StopIteration:
sleep(0.058) break
self.send_system(pkt) #Packet every 60ms
#print(len(pkt), pkt[4], pkt) sleep(0.058)
#Call the actual packet send in the reactor thread
#as it's not thread safe
reactor.callFromThread(self.send_system,pkt)
#print(len(pkt), pkt[4], pkt)
#call speech in a thread as it contains sleep() and hence could block the reactor
reactor.callInThread(sendSpeech,self,speech)
# Mark status variables for use later # Mark status variables for use later
self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_PEER'] = _peer_id
@ -1210,13 +1237,16 @@ if __name__ == '__main__':
stream_trimmer.addErrback(loopingErrHandle) stream_trimmer.addErrback(loopingErrHandle)
# Ident # Ident
ident_task = task.LoopingCall(ident) #This runs in a thread so as not to block the reactor
ident_task = task.LoopingCall(threadIdent)
ident = ident_task.start(900) ident = ident_task.start(900)
ident.addErrback(loopingErrHandle) ident.addErrback(loopingErrHandle)
#Mysql config checker #Mysql config checker
#This runs in a thread so as not to block the reactor
if CONFIG['MYSQL']['USE_MYSQL'] == True: if CONFIG['MYSQL']['USE_MYSQL'] == True:
mysql_task = task.LoopingCall(mysql_config_check) mysql_sema = Semaphore(value=1)
mysql_task = task.LoopingCall(threadedMysql)
mysql = mysql_task.start(60) mysql = mysql_task.start(60)
mysql.addErrback(loopingErrHandle) mysql.addErrback(loopingErrHandle)

View File

@ -63,6 +63,7 @@ class readAMBE:
]) ])
return _wordBADict return _wordBADict
#Read a single ambe file from the audio directory
def readSingleFile(self,filename): def readSingleFile(self,filename):
ambeBytearray = {} ambeBytearray = {}
_wordBitarray = bitarray(endian='big') _wordBitarray = bitarray(endian='big')