pYSFReflector/YSFReflector

612 lines
18 KiB
Python
Executable File

#!/usr/bin/python3
# pYSFReflector
#
# Created by Antonio Matraia (IU5JAE) on 20/02/2021.
# Copyright 2021 Antonio Matraia (IU5JAE). All rights reserved.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
import threading
import queue
import sys
import os
import time
import re
import configparser
import datetime
import signal
from datetime import datetime
import bisect
import struct
def ip2long(ip):
packed = socket.inet_aton(ip)
lng = struct.unpack("!L", packed)[0]
return lng
def long2ip(lng):
packed = struct.pack("!L", lng)
ip = socket.inet_ntoa(packed)
return ip
## gestione liste LH ecc ##
def inserisci_lista(lista, elemento, n_max):
if (len(lista) < n_max):
lista.append(elemento)
else:
for i in range(n_max - 1):
lista[i] = lista[i+1]
lista[n_max-1] = elemento
def stampa_lista(lista):
for i in range(len(lista)):
print(lista[len(lista)-i-1])
#################################
def inlist(a, x):
i = bisect.bisect_left(a, x)
if i != len(a) and a[i] == x:
return True
else:
return False
def check_string(l):
s = ''
for c in l:
if ((not c.isprintable()) and (ord(c) != 10)):
s += '<' +str(ord(c)) + '>'
if c.isprintable():
s += c
return s
def RecvData(sock,recvPackets):
while True:
data,addr = sock.recvfrom(1024) # bloccante se non ricevo niente
recvPackets.put((data,addr))
def CalcID(ref):
c = ref.strip().ljust(16)
u = 0
for a in c:
u = (u + ord(a)) & 0xFFFFFFFF
u = (u + (u << 10) & 0xFFFFFFFF) & 0xFFFFFFFF
u = (u ^ (u >> 6)) & 0xFFFFFFFF
u = (u + (u << 3) & 0xFFFFFFFF) & 0xFFFFFFFF
u = (u ^ (u >> 11))& 0xFFFFFFFF
u = (u + (u << 15)& 0xFFFFFFFF)& 0xFFFFFFFF
u = u % 100000
return u
def getidgw(cl, adr):
i=[0, '']
for c in cl:
if ((c[0] == adr[0]) and (c[1] == adr[1])):
i = [c[4], c[2]]
break
return i
def ElencoNodi(cl):
while True:
time.sleep(120)
cl_lo = []
if (len(cl) == 0):
printlog('No repeaters/gateways linked')
else:
printlog('Currently linked repeaters/gateways:')
for c in cl:
printlog(' ' + c[2].ljust(10) + ': ' + str(c[0]) + ':' + str(c[1]) + ' ' + str(c[3]) + '/60')
if (c[5] == 1):
cl_lo.append(c)
if (len(cl_lo) == 0):
printlog('No repeaters/gateways muted')
else:
printlog('Currently muted repeaters/gateways:')
for c in cl_lo:
printlog(' ' + c[2].ljust(10) + ': ' + str(c[0]) + ':' + str(c[1]) + ' ' + str(c[3]) + '/60')
def TimeoutNodi(cl):
while True:
time.sleep(1)
for c in cl:
c[3] += 1
if (c[3] > 60):
printlog('Removing ' + c[2].ljust(10) + ' (' + c[0] + ':' + str(c[1]) + ') disappeared')
cl.remove(c)
def TimeoutTX(t, t_lock, r_lock, lista_lh):
while True:
if (t[1] < 5):
t[1] += 0.1
if ((t[1] > 1.0) and (t[0] != 0)):
t[0] = 0
printlog('Network watchdog has expired')
inserisci_lista(lista_lh, [check_string(t[2]), check_string(t[3]), check_string(t[4]), t[5], datetime.fromtimestamp(t[6]).strftime("%d-%m-%Y %H-%M-%S"), round(time.time() - t[6]) ], 20)
t[0] = 0
t[2] = ''
t[3] = ''
t[4] = ''
t[5] = 0
t[6] = 0
pop_list = []
for d in t_lock:
if (t_lock[d] < 5):
t_lock[d] += 0.1
if ((t_lock[d] > 1.5) and (t_lock[d] != 0)):
pop_list.append(d)
r_lock.remove(d)
for x in pop_list:
t_lock.pop(x)
printlog('Removed from blockeds queue ' + str(x))
time.sleep(0.1)
def canTrasmit(cs, re_en):
global BLACK_LIST
global WHITE_LIST
call_sp = re.split(r'[-/]', cs)
call = call_sp[0]
if inlist(WHITE_LIST, call):
return True
if inlist(BLACK_LIST, call):
return False
if (len(call_sp) > 1):
if (call_sp[1] == 'RPT'):
return False
if re_en:
if (re.match(r'^\d?[A-Z]{1,2}\d{1,4}[A-Z]{1,3}$',call,re.IGNORECASE) and (len(call) <= 8)):
return True
else:
return False
else:
return True
def lista_gw(cl):
info = ''
for c in cl:
info += c[2] + ':' + c[0] + ':' + str(c[1]) + ';'
return info
def lista_invio(lista):
info = ''
for i in range(len(lista)):
info += lista[len(lista)-i-1][0] + ':' + lista[len(lista)-i-1][1] + ':' + lista[len(lista)-i-1][2] + ':' + str(lista[len(lista)-i-1][3]) + ':' + lista[len(lista)-i-1][4] + ':' + str(lista[len(lista)-i-1][5]) + ';'
return info
def update_clients(cl):
global GW_BL
global IP_BL
for c in cl:
if (inlist(GW_BL, c[2]) or inlist(IP_BL, ip2long(c[0]))):
c[5] = 1
else:
c[5] = 0
def blacklist(f_bl, t_reload, cli):
global BLACK_LIST
global WHITE_LIST
global GW_BL
global IP_BL
f_time_old = 0
try:
f_time = os.stat(f_bl).st_mtime
except:
pass
while True:
f_time = os.stat(f_bl).st_mtime
if (f_time != f_time_old):
try:
file = open(f_bl)
BL_TMP = []
GW_TMP = []
IP_TMP = []
WL_TMP = []
printlog('Reload the Blacklist from File')
for row in file:
content = row.strip()
# riga valida
if ((len(content) > 3) and (content[0] != '#')):
c_split = content.split(':')
# CALL
if (len(c_split) == 1 or c_split[0] == 'CS'):
if (len(c_split) == 1):
cont = content
if (c_split[0] == 'CS'):
cont = c_split[1]
if ((len(cont) <= 8) and (len(cont) >= 3)):
if (not inlist(BL_TMP, cont)):
bisect.insort(BL_TMP,cont)
# WL
if (len(c_split) == 2 and c_split[0] == 'AL'):
if (not inlist(WL_TMP, c_split[1])):
bisect.insort(WL_TMP,c_split[1])
# GW
if (len(c_split) == 2 and c_split[0] == 'GW'):
if (not inlist(GW_TMP, c_split[1])):
bisect.insort(GW_TMP,c_split[1])
# IP
if (len(c_split) == 2 and c_split[0] == 'IP'):
try:
ipa = socket.gethostbyname(c_split[1])
ipl = ip2long(ipa)
except:
ipl = 0
printlog('Invalid hostname ' + c_split[1])
if (ipl > 0):
if (not inlist(IP_TMP, ipl)):
bisect.insort(IP_TMP, ipl)
file.close()
except Exception as ex:
printlog('Failed to load Blacklist from File ')
BLACK_LIST = BL_TMP.copy()
WHITE_LIST = WL_TMP.copy()
GW_BL = GW_TMP.copy()
IP_BL = IP_TMP.copy()
f_time_old = f_time
update_clients(cli)
else:
pass
time.sleep(t_reload)
## lettura file configurazione ##
def ReadConfig(f,p):
config = configparser.ConfigParser()
config_file = f.strip()
config.read(config_file)
name = config['Info']['Name']
description = config['Info']['Description']
try:
id = int(config['Info']['id'])
except:
id = 0
log_path = config['Log']['FilePath']
log_name = config['Log']['FileRoot']
try:
file_rotate = config['Log']['FileRotate']
except:
file_rotate = "1" # to keep file-rotation by default with timestamp
try:
port = int(config['Network']['Port'])
except:
port = 42000
try:
file_blacklist = config['Block List']['File']
except:
file_blacklist = ''
try:
CheckRE = int(config['Block List']['CheckRE'])
except:
CheckRE = 1
try:
t_reload_blacklist = float(config['Block List']['Time'])
except:
t_reload_blacklist = 5.0
if (t_reload_blacklist < 0.1):
t_reload_blacklist = 0.1
p.append(id) # 0
p.append(name) # 1
p.append(description) # 2
p.append(log_path) # 3
p.append(log_name) # 4
p.append(port) # 5
p.append(file_blacklist) # 6
p.append(t_reload_blacklist) # 7
p.append(file_rotate) # 8
p.append(CheckRE) # 9
def sanitize_msg(data):
bya_msg = bytearray(data)
if ((data[0:4] == b"YSFP") and (len(data) == 14)):
for i in range(10):
if (bya_msg[i+4] == 0):
bya_msg[i+4] = 32
if ((data[0:4] == b"YSFD") and (len(data) == 155)):
for i in range(30):
if (bya_msg[i+4] == 0):
bya_msg[i+4] = 32
return(bytes(bya_msg))
def RunServer(config):
global filelog
global version
global BLACK_LIST
global GW_BL
global IP_BL
host = '0.0.0.0'
port = config[5]
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setblocking(1)
s.bind((host,port))
clients = [] # addr, port, gw, t_corr, ID, lonly
c = []
rx_lock = []
rx_lock_tout = {}
# lista LH
LH = []
# lista bloccati
BL = []
id = 1 # id nodo
id_str = 1 # id stream
tx = [0, 0, '', '', '', 0, 0] # id_nodo, tout, gateway, src, dest, id_stream, start_time
refl_name = config[1]
refl_desc = config[2]
if ((config[0] > 0) and (config[0] < 1000000)):
refl_id = str(config[0]).zfill(5)
else:
refl_id = CalcID(refl_name)
f_blacklist = config[6]
tr_blacklist = config[7] * 60.0
CheckRE = config[9]
recvPackets = queue.Queue()
print('Starting YSFReflector-' + version)
printlog('Starting YSFReflector-' + version)
threading.Thread(target=RecvData,args=(s,recvPackets)).start()
threading.Thread(target=ElencoNodi,args=(clients,)).start()
threading.Thread(target=TimeoutNodi,args=(clients,)).start()
threading.Thread(target=TimeoutTX,args=(tx,rx_lock_tout,rx_lock,LH)).start()
if (len(f_blacklist) > 0):
threading.Thread(target=blacklist,args=(f_blacklist,tr_blacklist,clients)).start()
time_start = time.time()
while True:
data_ns, addr = recvPackets.get() # bloccante se coda vuota
data = sanitize_msg(data_ns)
cmd = data[0:4]
if (cmd == b'YSFP'):
pres = False
for c in clients:
if ((c[0] == addr[0]) and (c[1] == addr[1])):
pres = True
c[3] = 0
break
if not pres:
lonly = 0
if inlist(GW_BL, (data[4:14]).decode().strip()):
lonly = 1
if inlist(IP_BL, ip2long(addr[0])):
lonly = 1
c=[addr[0], addr[1], (data[4:14]).decode().strip(), 0, id, lonly]
id += 1
clients.append(c)
printlog('Adding ' + c[2].ljust(10) + ' (' + c[0] + ':' + str(c[1]) + ')')
s.sendto(b'YSFPREFLECTOR ',addr)
if (cmd == b'YSFU'):
for c in clients:
if ((c[0] == addr[0]) and (c[1] == addr[1])):
printlog('Removing ' + c[2].ljust(10) + ' (' + c[0] + ':' + str(c[1]) + ') unlinked')
clients.remove(c)
break
if ((cmd == b'YSFD') and (len(data) == 155)):
[id_corr, gw_corr] = getidgw(clients, addr)
if (tx[0] == 0):
if inlist(GW_BL, gw_corr):
tx_ok = False
block_r = 'GW'
else:
if inlist(IP_BL, ip2long(addr[0])):
tx_ok = False
block_r = 'IP'
else:
tx_ok = canTrasmit(data[14:24].decode().strip(), CheckRE)
block_r = 'CS'
if tx_ok:
if ((tx[0] == 0) and (id_corr != 0)):
tx[0] = id_corr
# gateway
tx[2] = data[4:14].decode().strip()
# src
tx[3] = data[14:24].decode().strip()
# dest
tx[4] = data[24:34].decode().strip()
# stream ID
tx[5] = id_str
# time start
tx[6] = time.time()
id_str += 1
printlog('Received data from ' + tx[3].ljust(10) + ' to ' + tx[4].ljust(10) + ' at ' + tx[2].ljust(10))
else:
if (id_corr not in rx_lock):
rx_lock.append(id_corr)
inserisci_lista(BL, [check_string(data[4:14].decode().strip()), check_string(data[14:24].decode().strip()), check_string(data[24:34].decode().strip()), -1, datetime.fromtimestamp(time.time()).strftime("%d-%m-%Y %H-%M-%S"), -1 ], 20)
printlog('Data from ' + data[14:24].decode().strip().ljust(10) + ' at ' + data[4:14].decode().strip().ljust(10) + ' blocked/' + block_r)
# printlog('Bloccato: ' + data[14:24].decode().strip() + ' via ' + data[4:14].decode().strip())
rx_lock_tout[id_corr] = 0
if ((id_corr == tx[0]) and (id_corr != 0)):
tx[1] = 0
for c in clients:
if (((c[0] != addr[0]) or (c[1] != addr[1])) and (id_corr == tx[0]) and (id_corr != 0) and (id_corr not in rx_lock)):
s.sendto(data,(c[0], c[1]))
if ((data[34] & 0x01) == 0x01):
if (tx[0] != 0):
printlog('Received end of transmission')
inserisci_lista(LH, [check_string(tx[2]), check_string(tx[3]), check_string(tx[4]), tx[5], datetime.fromtimestamp(tx[6]).strftime("%d-%m-%Y %H-%M-%S"), round(time.time() - tx[6]) ], 20)
tx[0] = 0
tx[2] = ''
tx[3] = ''
tx[4] = ''
tx[5] = 0
tx[6] = 0
if (cmd == b'YSFS'):
# printlog('YSF server status enquiry from ' + addr[0] + ':' + str(addr[1]))
if (len(clients) > 999):
num_cli = 999
else:
num_cli = len(clients)
info = 'YSFS' + str(refl_id).zfill(5) + refl_name.ljust(16) + refl_desc.ljust(14) + str(num_cli).zfill(3)
s.sendto(str.encode(info),addr)
## messaggi per report attivo ##
if (cmd == b'QSRU'):
printlog('Received command ' + cmd.decode() + ' from: ' + addr[0] + ':' + str(addr[1]))
info = 'ASRU;' + str(round(time.time()-time_start)) + ';'
s.sendto(str.encode(info),addr)
if (cmd == b'QSRI'):
printlog('Received command ' + cmd.decode() + ' from: ' + addr[0] + ':' + str(addr[1]))
info = 'ASRI;' + str(refl_id) + ':' + refl_name + ':' + refl_desc + ';'
s.sendto(str.encode(info),addr)
if (cmd == b'QGWL'):
printlog('Received command ' + cmd.decode() + ' from: ' + addr[0] + ':' + str(addr[1]))
info = 'AGWL;' + lista_gw(clients)
s.sendto(str.encode(info),addr)
if (cmd == b'QLHL'):
printlog('Received command ' + cmd.decode() + ' from: ' + addr[0] + ':' + str(addr[1]))
info = 'ALHL;' + lista_invio(LH)
s.sendto(str.encode(info),addr)
if (cmd == b'QREJ'):
printlog('Received command ' + cmd.decode() + ' from: ' + addr[0] + ':' + str(addr[1]))
info = 'AREJ;' + lista_invio(BL)
s.sendto(str.encode(info),addr)
if (cmd == b'QBLK'):
printlog('Received command ' + cmd.decode() + ' from: ' + addr[0] + ':' + str(addr[1]))
s_info = ''
for c in BLACK_LIST:
s_info += c + ';'
info = 'ABLK;' + s_info
s.sendto(str.encode(info),addr)
s.close()
def printlog(mess):
global filelog
global log_basename
global file_rotate
if file_rotate == "1":
log_file = log_basename + '-' + str(datetime.utcnow().strftime('%Y-%m-%d')) + '.log'
else:
log_file = log_basename + '.log'
try:
if not os.path.isfile(log_file):
filelog.flush()
filelog.close()
filelog = open(log_file,'x')
except:
pass
str_log = check_string('M: ' + str(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'))[:-3] + ' ' + mess)
try:
filelog.write(str_log + '\n')
filelog.flush()
except:
pass
######## main ########
version = '20210322'
if (len(sys.argv) != 2):
print('Invalid Number of Arguments')
print('use: YSFReflector <configuration file>')
sys.exit()
if (sys.argv[1].strip() == '-v'):
print('YSFReflector version ' + version)
sys.exit()
## lettura configurazione ##
config=[]
try:
ReadConfig(sys.argv[1].strip(), config)
except:
print('Unable to read configuration file')
sys.exit()
log_basename = config[3] + '/' + config[4]
file_rotate = config[8]
### log
if file_rotate == "1":
log_file = log_basename + '-' + str(datetime.utcnow().strftime('%Y-%m-%d')) + '.log'
else:
log_file = log_basename + '.log'
try:
if os.path.isfile(log_file):
filelog = open(log_file,'a')
else:
filelog = open(log_file,'x')
except:
print('Unable to Open Log File')
sys.exit()
BLACK_LIST = []
WHITE_LIST = []
GW_BL = []
IP_BL = []
RunServer(config)