Fix multiple packets being sent on OBP connections
This commit is contained in:
parent
b251cee899
commit
bd6715566b
|
@ -315,7 +315,7 @@ def rule_timer_loop():
|
||||||
else:
|
else:
|
||||||
if _system['SYSTEM'][0:3] != 'OBP':
|
if _system['SYSTEM'][0:3] != 'OBP':
|
||||||
_bridge_used = True
|
_bridge_used = True
|
||||||
else if _system['SYSTEM'][0:3] == 'OBP' and _system['TO_TYPE'] == 'STAT':
|
elif _system['SYSTEM'][0:3] == 'OBP' and _system['TO_TYPE'] == 'STAT':
|
||||||
_bridge_used = True
|
_bridge_used = True
|
||||||
logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']))
|
logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']))
|
||||||
|
|
||||||
|
@ -780,12 +780,14 @@ class routerOBP(OPENBRIDGE):
|
||||||
OPENBRIDGE.__init__(self, _name, _config, _report)
|
OPENBRIDGE.__init__(self, _name, _config, _report)
|
||||||
self.STATUS = {}
|
self.STATUS = {}
|
||||||
|
|
||||||
def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system):
|
def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP):
|
||||||
for _target in BRIDGES[_bridge]:
|
for _target in BRIDGES[_bridge]:
|
||||||
if (_target['SYSTEM'] != self._system) and (_target['ACTIVE']):
|
if (_target['SYSTEM'] != self._system) and (_target['ACTIVE']):
|
||||||
_target_status = systems[_target['SYSTEM']].STATUS
|
_target_status = systems[_target['SYSTEM']].STATUS
|
||||||
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
||||||
if _target_system['MODE'] == 'OPENBRIDGE':
|
if _target_system['MODE'] == 'OPENBRIDGE':
|
||||||
|
if _noOBP == True:
|
||||||
|
continue
|
||||||
# Is this a new call stream on the target?
|
# Is this a new call stream on the target?
|
||||||
if (_stream_id not in _target_status):
|
if (_stream_id not in _target_status):
|
||||||
# This is a new call stream on the target
|
# This is a new call stream on the target
|
||||||
|
@ -961,7 +963,7 @@ class routerOBP(OPENBRIDGE):
|
||||||
|
|
||||||
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
||||||
|
|
||||||
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system)
|
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False)
|
||||||
|
|
||||||
#Send to reflector or TG too, if it exists
|
#Send to reflector or TG too, if it exists
|
||||||
if _bridge[0:1] == '#':
|
if _bridge[0:1] == '#':
|
||||||
|
@ -969,7 +971,7 @@ class routerOBP(OPENBRIDGE):
|
||||||
else:
|
else:
|
||||||
_bridge = '#'+_bridge
|
_bridge = '#'+_bridge
|
||||||
if _bridge in BRIDGES:
|
if _bridge in BRIDGES:
|
||||||
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system)
|
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,True)
|
||||||
|
|
||||||
|
|
||||||
# Final actions - Is this a voice terminator?
|
# Final actions - Is this a voice terminator?
|
||||||
|
@ -1046,7 +1048,7 @@ class routerHBP(HBSYSTEM):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system):
|
def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP):
|
||||||
for _target in BRIDGES[_bridge]:
|
for _target in BRIDGES[_bridge]:
|
||||||
if _target['SYSTEM'] != self._system or (_target['SYSTEM'] == self._system and _target['TS'] != _slot):
|
if _target['SYSTEM'] != self._system or (_target['SYSTEM'] == self._system and _target['TS'] != _slot):
|
||||||
if _target['ACTIVE']:
|
if _target['ACTIVE']:
|
||||||
|
@ -1054,6 +1056,8 @@ class routerHBP(HBSYSTEM):
|
||||||
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
_target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']]
|
||||||
|
|
||||||
if _target_system['MODE'] == 'OPENBRIDGE':
|
if _target_system['MODE'] == 'OPENBRIDGE':
|
||||||
|
if _noOBP == True:
|
||||||
|
continue
|
||||||
# Is this a new call stream on the target?
|
# Is this a new call stream on the target?
|
||||||
if (_stream_id not in _target_status):
|
if (_stream_id not in _target_status):
|
||||||
# This is a new call stream on the target
|
# This is a new call stream on the target
|
||||||
|
@ -1366,7 +1370,7 @@ class routerHBP(HBSYSTEM):
|
||||||
|
|
||||||
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
if (_system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True):
|
||||||
|
|
||||||
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system)
|
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False)
|
||||||
|
|
||||||
#Send to reflector or TG too, if it exists
|
#Send to reflector or TG too, if it exists
|
||||||
if _bridge[0:1] == '#':
|
if _bridge[0:1] == '#':
|
||||||
|
@ -1374,7 +1378,7 @@ class routerHBP(HBSYSTEM):
|
||||||
else:
|
else:
|
||||||
_bridge = '#'+_bridge
|
_bridge = '#'+_bridge
|
||||||
if _bridge in BRIDGES:
|
if _bridge in BRIDGES:
|
||||||
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system)
|
self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,True)
|
||||||
|
|
||||||
# Final actions - Is this a voice terminator?
|
# Final actions - Is this a voice terminator?
|
||||||
if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM):
|
if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM):
|
||||||
|
@ -1686,4 +1690,7 @@ if __name__ == '__main__':
|
||||||
mysql = mysql_task.start(60)
|
mysql = mysql_task.start(60)
|
||||||
mysql.addErrback(loopingErrHandle)
|
mysql.addErrback(loopingErrHandle)
|
||||||
|
|
||||||
|
#more threads
|
||||||
|
reactor.suggestThreadPoolSize(30)
|
||||||
|
|
||||||
reactor.run()
|
reactor.run()
|
||||||
|
|
|
@ -7,51 +7,56 @@ class Proxy(DatagramProtocol):
|
||||||
|
|
||||||
def __init__(self,ListenPort,connTrack,Timeout,Debug):
|
def __init__(self,ListenPort,connTrack,Timeout,Debug):
|
||||||
self.connTrack = connTrack
|
self.connTrack = connTrack
|
||||||
|
self.sourceTrack = {}
|
||||||
self.timeout = Timeout
|
self.timeout = Timeout
|
||||||
self.debug = Debug
|
self.debug = Debug
|
||||||
|
|
||||||
def datagramReceived(self, data, addr):
|
def datagramReceived(self, data, addr):
|
||||||
host,port = addr
|
host,port = addr
|
||||||
|
|
||||||
|
nowtime = time()
|
||||||
|
|
||||||
Debug = self.debug
|
Debug = self.debug
|
||||||
|
|
||||||
#If the packet comes from the master
|
#If the packet comes from the master
|
||||||
if host == '127.0.0.1' and port in self.connTrack:
|
if host == '127.0.0.1' and port in self.connTrack:
|
||||||
if int(self.connTrack[port]['time'])+self.timeout > time():
|
if int(self.connTrack[port]['time'])+self.timeout >nowtime:
|
||||||
self.transport.write(data,(self.connTrack[port]['host'],self.connTrack[port]['sport']))
|
self.transport.write(data,(self.connTrack[port]['host'],self.connTrack[port]['sport']))
|
||||||
#if master refuses login, remove tracking and block for timeout seconds
|
#if master refuses login, remove tracking and block fornowtimeout seconds
|
||||||
if data == b'MSTNAK\x00#\xbf"':
|
#if data == b'MSTNAK\x00#\xbf"':
|
||||||
self.connTrack[port]['time'] = False
|
#self.connTrack[port]['time'] = False
|
||||||
self.connTrack[port]['nacktime'] = time()+self.timeout
|
#self.connTrack[port]['nacktime'] =nowtime+self.timeout
|
||||||
if Debug:
|
if Debug:
|
||||||
|
print("return path match")
|
||||||
print(data)
|
print(data)
|
||||||
|
elif host+str(port) in self.sourceTrack:
|
||||||
|
del self.sourceTrack[host+str(port)]
|
||||||
return
|
return
|
||||||
|
|
||||||
for dport in self.connTrack:
|
#If we have a sourcetrack for this connect and thenowtimeout has not expired, forward to tracked port
|
||||||
#If blocked from refused login, ignore the packet if its been less than nacktime
|
if host+str(port) in self.sourceTrack and (int(self.sourceTrack[host+str(port)]['time'])+self.timeout >nowtime):
|
||||||
if int(self.connTrack[dport]['nacktime']) + self.timeout > time():
|
self.transport.write(data, ('127.0.0.1',self.sourceTrack[host+str(port)]['dport']))
|
||||||
if Debug:
|
self.connTrack[self.sourceTrack[host+str(port)]['dport']]['time'] =nowtime
|
||||||
print("NACK\n")
|
self.sourceTrack[host+str(port)]['time'] =nowtime
|
||||||
return
|
|
||||||
#If we have a conntrack for this connect and the timeout has not expired, forward to tracked port
|
|
||||||
if self.connTrack[dport]['host'] == host and self.connTrack[dport]['sport'] == port and (int(self.connTrack[dport]['time'])+self.timeout > time()):
|
|
||||||
self.connTrack[dport]['time'] = time()
|
|
||||||
self.connTrack[dport]['host'] = host
|
|
||||||
self.connTrack[dport]['sport'] = port
|
|
||||||
self.transport.write(data, ('127.0.0.1',dport))
|
|
||||||
self.connTrack[dport]['time'] = time()
|
|
||||||
if Debug:
|
if Debug:
|
||||||
|
print("Tracked inbound match")
|
||||||
print(data)
|
print(data)
|
||||||
return
|
return
|
||||||
|
elif host+str(port) in self.sourceTrack:
|
||||||
|
del self.sourceTrack[host+str(port)]
|
||||||
|
|
||||||
#Find free port to map for new connection
|
#Find free port to map for new connection
|
||||||
for dport in self.connTrack:
|
for dport in self.connTrack:
|
||||||
if (self.connTrack[dport]['time'] == False or (int(self.connTrack[dport]['time'])+self.timeout < time())):
|
if (self.connTrack[dport]['time'] == False or (int(self.connTrack[dport]['time'])+self.timeout <nowtime)):
|
||||||
self.connTrack[dport]['sport'] = port
|
self.connTrack[dport]['sport'] = port
|
||||||
self.connTrack[dport]['host'] = host
|
self.connTrack[dport]['host'] = host
|
||||||
self.connTrack[dport]['time'] = time()
|
self.connTrack[dport]['time'] =nowtime
|
||||||
|
self.sourceTrack[host+str(port)] = {}
|
||||||
|
self.sourceTrack[host+str(port)]['dport'] = dport
|
||||||
|
self.sourceTrack[host+str(port)]['time'] =nowtime
|
||||||
self.transport.write(data, ('127.0.0.1',dport))
|
self.transport.write(data, ('127.0.0.1',dport))
|
||||||
if Debug:
|
if Debug:
|
||||||
|
print("New connection")
|
||||||
print(data)
|
print(data)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -66,7 +71,7 @@ if __name__ == '__main__':
|
||||||
DestPortEnd = 54002
|
DestPortEnd = 54002
|
||||||
Timeout = 35
|
Timeout = 35
|
||||||
Stats = True
|
Stats = True
|
||||||
Debug = False
|
Debug = True
|
||||||
|
|
||||||
#*******************
|
#*******************
|
||||||
|
|
||||||
|
@ -79,13 +84,14 @@ if __name__ == '__main__':
|
||||||
reactor.listenUDP(ListenPort,Proxy(ListenPort,CONNTRACK,Timeout,Debug))
|
reactor.listenUDP(ListenPort,Proxy(ListenPort,CONNTRACK,Timeout,Debug))
|
||||||
|
|
||||||
def loopingErrHandle(failure):
|
def loopingErrHandle(failure):
|
||||||
print('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n {}'.format(failure))
|
print('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error innowtimed loop.\n {}'.format(failure))
|
||||||
reactor.stop()
|
reactor.stop()
|
||||||
|
|
||||||
def stats():
|
def stats():
|
||||||
count = 0
|
count = 0
|
||||||
|
nowtime = time()
|
||||||
for port in CONNTRACK:
|
for port in CONNTRACK:
|
||||||
if int(CONNTRACK[port]['time'])+Timeout > time():
|
if int(CONNTRACK[port]['time'])+Timeout > nowtime:
|
||||||
count = count+1
|
count = count+1
|
||||||
|
|
||||||
totalPorts = DestPortEnd - DestportStart
|
totalPorts = DestPortEnd - DestportStart
|
||||||
|
|
Loading…
Reference in New Issue