From bd6715566b5de80970072661d2f69634a7bf05de Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 3 Dec 2020 20:28:23 +0000 Subject: [PATCH] Fix multiple packets being sent on OBP connections --- bridge_master.py | 21 ++++++++++------ hotspot_proxy.py | 62 ++++++++++++++++++++++++++---------------------- 2 files changed, 48 insertions(+), 35 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index b80f4bb..ff95663 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -315,7 +315,7 @@ def rule_timer_loop(): else: if _system['SYSTEM'][0:3] != 'OBP': _bridge_used = True - else if _system['SYSTEM'][0:3] == 'OBP' and _system['TO_TYPE'] == 'STAT': + elif _system['SYSTEM'][0:3] == 'OBP' and _system['TO_TYPE'] == 'STAT': _bridge_used = True logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) @@ -780,12 +780,14 @@ class routerOBP(OPENBRIDGE): OPENBRIDGE.__init__(self, _name, _config, _report) self.STATUS = {} - def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system): + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP): for _target in BRIDGES[_bridge]: if (_target['SYSTEM'] != self._system) and (_target['ACTIVE']): _target_status = systems[_target['SYSTEM']].STATUS _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] if _target_system['MODE'] == 'OPENBRIDGE': + if _noOBP == True: + continue # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target @@ -961,7 +963,7 @@ class routerOBP(OPENBRIDGE): if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True): - self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system) + self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False) #Send to reflector or TG too, if it exists if _bridge[0:1] == '#': @@ -969,7 +971,7 @@ class routerOBP(OPENBRIDGE): else: _bridge = '#'+_bridge if _bridge in BRIDGES: - self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system) + self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,True) # Final actions - Is this a voice terminator? @@ -1046,7 +1048,7 @@ class routerHBP(HBSYSTEM): } } - def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system): + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP): for _target in BRIDGES[_bridge]: if _target['SYSTEM'] != self._system or (_target['SYSTEM'] == self._system and _target['TS'] != _slot): if _target['ACTIVE']: @@ -1054,6 +1056,8 @@ class routerHBP(HBSYSTEM): _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] if _target_system['MODE'] == 'OPENBRIDGE': + if _noOBP == True: + continue # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target @@ -1366,7 +1370,7 @@ class routerHBP(HBSYSTEM): if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True): - self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system) + self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False) #Send to reflector or TG too, if it exists if _bridge[0:1] == '#': @@ -1374,7 +1378,7 @@ class routerHBP(HBSYSTEM): else: _bridge = '#'+_bridge if _bridge in BRIDGES: - self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system) + self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,True) # Final actions - Is this a voice terminator? if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): @@ -1686,4 +1690,7 @@ if __name__ == '__main__': mysql = mysql_task.start(60) mysql.addErrback(loopingErrHandle) + #more threads + reactor.suggestThreadPoolSize(30) + reactor.run() diff --git a/hotspot_proxy.py b/hotspot_proxy.py index a7625c9..f2695d8 100644 --- a/hotspot_proxy.py +++ b/hotspot_proxy.py @@ -7,51 +7,56 @@ class Proxy(DatagramProtocol): def __init__(self,ListenPort,connTrack,Timeout,Debug): self.connTrack = connTrack + self.sourceTrack = {} self.timeout = Timeout self.debug = Debug def datagramReceived(self, data, addr): host,port = addr + nowtime = time() + Debug = self.debug #If the packet comes from the master if host == '127.0.0.1' and port in self.connTrack: - if int(self.connTrack[port]['time'])+self.timeout > time(): + if int(self.connTrack[port]['time'])+self.timeout >nowtime: self.transport.write(data,(self.connTrack[port]['host'],self.connTrack[port]['sport'])) - #if master refuses login, remove tracking and block for timeout seconds - if data == b'MSTNAK\x00#\xbf"': - self.connTrack[port]['time'] = False - self.connTrack[port]['nacktime'] = time()+self.timeout - if Debug: - print(data) + #if master refuses login, remove tracking and block fornowtimeout seconds + #if data == b'MSTNAK\x00#\xbf"': + #self.connTrack[port]['time'] = False + #self.connTrack[port]['nacktime'] =nowtime+self.timeout + if Debug: + print("return path match") + print(data) + elif host+str(port) in self.sourceTrack: + del self.sourceTrack[host+str(port)] return - for dport in self.connTrack: - #If blocked from refused login, ignore the packet if its been less than nacktime - if int(self.connTrack[dport]['nacktime']) + self.timeout > time(): - if Debug: - print("NACK\n") - return - #If we have a conntrack for this connect and the timeout has not expired, forward to tracked port - if self.connTrack[dport]['host'] == host and self.connTrack[dport]['sport'] == port and (int(self.connTrack[dport]['time'])+self.timeout > time()): - self.connTrack[dport]['time'] = time() - self.connTrack[dport]['host'] = host - self.connTrack[dport]['sport'] = port - self.transport.write(data, ('127.0.0.1',dport)) - self.connTrack[dport]['time'] = time() - if Debug: - print(data) - return + #If we have a sourcetrack for this connect and thenowtimeout has not expired, forward to tracked port + if host+str(port) in self.sourceTrack and (int(self.sourceTrack[host+str(port)]['time'])+self.timeout >nowtime): + self.transport.write(data, ('127.0.0.1',self.sourceTrack[host+str(port)]['dport'])) + self.connTrack[self.sourceTrack[host+str(port)]['dport']]['time'] =nowtime + self.sourceTrack[host+str(port)]['time'] =nowtime + if Debug: + print("Tracked inbound match") + print(data) + return + elif host+str(port) in self.sourceTrack: + del self.sourceTrack[host+str(port)] #Find free port to map for new connection for dport in self.connTrack: - if (self.connTrack[dport]['time'] == False or (int(self.connTrack[dport]['time'])+self.timeout < time())): + if (self.connTrack[dport]['time'] == False or (int(self.connTrack[dport]['time'])+self.timeout time(): + if int(CONNTRACK[port]['time'])+Timeout > nowtime: count = count+1 totalPorts = DestPortEnd - DestportStart