This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
lijia-tsg-oam/py_src/tsg_monit_stream.py
2019-07-29 18:43:18 +08:00

447 lines
16 KiB
Python

#!/usr/bin/env python2
import argparse
import json
import prettytable
import time
import sys
import signal
import os
import telegraf
import socket
# Decimal scale factors used when shortening counters for display.
KBPS = 1000
MBPS = 1000 * KBPS
GBPS = 1000 * MBPS
TBPS = 1000 * GBPS

# JSON status files read by this tool; the second path is a template that
# is filled in with an application symbol.
G_JSON_PATH = '/var/run/mrzcpd/mrmonit.daemon'
G_APP_JSON_PATH = '/var/run/mrzcpd/mrmonit.app.%s'

# Column titles shown for each metrics group.
TITLE_VECTOR_RX = ['RxPkts', 'RxBits', 'RxDrops']
TITLE_VECTOR_TX = ['TxPkts', 'TxBits', 'TxDrops']
TITLE_VECTOR_FTX = ['FTxPkts', 'FTxBits', 'FTxDrops']

# Column titles of the application status table.
TITLE_APP_STAT = ['PKTRx', 'PKTTx', 'MbufAlloc', 'MbufFree', 'MbufInUse']

# Application status column title -> key inside the 'appstat' JSON object.
TITLE_APP_MAP = {
    'PKTRx': 'packet_recv_count',
    'PKTTx': 'packet_send_count',
    'MbufAlloc': 'mbuf_alloc_count',
    'MbufFree': 'mbuf_free_count',
    'MbufInUse': 'mbuf_in_use_count',
}

# Device column title -> key inside a device's 'stats' JSON objects.
TITLE_MAP = {
    # receive side
    'RxOnline': 'rx_on_line',
    'RxPkts': 'rx_deliver',
    'RxDrops': 'rx_missed',
    'RxBits': 'rx_total_len',
    # transmit side
    'TxOnline': 'tx_on_line',
    'TxPkts': 'tx_deliver',
    'TxDrops': 'tx_missed',
    'TxBits': 'tx_total_len',
    # fast transmit side
    'FTXOnline': 'ftx_on_line',
    'FTxPkts': 'ftx_deliver',
    'FTxDrops': 'ftx_missed',
    'FTxBits': 'ftx_total_len',
}

# --- TSG OAM additions -------------------------------------------------
# Field names used for the per-device UDP report, the UDP socket the report
# is sent through, and the fixed (address, port) it is sent to.
TITLE_VECTOR = ['RxPkts', 'RxBits', 'RxDrops', 'TxPkts', 'TxBits', 'TxDrops']
udp_sock_fd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
remote_ip_port = ('192.168.11.219', 8126)
# --- end of TSG OAM additions ------------------------------------------
def locate_vector_by_symbol(vector, symbol):
    """Return every entry of *vector* whose 'symbol' value equals *symbol*.

    The result is a (possibly empty) list that keeps the order of *vector*.
    """
    matches = []
    for entry in vector:
        if entry['symbol'] == symbol:
            matches.append(entry)
    return matches
def list_all_vdev(json_fp):
    """Return the 'symbol' of every entry under json_fp['raw'], in order."""
    symbols = []
    for entry in json_fp['raw']:
        symbols.append(entry['symbol'])
    return symbols
def vdev_value_read(json_fp, str_device, str_item):
    """Return the accumulative counter *str_item* of device *str_device*.

    The first entry of json_fp['raw'] whose symbol matches is used; an
    IndexError is raised when no entry matches.
    """
    device = locate_vector_by_symbol(json_fp['raw'], str_device)[0]
    accumulative = device['stats']['accumulative']
    return accumulative[str_item]
def vdev_speed_read(json_fp, str_device, str_item):
    """Return the speed counter *str_item* of device *str_device*.

    The first entry of json_fp['raw'] whose symbol matches is used; an
    IndexError is raised when no entry matches.
    """
    device = locate_vector_by_symbol(json_fp['raw'], str_device)[0]
    speed = device['stats']['speed']
    return speed[str_item]
def vdev_streams_read(json_fp, str_device):
    """Return the ('rxstreams', 'txstreams') values of device *str_device*.

    The first entry of json_fp['raw'] whose symbol matches is used; an
    IndexError is raised when no entry matches.
    """
    device = locate_vector_by_symbol(json_fp['raw'], str_device)[0]
    return device['rxstreams'], device['txstreams']
def trans_to_human_readable(value):
    """Scale *value* down to the largest unit it strictly exceeds.

    Returns a (float, suffix) pair where suffix is one of 'T', 'G', 'M',
    'K', or a single space when the value does not exceed KBPS.
    """
    # Ordered from the largest unit down so the first hit is the best fit.
    units = ((TBPS, 'T'), (GBPS, 'G'), (MBPS, 'M'), (KBPS, 'K'))
    for threshold, suffix in units:
        if value > threshold:
            return value * 1.0 / threshold, suffix
    return value * 1.0, ' '
def vec_trans_to_human_readable(vec):
    """Return a list with every value of *vec* formatted as '%7.2f' + unit."""
    # trans_to_human_readable() returns a (number, unit) pair, which is
    # exactly the argument tuple the format string expects.
    return ['%7.2f%c' % trans_to_human_readable(value) for value in vec]
def dump_one_device(json_fp, devsym, title_vector_rx, title_vector_tx, speed):
    """Sum each requested counter of one device over all of its streams.

    json_fp          -- parsed application status document
    devsym           -- symbol of the device to read
    title_vector_rx  -- column titles summed over the device's rx streams
    title_vector_tx  -- column titles summed over the device's tx streams
    speed            -- 0 reads accumulative counters, anything else reads
                        speed counters

    Returns a list with one total per title, rx titles first.
    """
    read_counter = vdev_speed_read if speed != 0 else vdev_value_read
    totals = [0] * len(title_vector_rx + title_vector_tx)
    rx_count, tx_count = vdev_streams_read(json_fp, devsym)

    # Pair every column with the number of streams that carry it, so that a
    # single pass can handle rx and tx columns alike.
    columns = [(title, rx_count) for title in title_vector_rx]
    columns += [(title, tx_count) for title in title_vector_tx]

    for stream_id in range(max(rx_count, tx_count)):
        for position, (title, stream_count) in enumerate(columns):
            # Streams beyond this direction's count contribute nothing.
            if stream_id < stream_count:
                counters = read_counter(json_fp, devsym, TITLE_MAP[title])
                totals[position] += counters[stream_id]
    return totals
def _format_line_protocol(dev, titles, values):
    """Build the 'app' line-protocol record for one device.

    Each title is paired with the value at the same position, giving
    'app,device=<dev>,flow_type=inline <title>=<value>,...'.
    """
    fields = ','.join('%s=%s' % (title, value)
                      for title, value in zip(titles, values))
    return 'app,device=%s,flow_type=inline %s' % (str(dev), fields)


def dump_summary_table(json_fp, appsym, devsym, title_vector_rx, title_vector_tx,
                       is_human_number = 0, speed = 0):
    """Print one summary row per device and report each device over UDP.

    json_fp          -- parsed application status document
    appsym           -- application symbol, only used in the heading
    devsym           -- list of device symbols to show
    title_vector_rx  -- rx column titles
    title_vector_tx  -- tx column titles
    is_human_number  -- when true, table cells are shortened with K/M/G/T
    speed            -- 0 shows accumulative counters, otherwise speed

    For every device the per-column totals are also sent as one text
    record through udp_sock_fd to remote_ip_port. The record carries the
    raw totals (never the shortened form) and its field names are the
    column titles actually displayed, each paired with its own total.
    """
    titles = title_vector_rx + title_vector_tx
    print('\nTime: %s, App: %s' % (time.strftime('%c'), appsym))
    table_phydev = prettytable.PrettyTable([' '] + titles,
        vertical_char=' ', horizontal_char='-', junction_char=' ')
    for item in [' '] + titles:
        table_phydev.align[item] = 'r'

    ValueListTotal = [0] * len(titles)
    for dev in devsym:
        ValueListSum = dump_one_device(json_fp, dev, title_vector_rx, title_vector_tx, speed)
        for i, v in enumerate(ValueListSum):
            ValueListTotal[i] += v
        if is_human_number:
            table_phydev.add_row([dev] + vec_trans_to_human_readable(ValueListSum))
        else:
            table_phydev.add_row([dev] + ValueListSum)

        # TSG OAM: report this device's totals to the remote collector.
        print("###### %s" % (devsym,))
        if titles:
            # A record without any field would be malformed, so nothing is
            # sent when no column was requested.
            line_prot_buf = _format_line_protocol(dev, titles, ValueListSum)
            print(line_prot_buf)
            udp_sock_fd.sendto(line_prot_buf, remote_ip_port)
        print("######\n")

    if is_human_number:
        table_phydev.add_row(['Total'] + vec_trans_to_human_readable(ValueListTotal))
    else:
        table_phydev.add_row(['Total'] + ValueListTotal)
    print(table_phydev)
def dump_human_table(json_fp, appsym, devsym, title_vector_rx, title_vector_tx,
                     is_human_number = 0, speed = 0):
    """Print a per-stream table for one device, ending with a 'Total' row.

    json_fp          -- parsed application status document
    appsym           -- application symbol, only used in the heading
    devsym           -- symbol of the device to show
    title_vector_rx  -- rx column titles
    title_vector_tx  -- tx column titles
    is_human_number  -- when true, cells are shortened with K/M/G/T
    speed            -- 0 shows accumulative counters, otherwise speed
    """
    print("##### into dump_human_table")
    print('\nTime: %s, App: %s, Device: %s ' % (time.strftime('%c'), appsym, devsym))

    headers = [' '] + title_vector_rx + title_vector_tx
    table = prettytable.PrettyTable(headers,
        vertical_char=' ', horizontal_char='-', junction_char=' ')
    read_counter = vdev_speed_read if speed != 0 else vdev_value_read
    for header in headers:
        table.align[header] = 'r'

    totals = [0] * len(title_vector_rx + title_vector_tx)
    rx_count, tx_count = vdev_streams_read(json_fp, devsym)

    for stream_id in range(max(rx_count, tx_count)):
        has_rx = stream_id < rx_count
        has_tx = stream_id < tx_count

        # A direction that has no stream with this id shows zeros.
        row = [read_counter(json_fp, devsym, TITLE_MAP[title])[stream_id] if has_rx else 0
               for title in title_vector_rx]
        row += [read_counter(json_fp, devsym, TITLE_MAP[title])[stream_id] if has_tx else 0
                for title in title_vector_tx]

        label = ('RX[%d]' % stream_id if has_rx else '') + \
                ('TX[%d]' % stream_id if has_tx else '')
        cells = vec_trans_to_human_readable(row) if is_human_number else row
        table.add_row([label] + cells)

        totals = [running + value for running, value in zip(totals, row)]

    total_cells = vec_trans_to_human_readable(totals) if is_human_number else totals
    table.add_row(['Total'] + total_cells)
    print(table)
def dump_status_table(json_fp, appsym):
    """Print the application status counters, one row per TID plus a total.

    json_fp -- parsed application status document; its 'appstat' object
               holds one list per counter, indexed by TID
    appsym  -- application symbol, only used in the heading
    """
    print("##### into dump_status_table")
    app_stats = json_fp['appstat']
    # The length of the receive counter list decides how many rows there are.
    row_count = len(app_stats['packet_recv_count'])
    print('\nTime: %s, App: %s' % (time.strftime('%c'), appsym))

    headers = ['TID'] + TITLE_APP_STAT
    table = prettytable.PrettyTable(headers,
        vertical_char=' ', horizontal_char='-', junction_char=' ')
    for header in headers:
        table.align[header] = 'r'

    totals = [0] * len(TITLE_APP_STAT)
    for tid in range(row_count):
        row = [app_stats[TITLE_APP_MAP[title]][tid] for title in TITLE_APP_STAT]
        table.add_row([tid] + row)
        totals = [running + value for running, value in zip(totals, row)]

    table.add_row(['Total'] + totals)
    print(table)
def dump_apm_sendlog(json_fp, telegraf_client, appsym, user_interface, title_vector_rx, title_vector_tx):
    """Send the speed and accumulative totals of every device as metrics.

    For each device in *user_interface* two metrics are emitted through
    telegraf_client.metric(): 'mr4_stream_rxtx_speed' and
    'mr4_stream_rxtx_value'. Both are keyed by column title and tagged with
    the application and device symbols.
    """
    titles = title_vector_rx + title_vector_tx
    for dev in user_interface:
        speed_totals = dump_one_device(json_fp, dev, title_vector_rx, title_vector_tx, 1)
        value_totals = dump_one_device(json_fp, dev, title_vector_rx, title_vector_tx, 0)

        tags = {'app': appsym, 'device': dev}
        # dump_one_device() returns one total per title, in title order.
        speed_fields = dict(zip(titles, speed_totals))
        value_fields = dict(zip(titles, value_totals))

        telegraf_client.metric('mr4_stream_rxtx_speed', speed_fields, tags = tags)
        telegraf_client.metric('mr4_stream_rxtx_value', value_fields, tags = tags)
    return
def setup_argv_parser(applist):
    """Parse the command line (sys.argv) and return the option namespace.

    applist -- application symbols used for the positional APP argument
               when none is given on the command line

    The returned namespace always carries 'metrics' as a list: ['rx', 'tx']
    when -m is absent, otherwise every group given with -m (the flag may be
    repeated). Keeping it a list matters because callers test membership
    with 'in'; on a bare string such as 'ftx' that would also match 'tx'.
    """
    parser = argparse.ArgumentParser(
        description='Marsio ZeroCopy Tools -- Monitor stream information')
    # Explicit version action: the ArgumentParser(version=...) constructor
    # argument is deprecated in Python 2.7 and no longer exists in Python 3.
    parser.add_argument('-v', '--version', action='version',
                        version='Marsio ZeroCopy Tools Suite 4.1')

    parser.add_argument('-t', '--time', help='interval, seconds to wait between updates',
                        type=int, default=1)
    parser.add_argument('-l', '--loop', help='print loop, exit when recv a signal',
                        action='store_true', default=0)
    parser.add_argument('-H', '--human-readable', help='print value in human readable format',
                        action='store_true', default=0)
    parser.add_argument('-s', '--speed', help='print speed value instead of accumulative value',
                        action='store_true', default=0)
    parser.add_argument('-i', '--interface', help='the name of network interface',
                        action='append')
    # The default is applied after parsing: with action='append' a list
    # default would be extended by, rather than replaced with, the -m values.
    parser.add_argument('-m', '--metrics', help='group of metrics', choices=['rx', 'tx', 'ftx'],
                        action='append', default=None)
    parser.add_argument('--clear-screen', help='clear screen at start of loop',
                        action='store_true', default=0)
    parser.add_argument('--per-stream', help='print per thread/stream value',
                        action='store_true', default=0)
    parser.add_argument('--status', help='print application running status',
                        action='store_true', default=0)
    parser.add_argument('app', metavar='APP', help='the name of slave application', nargs='*',
                        default=applist)

    # APM sendlog options
    parser.add_argument('--sendlog-apm', help='send log to apm server',
                        action='store_true', default=0)
    parser.add_argument('--sendlog-apm-cfg', help='send log configure file',
                        type=str, default='/opt/mrzcpd/etc/mrsendlog.conf')

    r_option = parser.parse_args()
    if r_option.metrics is None:
        r_option.metrics = ['rx', 'tx']
    return r_option
def global_json_load():
    """Parse the file at G_JSON_PATH as JSON and return the result."""
    with open(G_JSON_PATH) as stream:
        document = json.load(stream)
    return document
def app_json_load(appsym):
    """Parse the status file of application *appsym* and return the result.

    The file path is G_APP_JSON_PATH with *appsym* substituted in.
    """
    path = G_APP_JSON_PATH % appsym
    with open(path) as stream:
        document = json.load(stream)
    return document
def app_symbol_load():
    """Return the symbols of all 'app' entries whose 'registed' field is 1.

    The entries come from the document returned by global_json_load().
    """
    symbols = []
    for app in global_json_load()["app"]:
        if app["registed"] == 1:
            symbols.append(app["symbol"])
    return symbols
def sigint_handler(handler, frame):
    """Signal handler that terminates the process with exit status 0.

    Both arguments are supplied by the signal machinery and are ignored.
    """
    raise SystemExit(0)
def check_vdev_options(json_fp, r_option):
    """Exit with status 1 if any requested interface is not a known device.

    Does nothing when r_option.interface is None. Otherwise every name in
    it must appear in list_all_vdev(json_fp); on the first violation an
    error message is printed and the process exits.
    """
    requested = r_option.interface
    if requested is None:
        return

    known = list_all_vdev(json_fp)
    unknown = [devsym for devsym in requested if devsym not in known]
    if unknown:
        print("monit_stream: error: argument -i/--interface: invalid interface.")
        sys.exit(1)
def sendlog_hostname(test_hostname, test_port):
    """Return (hostname, local_ip_addr) for tagging outgoing metrics.

    test_hostname, test_port -- remote endpoint used to discover which local
    address is used to reach it. A UDP socket is connected to that endpoint
    and its own address is read back.

    Discovery is best effort: if the port cannot be converted or the
    connect fails, local_ip_addr falls back to '127.0.0.1'.
    """
    import socket
    hostname = socket.gethostname()
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect((test_hostname, int(test_port)))
        local_ip_addr = s.getsockname()[0]
    except Exception:
        # Deliberately broad, but unlike a bare "except:" this lets
        # KeyboardInterrupt and SystemExit through.
        local_ip_addr = '127.0.0.1'
    finally:
        s.close()
    return hostname, local_ip_addr
def sendlog_apm_init(r_option):
    """Create the telegraf client described by the sendlog configuration.

    r_option -- parsed options; r_option.sendlog_apm_cfg names the config
                file, whose [sendlog_apm] section must define
                'apm_stream_server' as 'udp://host:port' or
                'http://host:port'

    Returns a telegraf.TelegrafClient (udp) or telegraf.HttpClient (http)
    tagged with the local hostname and IP address.

    Raises ValueError when the URL scheme is not udp/http or the URL has no
    port. Errors from ConfigParser (missing section or option) propagate.
    """
    import ConfigParser
    import urllib

    config = ConfigParser.ConfigParser()
    config.read(r_option.sendlog_apm_cfg)
    apm_server_url = config.get('sendlog_apm', 'apm_stream_server')

    url_proto, rest = urllib.splittype(apm_server_url)
    if url_proto not in ('udp', 'http'):
        # Without this check no client would ever be created and the
        # function would fail on an unbound local variable instead.
        raise ValueError("unsupported apm_stream_server scheme in %r, "
                         "expected udp:// or http://" % (apm_server_url,))

    url_host, rest = urllib.splithost(rest)
    url_host, url_port = urllib.splitport(url_host)
    if url_port is None:
        raise ValueError("apm_stream_server %r has no port" % (apm_server_url,))

    hostname, local_ip_addr = sendlog_hostname(url_host, url_port)
    sendlog_tags = {'host' : hostname, 'local_ip_addr': local_ip_addr}

    if url_proto == 'udp':
        telegraf_client = telegraf.TelegrafClient(host = url_host,
            port = int(url_port), tags = sendlog_tags)
    else:
        telegraf_client = telegraf.HttpClient(host = url_host,
            port = int(url_port), tags = sendlog_tags)
    return telegraf_client
def main():
    """Entry point: parse options, then print or send the requested stats.

    Start-up exits with status 1 when no application is registered or a
    status file cannot be opened. Afterwards each pass walks every selected
    application and, depending on the options, prints the status table,
    sends metrics through the telegraf client, or prints the summary or
    per-stream tables. With --loop the pass repeats every --time seconds.

    Returns 0.
    """
    signal.signal(signal.SIGINT, sigint_handler)

    # Check Parameters
    # appsym stays None until the per-application check starts, so the
    # error handler can tell which file failed to open.
    appsym = None
    try:
        applist = app_symbol_load()
        if not applist:
            print("monit_stream: error: no running application.")
            sys.exit(1)

        r_option = setup_argv_parser(applist)
        for appsym in r_option.app:
            # Loaded only to verify that the status file can be read.
            app_json_load(appsym)
    except IOError as err:
        if appsym is None:
            # The global status file itself could not be opened.
            print(("%s, perhaps program is not running.") % str(err))
        else:
            print("%s, program %s is not running." % (str(err), appsym))
        sys.exit(1)

    # Membership below must be tested against a sequence of group names;
    # on a bare string 'tx' would also match inside 'ftx'.
    metrics = r_option.metrics
    if not isinstance(metrics, (list, tuple)):
        metrics = [metrics]

    title_vector_rx = []
    title_vector_tx = []
    if 'rx' in metrics:
        title_vector_rx.extend(TITLE_VECTOR_RX)
    if 'tx' in metrics:
        title_vector_tx.extend(TITLE_VECTOR_TX)
    if 'ftx' in metrics:
        title_vector_tx.extend(TITLE_VECTOR_FTX)

    telegraf_client = None
    if r_option.sendlog_apm:
        try:
            telegraf_client = sendlog_apm_init(r_option)
        except Exception:
            # Announce the failure, then let the original error surface.
            print("APM sendlog setup failed.")
            raise

    try:
        while True:
            if r_option.clear_screen:
                os.system('clear')

            for appsym in r_option.app:
                json_fp = app_json_load(appsym)
                check_vdev_options(json_fp, r_option)
                if r_option.interface is not None:
                    user_interface = r_option.interface
                else:
                    user_interface = list_all_vdev(json_fp)

                if r_option.status:
                    dump_status_table(json_fp, appsym)
                    continue

                if r_option.sendlog_apm:
                    dump_apm_sendlog(json_fp, telegraf_client, appsym, user_interface,
                                     title_vector_rx, title_vector_tx)
                    continue

                if not r_option.per_stream:
                    dump_summary_table(json_fp, appsym, user_interface,
                                       title_vector_rx, title_vector_tx,
                                       r_option.human_readable, r_option.speed)
                else:
                    for devsym in user_interface:
                        dump_human_table(json_fp, appsym, devsym,
                                         title_vector_rx, title_vector_tx,
                                         r_option.human_readable, r_option.speed)

            if not r_option.loop:
                break
            time.sleep(r_option.time)
    except KeyboardInterrupt:
        pass
    except ValueError as err:
        print(("%s, perhaps program is not running.") % str(err))
    except IOError as err:
        print(("%s, perhaps program is not running.") % str(err))

    return 0
# Run the tool only when executed as a script, not when imported.
if "__main__" == __name__:
    main()