rebase onto develop-2.0

This commit is contained in:
lijia
2024-10-18 16:47:51 +08:00
parent 99a68d5c9e
commit e734af76d8
66 changed files with 22616 additions and 6 deletions

View File

@@ -1,5 +1,5 @@
set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml)
set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml ringbuf)
set(DECODERS lpi_plus)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})
set(LIBS fieldstat4)

View File

@@ -0,0 +1,22 @@
# add_subdirectory(enforcer)
add_library(monitor
monitor_cmd_assistant.c
monitor_transaction.c
monitor_server.c
monitor_utils.c
monitor_stat.c
monitor_spinlock.c
monitor_ringbuf.c
monitor_rpc.c
)
include_directories(${CMAKE_SOURCE_DIR}/include/)
include_directories(${CMAKE_SOURCE_DIR}/deps)
include_directories(${CMAKE_SOURCE_DIR}/infra)
target_include_directories(monitor PUBLIC ${CMAKE_CURRENT_LIST_DIR})
set_target_properties(monitor PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/infra/monitor/version.map")
set_target_properties(monitor PROPERTIES PREFIX "")
target_link_libraries(monitor toml sds linenoise tuple session_manager libevent-static libevent-static cjson-static ringbuf)
target_link_options(monitor PRIVATE -rdynamic)

View File

@@ -0,0 +1,11 @@
add_library(monitor_enforcer STATIC show_session_enforcer.c )
include_directories(${CMAKE_SOURCE_DIR}/include/)
include_directories(${CMAKE_SOURCE_DIR}/deps)
include_directories(${CMAKE_SOURCE_DIR}/deps/uthash)
include_directories(${CMAKE_SOURCE_DIR}/deps/timeout)
include_directories(${CMAKE_SOURCE_DIR}/infra)
include_directories(${CMAKE_SOURCE_DIR}/infra/tuple/)
include_directories(${CMAKE_SOURCE_DIR}/infra/packet_manager)
include_directories(${CMAKE_SOURCE_DIR}/infra/tcp_reassembly)
target_include_directories(monitor_enforcer PUBLIC ${CMAKE_CURRENT_LIST_DIR})

View File

@@ -0,0 +1,818 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <assert.h>
#include "stellar/session.h"
#include "stellar/monitor.h"
#include "session_manager/session_manager_rte.h"
#include "monitor/monitor_utils.h"
#include "monitor/monitor_rpc.h"
#include "sds/sds.h"
#include "session_manager/session_internal.h"
// temp add
extern struct session_manager_rte *session_manager_get_runtime(struct session_manager *sess_mgr, uint16_t thread_id);
#define SHOW_SESSION_BRIEF_LIMIT_DEFAULT 10
#define SHOW_SESSION_BRIEF_LIMIT_MAX 1000
#define SHOW_SESSION_BRIEF_SCAN_COUNT_DEFAULT 100
/* show session brief */
struct stm_show_session_brief_opt
{
#define SESSION_SCAN_SNET (1 << 25)
#define SESSION_SCAN_DNET (1 << 26)
#define SESSION_SCAN_ID (1 << 27)
#define SESSION_SCAN_CURSOR (1 << 28)
#define SESSION_SCAN_COUNT (1 << 29)
#define SESSION_SCAN_LIMIT (1 << 30)
struct session_scan_opts scan_opt;
uint32_t limit;
};
/* show session detail <id> */
struct stm_show_session_detail_opt
{
uint64_t sess_id;
int thread_idx; // todo, not used now
};
struct show_session_brief_result
{
uint64_t sid;
int thread_index;
enum session_state state;
enum session_type protocol;
time_t create_time_in_sec;
time_t last_pkt_time_in_sec;
uint8_t flow_dir;
struct tuple6 addr;
} __attribute__((packed));
struct show_session_brief_array
{
size_t array_num;
struct show_session_brief_result array[0]; /* Continuous memory, array_num * sizeof(struct show_session_brief_result) */
} __attribute__((packed));
struct show_session_detail_result
{
struct show_session_brief_result sess_brief;
unsigned long long sess_stat[MAX_FLOW_TYPE][MAX_STAT];
enum session_direction direction; // in or out
unsigned char application; // L7 appid
// todo, other info
} __attribute__((packed));
static void stm_session_brief_cli_args_set_default(struct stm_show_session_brief_opt *show_opt)
{
memset(show_opt, 0, sizeof(struct stm_show_session_brief_opt));
show_opt->limit = SHOW_SESSION_BRIEF_LIMIT_DEFAULT;
show_opt->scan_opt.cursor = 0;
show_opt->scan_opt.count = SHOW_SESSION_BRIEF_SCAN_COUNT_DEFAULT;
}
static uint32_t stm_session_brief_cli_args_get_flags(const char *para_name)
{
if (NULL == para_name)
{
return 0;
}
if (strncasecmp(para_name, "id", 2) == 0)
{
return SESSION_SCAN_ID;
}
else if (strncasecmp(para_name, "cursor", 6) == 0)
{
return SESSION_SCAN_CURSOR;
}
else if (strncasecmp(para_name, "count", 5) == 0)
{
return SESSION_SCAN_COUNT;
}
else if (strncasecmp(para_name, "state", 5) == 0)
{
return SESSION_SCAN_STATE;
}
else if (strncasecmp(para_name, "sip", 3) == 0)
{
return SESSION_SCAN_SIP;
}
else if (strncasecmp(para_name, "dip", 3) == 0)
{
return SESSION_SCAN_DIP;
}
else if (strncasecmp(para_name, "snet", 4) == 0)
{
return SESSION_SCAN_SNET;
}
else if (strncasecmp(para_name, "dnet", 4) == 0)
{
return SESSION_SCAN_DNET;
}
else if (strncasecmp(para_name, "sport", 5) == 0)
{
return SESSION_SCAN_SPORT;
}
else if (strncasecmp(para_name, "dport", 5) == 0)
{
return SESSION_SCAN_DPORT;
}
else if (strncasecmp(para_name, "protocol", 8) == 0)
{
return SESSION_SCAN_TYPE;
}
else if (strncasecmp(para_name, "limit", 5) == 0)
{
return SESSION_SCAN_LIMIT;
}
else if (strncasecmp(para_name, "create-time-in", strlen("create-time-in")) == 0)
{
return SESSION_SCAN_CREATE_TIME;
}
else if (strncasecmp(para_name, "last-pkt-time-in", strlen("last-pkt-time-in")) == 0)
{
return SESSION_SCAN_LASPKT_TIME;
}
return 0;
}
static enum session_state show_session_state_pton(const char *state_str)
{
if (strncasecmp(state_str, "opening", strlen("opening")) == 0)
{
return SESSION_STATE_OPENING;
}
else if (strncasecmp(state_str, "active", strlen("active")) == 0)
{
return SESSION_STATE_ACTIVE;
}
else if (strncasecmp(state_str, "closing", strlen("closing")) == 0)
{
return SESSION_STATE_CLOSING;
}
return MAX_STATE;
}
static uint32_t show_session_ipaddr_ntop(const char *ip_string, struct session_scan_opts *scan_opt, uint32_t flag)
{
uint32_t addr_family;
if (SESSION_SCAN_SIP == flag)
{
addr_family = stm_inet_pton(ip_string, &scan_opt->src_addr[0].v4, &scan_opt->src_addr[0].v6);
}
else
{
addr_family = stm_inet_pton(ip_string, &scan_opt->dst_addr[0].v4, &scan_opt->dst_addr[0].v6);
}
if (addr_family == 0)
{
return 0;
}
if (AF_INET == addr_family)
{
if (SESSION_SCAN_SIP == flag)
{
scan_opt->src_addr[1].v4 = scan_opt->src_addr[0].v4;
}
else
{
scan_opt->dst_addr[1].v4 = scan_opt->dst_addr[0].v4;
}
}
else
{
if (SESSION_SCAN_SIP == flag)
{
scan_opt->src_addr[1].v6 = scan_opt->src_addr[0].v6;
}
else
{
scan_opt->dst_addr[1].v6 = scan_opt->dst_addr[0].v6;
}
}
return addr_family;
}
static sds show_session_cli_args_sanity_check(const struct stm_show_session_brief_opt *brief_opt)
{
uint32_t flags = brief_opt->scan_opt.flags;
sds ss = sdsempty();
if ((flags & SESSION_SCAN_SIP) && (flags & SESSION_SCAN_SNET))
{
ss = sdscatprintf(ss, "error: the 'sip' and 'snet' options conflict!");
return ss;
}
if ((flags & SESSION_SCAN_DIP) && (flags & SESSION_SCAN_DNET))
{
ss = sdscatprintf(ss, "error: the 'dip' and 'dnet' options conflict!");
return ss;
}
return ss;
}
static int show_session_is_help(int argc, char *argv[])
{
for (int i = 0; i < argc; i++)
{
if (strncasecmp(argv[i], "help", 4) == 0)
{
return 1;
}
if (strncasecmp(argv[i], "--help", 6) == 0)
{
return 1;
}
if (strncasecmp(argv[i], "-h", 2) == 0)
{
return 1;
}
}
return 0;
}
static struct monitor_reply *show_session_brief_usage(void)
{
sds ss = sdsempty();
ss = sdscatprintf(ss, "Usage: show session brief [options]\n");
ss = sdscatprintf(ss, "Options:\n");
ss = sdscatprintf(ss, " -h, --help help\tdisplay usage information and exit\n");
ss = sdscatprintf(ss, " cursor <cursor>\n");
ss = sdscatprintf(ss, " count <count>\n");
ss = sdscatprintf(ss, " state <opening|active|closing>\n");
ss = sdscatprintf(ss, " protocol <tcp|udp>\n");
ss = sdscatprintf(ss, " sip <source ip>\n");
ss = sdscatprintf(ss, " dip <destination ip>\n");
ss = sdscatprintf(ss, " sport <source port>\n");
ss = sdscatprintf(ss, " dport <destination port>\n");
ss = sdscatprintf(ss, " snet <source network/netmask>\texample 192.168.1.0/24\n");
ss = sdscatprintf(ss, " dnet <destination network/netmask>\texample 1234::abcd/48\n");
ss = sdscatprintf(ss, " create-time-in <last-N-[seconds|minutes|hours|days]>\texample last-1-hours\n");
ss = sdscatprintf(ss, " last-pkt-time-in <last-N-[seconds|minutes|hours|days]>\texample last-7-days\n");
ss = sdscatprintf(ss, " limit <limit>\n");
struct monitor_reply *reply = monitor_reply_new_string("%s", ss);
sdsfree(ss);
return reply;
}
static struct monitor_reply *show_session_detail_usage(void)
{
sds ss = sdsempty();
ss = sdscatprintf(ss, "Usage: show session detial [options]\n");
ss = sdscatprintf(ss, "Options:\n");
ss = sdscatprintf(ss, " -h, --help help\tdisplay usage information and exit\n");
ss = sdscatprintf(ss, " id <session id>\n");
struct monitor_reply *reply = monitor_reply_new_string("%s", ss);
sdsfree(ss);
return reply;
}
/*
"show session ..." command args parser
return:
NULL: success, brief_opt is filled
not NULL: error message
*/
static struct monitor_reply *show_session_brief_cli_args_parse(int argc, char *argv[], struct stm_show_session_brief_opt *brief_opt)
{
uint32_t ipaddr_family = 0, history_ipaddr_family = 0;
sds ss = NULL;
uint32_t flag;
struct monitor_reply *error_reply = NULL;
const char *opt_name, *opt_value;
stm_session_brief_cli_args_set_default(brief_opt);
int i = 3; // skip "show session brief"
while (i < argc)
{
ipaddr_family = 0;
opt_name = argv[i];
flag = stm_session_brief_cli_args_get_flags(opt_name);
if (i + 1 >= argc)
{
error_reply = monitor_reply_new_error("option %s requires an argument\n", opt_name);
return error_reply;
}
opt_value = argv[++i];
switch (flag)
{
case SESSION_SCAN_TYPE:
{
if (strncasecmp(opt_value, "tcp", 3) == 0)
{
brief_opt->scan_opt.type = SESSION_TYPE_TCP;
}
else if (strncasecmp(opt_value, "udp", 3) == 0)
{
brief_opt->scan_opt.type = SESSION_TYPE_UDP;
}
else
{
error_reply = monitor_reply_new_error("unsupported protocol type: %s. should be <tcp|udp>\n", opt_value);
goto error_exit;
}
}
break;
case SESSION_SCAN_STATE:
{
enum session_state tmp_state = show_session_state_pton(opt_value);
if (tmp_state == MAX_STATE)
{
error_reply = monitor_reply_new_error("unrecognized session state: %s. should be <opening|active|closing>\n", opt_value);
goto error_exit;
}
brief_opt->scan_opt.state = tmp_state;
}
break;
// case SESSION_SCAN_ID:
// if (stm_string_isdigit(opt_value) == 0)
// {
// *error_reply = monitor_reply_new_error("invalid session id: %s. should be integer in range: <1 - UINT64_MAX>\n", opt_value);
// goto error_exit;
// }
// show_opt->detail_opt.sess_id = strtoull(opt_value, NULL, 10);
// break;
case SESSION_SCAN_SIP:
ipaddr_family = show_session_ipaddr_ntop(opt_value, &brief_opt->scan_opt, SESSION_SCAN_SIP);
if (ipaddr_family == 0)
{
error_reply = monitor_reply_new_error("invalid sip address: %s\n", opt_value);
goto error_exit;
}
brief_opt->scan_opt.addr_family = ipaddr_family;
break;
case SESSION_SCAN_DIP:
ipaddr_family = show_session_ipaddr_ntop(opt_value, &brief_opt->scan_opt, SESSION_SCAN_DIP);
if (ipaddr_family == 0)
{
error_reply = monitor_reply_new_error("invalid dip address: %s\n", opt_value);
goto error_exit;
}
brief_opt->scan_opt.addr_family = ipaddr_family;
break;
case SESSION_SCAN_SNET:
{
uint32_t ipv4addr, ipv4mask;
struct in6_addr ipv6addr, ipv6mask;
ipaddr_family = stm_ip_cidr_pton(opt_value, &ipv4addr, &ipv4mask, &ipv6addr, &ipv6mask);
if (ipaddr_family == 0)
{
error_reply = monitor_reply_new_error("invalid snet CIDR address: %s\n", opt_value);
goto error_exit;
}
if (AF_INET == ipaddr_family)
{
uint32_t ipv4_range[2];
stm_ipv4_cidr_to_range(ipv4addr, ipv4mask, ipv4_range);
brief_opt->scan_opt.src_addr[0].v4.s_addr = ipv4_range[0];
brief_opt->scan_opt.src_addr[1].v4.s_addr = ipv4_range[1];
}
else
{
struct in6_addr ipv6_range[2];
stm_ipv6_cidr_to_range(&ipv6addr, &ipv6mask, ipv6_range);
brief_opt->scan_opt.src_addr[0].v6 = ipv6_range[0];
brief_opt->scan_opt.src_addr[1].v6 = ipv6_range[1];
}
brief_opt->scan_opt.addr_family = ipaddr_family;
flag = SESSION_SCAN_SIP;
}
break;
case SESSION_SCAN_DNET:
{
uint32_t ipv4addr, ipv4mask;
struct in6_addr ipv6addr, ipv6mask;
ipaddr_family = stm_ip_cidr_pton(opt_value, &ipv4addr, &ipv4mask, &ipv6addr, &ipv6mask);
if (ipaddr_family == 0)
{
error_reply = monitor_reply_new_error("invalid dnet CIDR address: %s\n", opt_value);
goto error_exit;
}
if (AF_INET == ipaddr_family)
{
uint32_t ipv4_range[2];
stm_ipv4_cidr_to_range(ipv4addr, ipv4mask, ipv4_range);
brief_opt->scan_opt.dst_addr[0].v4.s_addr = ipv4_range[0];
brief_opt->scan_opt.dst_addr[1].v4.s_addr = ipv4_range[1];
}
else
{
struct in6_addr ipv6_range[2];
stm_ipv6_cidr_to_range(&ipv6addr, &ipv6mask, ipv6_range);
brief_opt->scan_opt.dst_addr[0].v6 = ipv6_range[0];
brief_opt->scan_opt.dst_addr[1].v6 = ipv6_range[1];
}
brief_opt->scan_opt.addr_family = ipaddr_family;
flag = SESSION_SCAN_DIP;
}
break;
case SESSION_SCAN_SPORT:
{
if (stm_string_isdigit(opt_value) == 0)
{
error_reply = monitor_reply_new_error("illegal sport: %s. should be integer\n", opt_value);
goto error_exit;
}
int tmp_val = atoi(opt_value);
if (tmp_val <= 0 || tmp_val > 65535)
{
error_reply = monitor_reply_new_error("illegal sport: %s. should be <1-65535>\n", opt_value);
goto error_exit;
}
brief_opt->scan_opt.src_port = htons((unsigned short)tmp_val);
}
break;
case SESSION_SCAN_DPORT:
{
if (stm_string_isdigit(opt_value) == 0)
{
error_reply = monitor_reply_new_error("illegal sport: %s. should be integer\n", opt_value);
goto error_exit;
}
int tmp_val = atoi(opt_value);
if (tmp_val <= 0 || tmp_val > 65535)
{
error_reply = monitor_reply_new_error("illegal sport: %s. should be <1-65535>\n", opt_value);
goto error_exit;
}
brief_opt->scan_opt.dst_port = htons((unsigned short)tmp_val);
}
break;
case SESSION_SCAN_CURSOR:
brief_opt->scan_opt.cursor = strtoul(opt_value, NULL, 10);
break;
case SESSION_SCAN_COUNT:
brief_opt->scan_opt.count = strtoul(opt_value, NULL, 10);
if (brief_opt->scan_opt.count == 0)
{
error_reply = monitor_reply_new_error("illegal count: %s. should be <1 - UINT32_MAX>\n", opt_value);
goto error_exit;
}
break;
case SESSION_SCAN_LIMIT:
brief_opt->limit = strtoul(opt_value, NULL, 10);
if (brief_opt->limit == 0 || brief_opt->limit > SHOW_SESSION_BRIEF_LIMIT_MAX)
{
error_reply = monitor_reply_new_error("illegal limit: %s. should be <1 - %u>\n", opt_value, SHOW_SESSION_BRIEF_LIMIT_MAX);
goto error_exit;
}
break;
case SESSION_SCAN_CREATE_TIME:
{
time_t tmp_range[2] = {0, 0};
if (stm_time_range_pton(opt_value, time(NULL), tmp_range) < 0)
{
error_reply = monitor_reply_new_error("invalid create-time-in: %s. \r\nsyntax: <last-N-[seconds|minutes|hours|days]>\n", opt_value);
goto error_exit;
}
brief_opt->scan_opt.create_time_ms[0] = tmp_range[0] * 1000; // second to ms
brief_opt->scan_opt.create_time_ms[1] = tmp_range[1] * 1000; // second to ms
}
break;
case SESSION_SCAN_LASPKT_TIME:
{
time_t tmp_range[2] = {0, 0};
if (stm_time_range_pton(opt_value, time(NULL), tmp_range) < 0)
{
error_reply = monitor_reply_new_error("invalid last-pkt-time-in: %s. \r\nsyntax: <last-N-[seconds|minutes|hours|days]>\n", opt_value);
goto error_exit;
}
brief_opt->scan_opt.laspkt_time_ms[0] = tmp_range[0] * 1000; // second to ms
brief_opt->scan_opt.laspkt_time_ms[1] = tmp_range[1] * 1000; // second to ms
}
break;
default:
error_reply = monitor_reply_new_error("unrecognized params: %s \n", opt_name);
return error_reply;
}
if ((history_ipaddr_family != 0) && (ipaddr_family != 0) && (history_ipaddr_family != ipaddr_family))
{
error_reply = monitor_reply_new_error("contradictory ip version, expression rejects all sessions!\n");
goto error_exit;
}
history_ipaddr_family = ipaddr_family;
i++; // to next option
brief_opt->scan_opt.flags |= flag;
}
ss = show_session_cli_args_sanity_check(brief_opt);
if (ss && sdslen(ss) > 0)
{
error_reply = monitor_reply_new_error("%s\n", ss);
sdsfree(ss);
goto error_exit;
}
sdsfree(ss);
error_reply = NULL;
return NULL;
error_exit:
return error_reply;
}
static void get_single_session_brief(struct session *sess, int thread_id, struct show_session_brief_result *brief)
{
brief->thread_index = thread_id;
brief->state = session_get_current_state(sess);
brief->protocol = session_get_type(sess);
session_is_symmetric(sess, &brief->flow_dir);
brief->create_time_in_sec = session_get_timestamp(sess, SESSION_TIMESTAMP_START) / 1000; // ms to sec
brief->last_pkt_time_in_sec = session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) / 1000; // ms to sec
const struct tuple6 *tp6 = session_get_tuple6(sess);
memcpy(&brief->addr, tp6, sizeof(struct tuple6));
}
struct iovec show_session_brief_on_worker_thread_rpc_cb(int thread_idx, struct iovec request, void *args)
{
struct iovec response = {};
assert(request.iov_len == sizeof(struct stm_show_session_brief_opt));
struct stm_show_session_brief_opt *show_brief_opt = (struct stm_show_session_brief_opt *)request.iov_base;
uint64_t session_id_array[SHOW_SESSION_BRIEF_LIMIT_MAX];
uint64_t session_id_array_count = SHOW_SESSION_BRIEF_LIMIT_MAX;
struct session_manager *sess_mgr = stellar_module_get_session_manager((struct stellar_module_manager *)args);
struct session_manager_rte *sess_mgr_rt = session_manager_get_runtime(sess_mgr, thread_idx);
session_id_array_count = session_manager_rte_scan_session(sess_mgr_rt, &show_brief_opt->scan_opt, session_id_array, show_brief_opt->limit);
if (session_id_array_count == 0)
{
// no session match the filter params, but no error! still need to build a empty reply!
// go on !!!
response.iov_base = NULL;
response.iov_len = 0;
return response;
}
struct show_session_brief_result *brief_result_array = (struct show_session_brief_result *)calloc(session_id_array_count, sizeof(struct show_session_brief_result));
for (uint32_t i = 0; i < session_id_array_count; i++)
{
struct session *sess = session_manager_rte_lookup_session_by_id(sess_mgr_rt, session_id_array[i]);
assert(sess != NULL);
get_single_session_brief(sess, thread_idx, &brief_result_array[i]);
brief_result_array[i].sid = session_id_array[i];
}
response.iov_base = brief_result_array;
response.iov_len = session_id_array_count;
return response;
}
static sds session_brief_to_readable(const struct show_session_brief_result *sess_brief)
{
char time_buf[64];
char addr_buf[256];
sds ss = sdsempty();
strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_brief->create_time_in_sec));
ss = sdscatprintf(ss, "%-6d %-21lu %-5s %-8s %-20s %s", sess_brief->thread_index, sess_brief->sid,
(sess_brief->protocol == SESSION_TYPE_TCP) ? "tcp" : "udp",
stm_session_state_ntop(sess_brief->state), time_buf,
stm_get0_readable_session_addr(&sess_brief->addr, addr_buf, sizeof(addr_buf)));
return ss;
}
static struct monitor_reply *show_session_brief_cmd_cb(struct stellar_monitor *monitor, int argc, char *argv[], void *arg)
{
struct stm_show_session_brief_opt brief_opt = {};
if (show_session_is_help(argc, argv))
{
return show_session_brief_usage();
}
struct monitor_reply *error_reply = show_session_brief_cli_args_parse(argc, argv, &brief_opt);
if (error_reply != NULL)
{
return error_reply;
}
struct monitor_reply *cmd_reply;
struct stellar_module_manager *mod_mgr = (struct stellar_module_manager *)arg;
int thread_num = stellar_module_manager_get_max_thread_num(mod_mgr);
struct iovec request;
request.iov_base = (void *)&brief_opt;
request.iov_len = sizeof(struct stm_show_session_brief_opt);
struct iovec response[thread_num];
memset(response, 0, sizeof(response));
size_t tot_sess_id_num = 0;
sds ss = sdsempty();
ss = sdscatprintf(ss, "%-6s %-21s %-5s %-8s %-20s %s\r\n", "thread", "session-id", "proto", "state", "create-time", "tuple4(sip:sport-dip:dport)");
ss = sdscatprintf(ss, "--------------------------------------------------------------------------------------------\r\n");
for (int i = 0; i < thread_num; i++)
{
response[i] = monitor_worker_thread_rpc(monitor, i, request, show_session_brief_on_worker_thread_rpc_cb, mod_mgr);
if (response[i].iov_base == NULL || response[i].iov_len == 0) // empty result
{
continue;
}
struct show_session_brief_result *brief_res_array = (struct show_session_brief_result *)(response[i].iov_base);
tot_sess_id_num += response[i].iov_len; // session_id_array_count
for (size_t j = 0; j < response[i].iov_len; j++)
{
ss = sdscatprintf(ss, "%s\n", session_brief_to_readable(&brief_res_array[j]));
}
if (tot_sess_id_num >= brief_opt.limit)
{
break;
}
}
if (tot_sess_id_num == 0)
{
cmd_reply = monitor_reply_new_string("No session found");
goto empty_result;
}
cmd_reply = monitor_reply_new_string("%s", ss);
empty_result:
sdsfree(ss);
for (int i = 0; i < thread_num; i++)
{
if (response[i].iov_base)
{
free(response[i].iov_base);
}
}
return cmd_reply;
}
/*
todo: add thread id,
fast patch, avoid traversing all worker threads
*/
static struct monitor_reply *show_session_detail_cli_args_parse(int argc, char *argv[], struct stm_show_session_detail_opt *detail_opt)
{
if (argc < 4)
{
return monitor_reply_new_error("missing session id\n");
}
if (argc > 5)
{
return monitor_reply_new_error("too many arguments\n");
}
if (strncasecmp(argv[3], "id", 2) != 0)
{
return monitor_reply_new_error("missing session id\n");
}
if (stm_string_isdigit(argv[4]) == 0)
{
return monitor_reply_new_error("invalid session id: %s. should be integer in range: <1 - UINT64_MAX>\n", argv[4]);
}
detail_opt->sess_id = strtoull(argv[4], NULL, 10);
return NULL;
}
struct iovec show_session_detail_on_worker_thread_rpc_cb(int thread_idx, struct iovec request, void *args)
{
struct iovec response = {};
assert(request.iov_len == sizeof(struct stm_show_session_detail_opt));
struct stm_show_session_detail_opt *detail_opt = (struct stm_show_session_detail_opt *)request.iov_base;
struct session_manager *sess_mgr = stellar_module_get_session_manager((struct stellar_module_manager *)args);
struct session_manager_rte *sess_mgr_rt = session_manager_get_runtime(sess_mgr, thread_idx);
struct session *sess = session_manager_rte_lookup_session_by_id(sess_mgr_rt, detail_opt->sess_id);
if (NULL == sess)
{
return response;
}
struct show_session_detail_result *detail_res = (struct show_session_detail_result *)calloc(1, sizeof(struct show_session_detail_result));
get_single_session_brief(sess, thread_idx, &detail_res->sess_brief);
detail_res->sess_brief.sid = detail_opt->sess_id;
detail_res->direction = session_get_direction(sess);
// todo, get some exact stat, not all
memcpy(detail_res->sess_stat, sess->stats, sizeof(detail_res->sess_stat));
// todo, get application info
response.iov_base = detail_res;
response.iov_len = sizeof(struct show_session_detail_result);
return response;
}
static sds session_detail_to_readable(const struct show_session_detail_result *sess_detail)
{
char addr_buf[256];
sds ss = sdsempty();
char time_buf[64];
#define SHOW_SESSION_DETAIL_NAME_FORMAT "%-30s"
#define SHOW_SESSION_DETAIL_VALUE_FORMAT "%llu"
const char *flow_str[MAX_FLOW_TYPE] = {"C2S flow", "S2C flow"};
ss = sdscatprintf(ss, "%-15s: %lu\r\n", "session-id", sess_detail->sess_brief.sid);
ss = sdscatprintf(ss, "%-15s: %d\r\n", "thread", sess_detail->sess_brief.thread_index);
ss = sdscatprintf(ss, "%-15s: %s\r\n", "state", stm_session_state_ntop(sess_detail->sess_brief.state));
ss = sdscatprintf(ss, "%-15s: %s\r\n", "protocol", (sess_detail->sess_brief.protocol == SESSION_TYPE_TCP) ? "tcp" : "udp");
strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_detail->sess_brief.create_time_in_sec));
ss = sdscatprintf(ss, "%-15s: %s\r\n", "create-time", time_buf);
strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", localtime(&sess_detail->sess_brief.last_pkt_time_in_sec));
ss = sdscatprintf(ss, "%-15s: %s\r\n", "last-pkt-time", time_buf);
ss = sdscatprintf(ss, "%-15s: %s\r\n", "tuple4", stm_get0_readable_session_addr(&sess_detail->sess_brief.addr, addr_buf, sizeof(addr_buf)));
ss = sdscatprintf(ss, "%-15s: %s\r\n", "symmetric", stm_session_flow_dir_ntop(sess_detail->sess_brief.flow_dir));
ss = sdscatprintf(ss, "%-15s: %s\r\n", "direction", sess_detail->direction == SESSION_DIRECTION_INBOUND ? "INBOUND" : "OUTBOUND");
ss = sdscatprintf(ss, "statistics:\r\n");
char printf_format[256];
snprintf(printf_format, sizeof(printf_format), "\t%s : %s\r\n", SHOW_SESSION_DETAIL_NAME_FORMAT, SHOW_SESSION_DETAIL_VALUE_FORMAT);
for (int flow = 0; flow < MAX_FLOW_TYPE; flow++)
{
ss = sdscatprintf(ss, " %s:\r\n", flow_str[flow]);
ss = sdscatprintf(ss, printf_format, "raw_packets_received", sess_detail->sess_stat[flow][STAT_RAW_PACKETS_RECEIVED]);
ss = sdscatprintf(ss, printf_format, "raw_bytes_received", sess_detail->sess_stat[flow][STAT_RAW_BYTES_RECEIVED]);
ss = sdscatprintf(ss, printf_format, "duplicate_packets_bypass", sess_detail->sess_stat[flow][STAT_DUPLICATE_PACKETS_BYPASS]);
ss = sdscatprintf(ss, printf_format, "injected_packets", sess_detail->sess_stat[flow][STAT_INJECTED_PACKETS_SUCCESS]);
ss = sdscatprintf(ss, printf_format, "injected_bytes", sess_detail->sess_stat[flow][STAT_INJECTED_BYTES_SUCCESS]);
if (SESSION_TYPE_TCP == sess_detail->sess_brief.protocol)
{
ss = sdscatprintf(ss, printf_format, "tcp_segments_retransmit", sess_detail->sess_stat[flow][STAT_TCP_SEGMENTS_RETRANSMIT]);
ss = sdscatprintf(ss, printf_format, "tcp_payloads_retransmit", sess_detail->sess_stat[flow][STAT_TCP_PAYLOADS_RETRANSMIT]);
ss = sdscatprintf(ss, printf_format, "tcp_segments_reordered", sess_detail->sess_stat[flow][STAT_TCP_SEGMENTS_REORDERED]);
ss = sdscatprintf(ss, printf_format, "tcp_payloads_reordered", sess_detail->sess_stat[flow][STAT_TCP_PAYLOADS_REORDERED]);
}
}
// todo:
ss = sdscatprintf(ss, "\r\n \033[31mtodo: add security policy rule id, HTTP URL, Server FQDN, etc... \033[0m");
return ss;
}
static struct monitor_reply *show_session_detail_cmd_cb(struct stellar_monitor *stm, int argc, char *argv[], void *arg)
{
struct stm_show_session_detail_opt detail_opt = {};
detail_opt.thread_idx = -1;
if (show_session_is_help(argc, argv))
{
return show_session_detail_usage();
}
struct monitor_reply *error_reply = show_session_detail_cli_args_parse(argc, argv, &detail_opt);
if (error_reply != NULL)
{
return error_reply;
}
struct monitor_reply *cmd_reply;
struct stellar_module_manager *mod_mgr = (struct stellar_module_manager *)arg;
int thread_num = stellar_module_manager_get_max_thread_num(mod_mgr);
struct iovec request;
request.iov_base = (void *)&detail_opt;
request.iov_len = sizeof(struct stm_show_session_detail_opt);
struct iovec response[thread_num];
memset(response, 0, sizeof(response));
size_t tot_sess_id_num = 0;
sds ss = sdsempty();
for (int i = 0; i < thread_num; i++)
{
if (detail_opt.thread_idx != -1 && detail_opt.thread_idx != i)
{
continue;
}
response[i] = monitor_worker_thread_rpc(stm, i, request, show_session_detail_on_worker_thread_rpc_cb, mod_mgr);
if (response[i].iov_base == NULL || response[i].iov_len == 0) // empty result
{
continue;
}
struct show_session_detail_result *detail_res = (struct show_session_detail_result *)(response[i].iov_base);
ss = sdscatprintf(ss, "%s\n", session_detail_to_readable(detail_res));
tot_sess_id_num++;
break;
}
if (tot_sess_id_num == 0)
{
cmd_reply = monitor_reply_new_string("No session found by id %lu", detail_opt.sess_id);
goto empty_result;
}
cmd_reply = monitor_reply_new_string("%s", ss);
empty_result:
sdsfree(ss);
for (int i = 0; i < thread_num; i++)
{
if (response[i].iov_base)
{
free(response[i].iov_base);
}
}
return cmd_reply;
}
int show_session_enforcer_init(struct stellar_module_manager *mod_mgr, struct stellar_monitor *stm)
{
monitor_register_cmd(stm, "show session brief", show_session_brief_cmd_cb, "readonly",
"[sip | dip | sport | dport | protocol | state | cursor | count | limit | create-time-in | last-pkt-time-in ]",
"Show session brief information", (void *)mod_mgr);
monitor_register_cmd(stm, "show session detail", show_session_detail_cmd_cb, "readonly",
"id <id>",
"Show session verbose information", (void *)mod_mgr);
return 0;
}

View File

@@ -0,0 +1,537 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#ifdef __cplusplus
extern "C"
{
#endif
#include "cJSON.h"
#include "sds/sds.h"
#include "monitor_private.h"
#include "monitor_cmd_assistant.h"
#include "monitor_utils.h"
#ifdef __cplusplus
}
#endif
struct stm_cmd_spec
{
const char *cmd_name;
const char *cmd_flags;
const char *cmd_hint;
int cmd_arity;
int cmd_first_key_offset;
};
struct stm_cmd_assistant
{
cJSON *cjson_root;
stm_cmd_assistant_completion_cb *cmd_completion_cb;
stm_cmd_assistant_hints_cb *cmd_hints_cb;
sds hints_result; // mallo and free for every hits
};
static __thread struct stm_cmd_assistant *__g_stm_cli_assistant = NULL;
static cJSON *stm_cli_register_cmd(cJSON *father_next_array, const char *cur_cmd_prefix)
{
cJSON *cur_level_item = NULL;
/* search current cmd (name is prefix) in father->next array[] */
int array_size = cJSON_GetArraySize(father_next_array);
for (int i = 0; i < array_size; i++)
{
cur_level_item = cJSON_GetArrayItem(father_next_array, i);
cJSON *cmd_name_item = cJSON_GetObjectItem(cur_level_item, "prefix");
if (cmd_name_item != NULL && (0 == strcmp(cur_cmd_prefix, cmd_name_item->valuestring)))
{
break;
}
else
{
cur_level_item = NULL;
}
}
if (NULL == cur_level_item)
{
/* if not exist, create new cmd (name is prefix) */
cur_level_item = cJSON_CreateObject();
cJSON_AddItemToObject(cur_level_item, "prefix", cJSON_CreateString(cur_cmd_prefix));
cJSON *new_cmd_next_array = cJSON_CreateArray();
cJSON_AddItemToObject(cur_level_item, "next", new_cmd_next_array);
/* insert into father->next array */
cJSON_AddItemToArray(father_next_array, cur_level_item);
}
else
{
; // already exist, do nothing
}
return cur_level_item;
}
/*
search json object by cli_cmd_line, if exsit , return 0, do nothing,
the search json object function can be used for register_usage();
if not exist, create new json object and insert into father->next array
*/
int stm_cmd_assistant_register(struct stm_cmd_assistant *aide, const char *cli_cmd_line)
{
int argc = 0;
if (NULL == aide->cjson_root)
{
aide->cjson_root = cJSON_CreateArray();
}
sds *array = sdssplitargs(cli_cmd_line, &argc);
cJSON *father_candidate_array = aide->cjson_root;
cJSON *cur_cmd_obj = NULL;
for (int i = 0; i < argc; i++)
{
cur_cmd_obj = stm_cli_register_cmd(father_candidate_array, array[i]);
father_candidate_array = cJSON_GetObjectItem(cur_cmd_obj, "next");
}
sdsfreesplitres(array, argc);
return 0;
}
static cJSON *stm_cli_search_cmd(cJSON *father_next_array, const char *cur_cmd_prefix)
{
cJSON *cur_level_item = NULL;
/* search current cmd (name is prefix) in father->next array[] */
int array_size = cJSON_GetArraySize(father_next_array);
for (int i = 0; i < array_size; i++)
{
cur_level_item = cJSON_GetArrayItem(father_next_array, i);
cJSON *cmd_name_item = cJSON_GetObjectItem(cur_level_item, "prefix");
if (cmd_name_item != NULL && (0 == strcmp(cur_cmd_prefix, cmd_name_item->valuestring)))
{
break;
}
else
{
cur_level_item = NULL;
}
}
return cur_level_item;
}
cJSON *stm_cmd_assistant_search(struct stm_cmd_assistant *aide, const char *cli_cmd_line)
{
int argc = 0;
sds *array = sdssplitargs(cli_cmd_line, &argc);
cJSON *father_candidate_array = aide->cjson_root;
cJSON *match_item = NULL;
for (int i = 0; i < argc; i++)
{
match_item = stm_cli_search_cmd(father_candidate_array, array[i]);
if (NULL == match_item)
{
return NULL;
}
father_candidate_array = cJSON_GetObjectItem(match_item, "next");
}
sdsfreesplitres(array, argc);
return match_item;
}
int stm_cmd_assistant_register_usage(struct stm_cmd_assistant *aide, const char *cli_cmd_line, const char *usage)
{
if (NULL == aide || NULL == cli_cmd_line || NULL == usage)
{
return -1;
}
if (strlen(cli_cmd_line) == 0)
{
cJSON *obj = cJSON_CreateObject();
cJSON_AddItemToObjectCS(obj, "usage", cJSON_CreateString(usage));
cJSON_AddItemToArray(aide->cjson_root, obj);
return 0;
}
cJSON *obj = stm_cmd_assistant_search(aide, cli_cmd_line);
if (NULL == obj)
{
stm_cmd_assistant_register(aide, cli_cmd_line);
obj = stm_cmd_assistant_search(aide, cli_cmd_line);
if (NULL == obj)
{
return -1;
}
}
cJSON_AddItemToObject(obj, "usage", cJSON_CreateString(usage));
return 0;
}
char *stm_cmd_assistant_verbose_print(int format)
{
struct stm_cmd_assistant *aide = __g_stm_cli_assistant;
if (NULL == aide->cjson_root)
{
return (char *)strdup("[]");
}
char *debug_print;
if (format == 1)
{
debug_print = cJSON_Print(aide->cjson_root);
}
else
{
debug_print = cJSON_PrintUnformatted(aide->cjson_root);
}
return debug_print;
}
char *stm_cmd_assistant_brief_print(void)
{
struct stm_cmd_assistant *aide = __g_stm_cli_assistant;
sds s = sdsempty();
s = sdscat(s, "Usage:\r\n");
int array_size = cJSON_GetArraySize(aide->cjson_root);
for (int sz = 0; sz < array_size; sz++)
{
cJSON *item = cJSON_GetArrayItem(aide->cjson_root, sz);
cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix");
s = sdscatprintf(s, "\t%s\r\n", cmd_name_item->valuestring);
}
char *print_str = strdup(s);
sdsfree(s);
return print_str;
}
int stm_cmd_assistant_json_load(struct stm_cmd_assistant *aide, const char *json_str)
{
if (NULL == aide || NULL == json_str || strlen(json_str) == 0)
{
return -1;
}
cJSON *root = cJSON_Parse(json_str);
if (NULL == root)
{
return -1;
}
aide->cjson_root = root;
return 0;
}
void stm_cmd_assistant_free(struct stm_cmd_assistant *aide)
{
if (aide)
{
if (aide->cjson_root)
{
cJSON_Delete(aide->cjson_root);
}
if (aide->hints_result)
{
sdsfree(aide->hints_result);
}
free(aide);
}
}
struct stm_cmd_assistant *stm_cmd_assistant_new(void)
{
struct stm_cmd_assistant *aide = calloc(1, sizeof(struct stm_cmd_assistant));
aide->cjson_root = NULL;
aide->hints_result = NULL;
__g_stm_cli_assistant = aide;
return aide;
}
struct stm_cmd_assistant *stm_cmd_assistant_get(void)
{
return __g_stm_cli_assistant;
}
static int stm_cmd_exist(struct stm_cmd_assistant *aide, const char *cmd_name)
{
cJSON *root = aide->cjson_root;
if (NULL == root)
{
return 0;
}
int array_size = cJSON_GetArraySize(root);
for (int sz = 0; sz < array_size; sz++)
{
cJSON *item = cJSON_GetArrayItem(root, sz);
cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix");
if (cmd_name_item && cmd_name_item->valuestring)
{
if (strcasecmp(cmd_name, cmd_name_item->valuestring) == 0)
{
return 1;
}
}
}
return 0;
}
/*
* return value:
* 0: success
* -1: failed
* 1: already exist
*/
int stm_cmd_assistant_register_cmd(struct stm_cmd_assistant *aide, const char *cmd_name, void *cmd_cb, void *cmd_arg,
const char *flags, const char *hint, const char *desc)
{
if (NULL == aide || NULL == cmd_name || NULL == cmd_cb)
{
return -1;
}
if (stm_cmd_exist(aide, cmd_name))
{
return 1;
}
if (stm_strncasecmp_exactly(flags, "readonly", 8) != 0 && stm_strncasecmp_exactly(flags, "write", 5) != 0)
{
return -1;
}
cJSON *obj = cJSON_CreateObject();
cJSON_AddItemToObject(obj, "prefix", cJSON_CreateString(cmd_name));
cJSON_AddItemToObject(obj, "flags", cJSON_CreateString(flags));
cJSON_AddItemToObject(obj, "hints", cJSON_CreateString(hint));
cJSON_AddItemToObject(obj, "desc", cJSON_CreateString(desc));
char tmp_str[32] = {};
snprintf(tmp_str, sizeof(tmp_str), "%p", cmd_cb);
cJSON_AddItemToObject(obj, "cmd_cb", cJSON_CreateString(tmp_str));
snprintf(tmp_str, sizeof(tmp_str), "%p", cmd_arg);
cJSON_AddItemToObject(obj, "cmd_arg", cJSON_CreateString(tmp_str));
if (NULL == aide->cjson_root)
{
aide->cjson_root = cJSON_CreateArray();
}
cJSON_AddItemToArray(aide->cjson_root, obj);
return 0;
}
char *stm_cmd_assistant_serialize(struct stm_cmd_assistant *aide)
{
if (NULL == aide)
{
return NULL;
}
char *debug_print = cJSON_Print(aide->cjson_root);
return debug_print;
}
int stm_cmd_assistant_dserialize(struct stm_cmd_assistant *aide, const char *json)
{
cJSON *tmp_root = cJSON_Parse(json);
if (NULL == tmp_root)
{
return -1;
}
aide->cjson_root = tmp_root;
return 0;
}
/*
return value:
0: equal exactly, uesed for hints, or get_cmd_cb()
-1: cli_array < register_cmd_array //raw command1 and command2 line has same section, and command1 line is shorter than command2, used for tab auto completion
1: cli_array > register_cmd_array //raw command1 and command2 line has same section, and command1 line is longer than command2, used for get_cmd_cb()
-2: not match any secions
*/
int stm_cmd_assistant_sds_compare(sds *cli_array, int cli_argc, sds *register_cmd_array, int register_argc)
{
if (0 == cli_argc || 0 == register_argc)
{
return -2;
}
// compare the first n-1 words, must be exactly match
int min_argc = MIN(cli_argc, register_argc);
for (int i = 0; i < min_argc - 1; i++)
{
// previous words must be exactly match, so use strcasecmp()
if (strcasecmp(cli_array[i], register_cmd_array[i]) != 0)
{
return -2;
}
}
// compare the last common word use substring match
if (strncasecmp(cli_array[min_argc - 1], register_cmd_array[min_argc - 1], strlen(cli_array[min_argc - 1])) == 0)
{
if (strcasecmp(cli_array[min_argc - 1], register_cmd_array[min_argc - 1]) == 0)
{
if (cli_argc == register_argc)
{
return 0;
}
else if (cli_argc < register_argc)
{
return -1;
}
else
{
return 1;
}
}
else
{
// cli command is not complete
return -1;
}
}
return -2;
}
int stm_cmd_assistant_set_completion_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_completion_cb *cb)
{
aide->cmd_completion_cb = cb;
return 0;
}
int stm_cmd_assistant_set_hints_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_hints_cb *cb)
{
aide->cmd_hints_cb = cb;
return 0;
}
static void cjson_traversal_completion(struct stm_cmd_assistant *aide, sds *sds_cli_array, int cli_argc, cJSON *cjson_root, void *arg)
{
int array_size = cJSON_GetArraySize(cjson_root);
for (int sz = 0; sz < array_size; sz++)
{
cJSON *array_item = cJSON_GetArrayItem(cjson_root, sz);
cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix");
int register_cmd_section_num;
sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, &register_cmd_section_num);
int match = stm_cmd_assistant_sds_compare(sds_cli_array, cli_argc, register_cmd_array, register_cmd_section_num);
sdsfreesplitres(register_cmd_array, register_cmd_section_num);
if (match == 0 || match == -1)
{
aide->cmd_completion_cb(arg, prefix_item->valuestring);
}
}
return;
}
static void stm_cmd_assistant_completion_cb_fun(struct stm_cmd_assistant *aide, const char *buf, void *arg)
{
/* input cmd buf must <= registed command
if buf_len < command_len ,auto completion the longest prefix
if buf_len == command_len, add a blank space
*/
int argc = 0;
sds *cli_cmd_array = sdssplitargs(buf, &argc);
cjson_traversal_completion(aide, cli_cmd_array, argc, aide->cjson_root, arg);
sdsfreesplitres(cli_cmd_array, argc);
}
void stm_cmd_assistant_input_line(struct stm_cmd_assistant *aide, const char *line, void *arg)
{
stm_cmd_assistant_completion_cb_fun(aide, line, arg);
}
static const char *cjson_traversal_hints(sds *sds_cli_array, int cli_argc, cJSON *cjson_root)
{
int array_size = cJSON_GetArraySize(cjson_root);
for (int sz = 0; sz < array_size; sz++)
{
cJSON *array_item = cJSON_GetArrayItem(cjson_root, sz);
cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix");
int register_cmd_section_num;
sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, &register_cmd_section_num);
int match = stm_cmd_assistant_sds_compare(sds_cli_array, cli_argc, register_cmd_array, register_cmd_section_num);
sdsfreesplitres(register_cmd_array, register_cmd_section_num);
if (match == 0) // hints must be exactly match
{
cJSON *hint_item = cJSON_GetObjectItem(array_item, "hints");
if (hint_item)
{
return hint_item->valuestring;
}
}
}
return NULL;
}
const char *stm_cmd_assistant_input_line_for_hints(struct stm_cmd_assistant *aide, const char *line)
{
int argc = 0;
sds *cli_cmd_array = sdssplitargs(line, &argc);
const char *hints = cjson_traversal_hints(cli_cmd_array, argc, aide->cjson_root);
sdsfreesplitres(cli_cmd_array, argc);
return hints;
}
static long long stm_cmd_assistant_get_item(struct stm_cmd_assistant *aide, const char *cmd_line, const char *json_item_name, const char *format)
{
long long item_value = 0;
int array_size = cJSON_GetArraySize(aide->cjson_root);
int cli_argc;
sds *cli_cmd_array = sdssplitargs(cmd_line, &cli_argc);
int last_match_cmd_section_num = -1;
for (int sz = 0; sz < array_size; sz++)
{
cJSON *array_item = cJSON_GetArrayItem(aide->cjson_root, sz);
cJSON *prefix_item = cJSON_GetObjectItem(array_item, "prefix");
int register_cmd_section_num;
sds *register_cmd_array = sdssplitargs(prefix_item->valuestring, &register_cmd_section_num);
int match = stm_cmd_assistant_sds_compare(cli_cmd_array, cli_argc, register_cmd_array, register_cmd_section_num);
sdsfreesplitres(register_cmd_array, register_cmd_section_num);
if (match >= 0)
{
cJSON *hint_item = cJSON_GetObjectItem(array_item, json_item_name);
if (hint_item)
{
/* longest match */
if (register_cmd_section_num > last_match_cmd_section_num)
{
last_match_cmd_section_num = register_cmd_section_num;
sscanf(hint_item->valuestring, format, &item_value);
}
}
}
}
sdsfreesplitres(cli_cmd_array, cli_argc);
return item_value;
}
void *stm_cmd_assistant_get_cb(struct stm_cmd_assistant *aide, const char *cmd_line)
{
return (void *)stm_cmd_assistant_get_item(aide, cmd_line, "cmd_cb", "%p");
}
void *stm_cmd_assistant_get_user_arg(struct stm_cmd_assistant *aide, const char *cmd_line)
{
return (void *)stm_cmd_assistant_get_item(aide, cmd_line, "cmd_arg", "%p");
}
int stm_cmd_assistant_get_arity(struct stm_cmd_assistant *aide, const char *cmd_line)
{
return (int)stm_cmd_assistant_get_item(aide, cmd_line, "arity", "%d");
}
sds stm_cmd_assistant_list_cmd_brief(struct stm_cmd_assistant *aide)
{
sds res = sdsempty();
int array_size = cJSON_GetArraySize(aide->cjson_root);
for (int sz = 0; sz < array_size; sz++)
{
cJSON *item = cJSON_GetArrayItem(aide->cjson_root, sz);
cJSON *cmd_name_item = cJSON_GetObjectItem(item, "prefix");
cJSON *cmd_desc_item = cJSON_GetObjectItem(item, "desc");
res = sdscatprintf(res, "\"%s\", %s \r\n", cmd_name_item->valuestring, cmd_desc_item ? cmd_desc_item->valuestring : "");
}
return res;
}
sds stm_cmd_assistant_list_cmd_verbose(struct stm_cmd_assistant *aide)
{
if (NULL == aide->cjson_root)
{
return sdsempty();
}
char *json_str = cJSON_PrintUnformatted(aide->cjson_root);
sds res = sdsnew(json_str);
free(json_str);
return res;
}

View File

@@ -0,0 +1,60 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
#include "cJSON.h"
#include "sds/sds.h"
#ifndef MIN
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#endif
#ifndef MAX
#define MAX(x, y) ((x) > (y) ? (x) : (y))
#endif
struct stm_cmd_assistant;
struct stm_cmd_spec;
struct stm_cmd_assistant *stm_cmd_assistant_new();
void stm_cmd_assistant_free(struct stm_cmd_assistant *aide);
int stm_cmd_assistant_json_load(struct stm_cmd_assistant *aide, const char *json_str);
char *stm_cmd_assistant_brief_print(void);
char *stm_cmd_assistant_verbose_print(int format);
int stm_cmd_assistant_is_help(const char *line);
cJSON *stm_cmd_assistant_search(struct stm_cmd_assistant *aide, const char *cli_cmd_line);
int stm_cmd_assistant_register(struct stm_cmd_assistant *aide, const char *cli_cmd_line);
int stm_cmd_assistant_register_usage(struct stm_cmd_assistant *aide, const char *cli_cmd_line, const char *usage);
// return value shoule be free after uesd.
char *stm_cmd_assistant_serialize(struct stm_cmd_assistant *aide);
int stm_cmd_assistant_dserialize(struct stm_cmd_assistant *aide, const char *json);
/*
* return value:
* 0: success
* -1: failed
* 1: already exist
*/
int stm_cmd_assistant_register_cmd(struct stm_cmd_assistant *aide, const char *cmd, void *cmd_cb, void *cmd_arg,
const char *flags, const char *hint, const char *description);
int stm_cmd_assistant_sds_compare(sds *cli_array, int cli_argc, sds *register_cmd_array, int register_argc);
typedef void(stm_cmd_assistant_completion_cb)(void *arg, const char *candidate_completion);
typedef char *(stm_cmd_assistant_hints_cb)(const char *line);
int stm_cmd_assistant_set_completion_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_completion_cb *cb);
int stm_cmd_assistant_set_hints_cb(struct stm_cmd_assistant *aide, stm_cmd_assistant_hints_cb *cb);
void stm_cmd_assistant_input_line(struct stm_cmd_assistant *aide, const char *line, void *arg);
const char *stm_cmd_assistant_input_line_for_hints(struct stm_cmd_assistant *aide, const char *line);
struct stm_cmd_assistant *stm_cmd_assistant_get(void);
// struct stm_cmd_spec *stm_cmd_assistant_get_cmd(struct stm_cmd_assistant *aide, const char *cmd);
void *stm_cmd_assistant_get_cb(struct stm_cmd_assistant *aide, const char *cmd);
void *stm_cmd_assistant_get_user_arg(struct stm_cmd_assistant *aide, const char *cmd);
int stm_cmd_assistant_get_arity(struct stm_cmd_assistant *aide, const char *cmd);
sds stm_cmd_assistant_list_cmd_brief(struct stm_cmd_assistant *aide);
sds stm_cmd_assistant_list_cmd_verbose(struct stm_cmd_assistant *aide);
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,331 @@
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stddef.h>
#include <pthread.h>
#include <netinet/in.h>
#include <pcap/pcap.h>
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar/monitor.h"
#include "sds/sds.h"
#include "stellar/module.h"
#include "stellar/log.h"
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/http.h>
#include "monitor_rpc.h"
/********************************** limit definition *****************************************/
#ifndef STELLAR_MAX_THREAD_NUM
#define STELLAR_MAX_THREAD_NUM (256)
#endif
#define STM_RINGBUF_SIZE (1024 * 1024) /* per thread */
#define STM_CONNECTION_IDLE_TIMEOUT 300 /* How many seconds elapsed without input command, connection will closed */
#define STM_REQUEST_TIMEOUT 5
#define STM_SERVER_LISTEN_IP "127.0.0.1"
#define STM_SERVER_LISTEN_PORT 80
#define STM_TZSP_UDP_PORT 37008 /* default port of TZSP protocol: https://en.wikipedia.org/wiki/TZSP# */
#define STM_SESSION_DEFAULT_SEARCH_COUNT 100 /* if no count params, max search session number */
#define STM_SESSION_DEFAULT_LIMIT_NUM 10 /* if no limit params, max support result session number */
#define STM_SESSION_MAX_LIMIT_NUM 1000
#define STM_UINT64_READABLE_STRING_MAX_LEN 21 /* MAX value is: 18446744073709551615 */
#define STM_UINT32_READABLE_STRING_MAX_LEN 11 /* MAX value is: 4294967295 */
#define STM_CONNECTIVITY_DEFALUT_COUNT 5 /* ping default count */
#define STM_CONNECTIVITY_DEFALUT_SIZE 64 /* ping default bytes */
#define STM_CONNECTIVITY_MAX_COUNT 100 /* ping max count */
#define STM_CONNECTIVITY_MAX_SIZE 65535 /* ping max bytes */
/************************************************************************/
#define STM_CMD_CALLBACK_THREAD_LOCAL_MAGIC (0x1234ABCD)
#define STM_RINGBUF_HDR_MAGIC (0x0ABCD12345678)
#define STM_RINGBUF_THREAD_IDX_SERVER 0
#define STM_RINGBUF_THREAD_IDX_AGENT 1
#define STM_MONITOR_THREAD_ID 0 // There are only two threads, use fix id
#define STM_WORKER_THREAD_ID 1 // There are only two threads, use fix id
#define STM_LOG_MODULE_NAME "monitor"
#define STM_STAT_OUTPUT_PATH "log/monitor.fs4"
#define STM_STAT_OUTPUT_INTERVAL_MS 3000
#define STM_RESTFUL_VERSION "v1"
#define STM_RESTFUL_RESOURCE "stellar_monitor"
#define STM_RESTFUL_URI_CMD_KEY "raw_cmd" // example: http://127.0.0.1:80/v1/stellar_monitor?raw_cmd=show%20session
#define STM_CLIENT_SERVER_SYNC_CMD "show command verbose"
#define STM_CLI_CMD_HINTS_COLOR 90
#define STM_CLI_CMD_HINTS_BOLD 0
#ifndef UNUSED
#define UNUSED __attribute__((unused))
#endif
#ifdef NDEBUG // release version
#define STM_DBG_PRINT(fmt, args...)
#else
#define STM_DBG_PRINT(fmt, args...) fprintf(stderr, fmt, ##args)
#endif
#ifndef CALLOC
#define CALLOC(type, number) ((type *)calloc(sizeof(type), number))
#endif
#ifndef FREE
#define FREE(ptr) \
{ \
if (ptr) \
{ \
free((void *)ptr); \
ptr = NULL; \
} \
}
#endif
#define STM_TIME_START() \
struct timespec __start_time, __end_time; \
unsigned long long diff; \
clock_gettime(CLOCK_MONOTONIC, &__start_time);
#define STM_TIME_DIFF() \
{ \
clock_gettime(CLOCK_MONOTONIC, &__end_time); \
if (__start_time.tv_sec == __end_time.tv_sec) \
{ \
diff = (unsigned long long)(__end_time.tv_nsec - __start_time.tv_nsec); \
} \
diff = ((unsigned long long)__end_time.tv_sec * 1000 * 1000 * 1000 + __end_time.tv_nsec) - ((unsigned long long)__start_time.tv_sec * 1000 * 1000 * 1000 + __start_time.tv_nsec); \
}
#ifndef MIN
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#endif
#ifndef MAX
#define MAX(x, y) ((x) > (y) ? (x) : (y))
#endif
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
#define STM_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
#define STM_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
#define STM_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
#define STM_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
enum stm_http_response_code
{
STM_HTTP_200_OK = 200,
STM_HTTP_204_NO_CONTENT = 204,
STM_HTTP_403_FORBIDDEN = 403,
STM_HTTP_408_REQUEST_TIMEOUT = 408,
STM_HTTP_413_PAYLOAD_TOO_LARGE = 413,
};
enum stm_stat_type
{
STM_STAT_CLI_CONNECTION_NEW,
STM_STAT_CLI_CONNECTION_CLOSE,
STM_STAT_CLI_REQUEST_SUCC,
STM_STAT_CLI_RESPONSE_SUCC,
STM_STAT_CLI_REQUEST_ERR, // RESTFul Syntax error!
STM_STAT_CLI_RESPONSE_ERR, // attention: empty result is not error!
STM_STAT_MAX,
};
struct stm_spinlock;
struct stellar_monitor_config
{
// int thread_count;
size_t ringbuf_size; /* bytes */
int connection_idle_timeout;
int cli_request_timeout;
char *listen_ipaddr;
unsigned short listen_port_host_order;
unsigned short data_link_bind_port_host_order; // for TZSP protocol
int output_interval_ms;
char *output_path;
};
struct stm_key_value_tuple
{
char *key;
char *value;
};
struct stm_key_value
{
int tuple_num;
struct stm_key_value_tuple *tuple;
};
typedef struct evhttp_request stm_network_connection;
struct stm_cmd_transaction
{
struct stm_cmd_request *cmd_req;
struct stm_cmd_reply *cmd_res[STELLAR_MAX_THREAD_NUM]; // multi thread merge to one
};
struct stm_connection_manager
{
struct timeval link_start_time;
struct timeval last_active_time;
struct evhttp_connection *conn;
char peer_ipaddr[INET6_ADDRSTRLEN];
uint16_t peer_port_host_order;
struct stm_connection_manager *next, *prev;
};
struct stm_stat_counter
{
int counter_id;
uint64_t count;
uint64_t bytes;
};
struct stm_stat
{
void *fs4_ins;
struct stm_stat_counter counters[STM_STAT_MAX];
};
struct monitor_connection
{
struct evhttp_connection *current_evconn_ref;
};
/* optional API */
struct monitor_connection;
typedef void(monitor_connection_close_cb)(struct monitor_connection *conn, void *arg);
int monitor_register_connection_close_cb(struct stellar_monitor *monitor, monitor_connection_close_cb *cb, void *arg);
struct stm_conn_close_cb_manager
{
monitor_connection_close_cb *cb;
void *arg;
struct stm_conn_close_cb_manager *next, *prev;
};
struct stm_pktdump_runtime;
struct stellar_monitor
{
struct module_manager *mod_mgr_ref;
struct logger *logger_ref;
int worker_thread_num;
struct stellar_monitor_config *config;
struct stm_cmd_assistant *aide; // reference, share with stellar
struct stm_connection_manager *connection_mgr; // used to tracking all connections, for cli "who" command
struct stm_conn_close_cb_manager *conn_close_mgr;
// struct stm_ringbuf_mgr *ringbuf_mgr[STELLAR_MAX_THREAD_NUM];
struct event_base *evt_base;
// struct event *ev_timeout;
struct evhttp *evt_http_server;
pthread_t evt_main_loop_tid;
struct timeval time_now;
struct stm_stat *stat;
struct stm_spinlock *lock; // for dynamic register command, conn_close_cb
int (*gettime_cb)(struct timeval *tv, struct timezone *tz);
struct monitor_connection current_conn;
struct stm_pktdump_runtime *packet_dump;
struct monitor_rpc **rpc_ins_array; // multir threads
};
enum monitor_reply_type
{
MONITOR_REPLY_INTEGER,
MONITOR_REPLY_DOUBLE,
MONITOR_REPLY_STRING,
MONITOR_REPLY_ERROR,
MONITOR_REPLY_STATUS,
MONITOR_REPLY_NIL,
};
struct monitor_reply
{
enum monitor_reply_type type;
long long integer; /* The integer when type is SWARMKV_REPLY_INTEGER */
double dval; /* The double when type is SWARMKV_REPLY_DOUBLE */
int len; /* Length of string */
char *str;
int http_code;
const char *http_reason;
};
struct monitor_cli_args
{
const char *short_opt;
const char *long_opt;
int require_arg_value;
int value_is_multi_words; // "a b c d e f g"
char *value; // should be free after use
};
/************************************************************************************************************/
/* monitor call gettimeofday(2) by default */
struct stellar_monitor_config *stellar_monitor_config_new(const char *toml);
int stellar_monitor_set_gettime_callback(struct stellar_monitor *stm, int (*gettime_cb)(struct timeval *tv, struct timezone *tz));
struct stellar_monitor *stellar_monitor_get(void);
struct stm_connection_manager *stm_connection_insert(struct evhttp_connection *evconn);
void stm_connection_update(struct stm_connection_manager *conn_mgr, const struct evhttp_connection *evconn);
void stm_connection_delete(struct evhttp_connection *evconn);
const struct stm_connection_manager *stm_connection_search(const struct stm_connection_manager *conn_mgr_head, const struct evhttp_connection *evconn);
struct stm_key_value *stm_cmd_key_value_new(void);
void stm_cmd_key_value_append(struct stm_key_value **kv, const char *key, const char *value);
void stm_cmd_key_value_free(struct stm_key_value *kv);
/************************************** command manager **********************************************/
struct stm_stat *stm_stat_init(struct stellar_monitor *stm);
sds stm_config_print(const struct stellar_monitor_config *config);
void stm_stat_free(struct stm_stat *stat);
void stm_stat_update(struct stm_stat *stat, int thread_idx, enum stm_stat_type type, long long value);
long long stm_get_stat_count(struct stm_stat *stat, enum stm_stat_type type);
long long stm_get_stat_bytes(struct stm_stat *stat, enum stm_stat_type type);
sds monitor_reply_to_string(const struct monitor_reply *reply);
void monitor_reply_free(struct monitor_reply *reply);
int monitor_util_parse_cmd_args(int argc, const char *argv[], struct monitor_cli_args cli_args[], size_t cli_args_array_size);
char *stm_http_url_encode(const char *originalText);
struct stm_spinlock *stm_spinlock_new(void);
void stm_spinlock_lock(struct stm_spinlock *splock);
void stm_spinlock_unlock(struct stm_spinlock *splock);
void stm_spinlock_free(struct stm_spinlock *splock);
struct stm_pktdump_runtime *stm_packet_dump_new(struct stellar_monitor *stm, const struct stellar_monitor_config *config);
void stm_pktdump_enforcer_free(struct stellar_monitor *stm);
struct monitor_rpc *stm_rpc_new(void);
void stm_rpc_free(struct monitor_rpc *rpc_ins);
int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins);
struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args);
void monitor_rpc_free(struct monitor_rpc *rpc_ins);
struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr);
struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh);
/* Must be called in 'monitor_cmd_cb' context */
struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor);
/* Get the remote address and port associated with this connection. */
int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, unsigned short *peer_port);
/* command enforcer */
int show_session_enforcer_init(struct module_manager *mod_mgr, struct stellar_monitor *stm);
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,137 @@
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stddef.h>
#include "monitor_private.h"
#include "monitor_ringbuf.h"
struct monitor_ringbuf_wrap
{
ringbuf_t *ringbuf;
char *ringbuf_data;
unsigned long long push_number; // only for statistics
unsigned long long push_bytes; // only for statistics
unsigned long long pop_number; // only for statistics
unsigned long long pop_bytes; // only for statistics
};
ssize_t stm_ringbuf_stream_start(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t require_size)
{
ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id);
ssize_t offset = ringbuf_acquire(rbf->ringbuf, rb_worker, require_size);
if (offset < 0)
{
STM_DBG_PRINT("stm ringbuf stream prealloc buffer(): ringbuf_acquire fail, no valid space!\n");
return 0;
}
return offset;
}
int stm_ringbuf_stream_append(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t rbf_offset, const void *value, size_t len)
{
(void)thread_id;
memcpy(rbf->ringbuf_data + rbf_offset, value, len);
rbf->push_number++;
rbf->push_bytes += len;
return 0;
}
void stm_ringbuf_stream_finish(int thread_id, struct monitor_ringbuf_wrap *rbf)
{
ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id);
ringbuf_produce(rbf->ringbuf, rb_worker);
}
int stm_ringbuf_easy_push(int thread_id, struct monitor_ringbuf_wrap *rbf, const void *push_value /*must continuous*/, size_t push_len)
{
ringbuf_worker_t *rb_worker = ringbuf_register(rbf->ringbuf, thread_id);
ssize_t offset = ringbuf_acquire(rbf->ringbuf, rb_worker, push_len);
if (offset < 0)
{
STM_DBG_PRINT("stm ringbuf easy push(): ringbuf_acquire fail, no valid space!\n");
return -1;
}
memcpy(rbf->ringbuf_data + offset, push_value, push_len);
ringbuf_produce(rbf->ringbuf, rb_worker);
rbf->push_number++;
rbf->push_bytes += push_len;
// STM_DBG_PRINT("stm ringbuf push() success, len:%llu, number:%llu\n", push_len, rbf->push_number);
return 0;
}
void *stm_ringbuf_pop(struct monitor_ringbuf_wrap *rbf, size_t *pop_len)
{
size_t len = 0, offset = 0;
len = ringbuf_consume(rbf->ringbuf, &offset);
if (0 == len)
{
// STM_DBG_PRINT("stm_ringbuf_pop(): not valid data\n");
*pop_len = 0;
return NULL;
}
rbf->pop_number++;
*pop_len = len;
// STM_DBG_PRINT("stm_ringbuf_pop() success, len:%llu, number:%llu\n", len, rbf->pop_number);
return rbf->ringbuf_data + offset;
}
void stm_ringbuf_release(struct monitor_ringbuf_wrap *rbf, int rel_len)
{
ringbuf_release(rbf->ringbuf, rel_len);
rbf->pop_bytes += rel_len;
}
struct monitor_ringbuf_wrap *stm_ringbuf_wrap_new(int thread_tot_num, size_t ringbuf_size)
{
struct monitor_ringbuf_wrap *rbf = (struct monitor_ringbuf_wrap *)calloc(1, sizeof(struct monitor_ringbuf_wrap));
size_t ringbuf_obj_size;
ringbuf_get_sizes(thread_tot_num, &ringbuf_obj_size, NULL);
rbf->ringbuf = (ringbuf_t *)calloc(1, ringbuf_obj_size);
rbf->ringbuf_data = (char *)calloc(1, ringbuf_size);
ringbuf_setup(rbf->ringbuf, thread_tot_num, ringbuf_size);
return rbf;
}
void stm_ringbuf_wrap_free(struct monitor_ringbuf_wrap *rbf)
{
if (NULL == rbf)
{
return;
}
if (rbf->ringbuf)
{
free(rbf->ringbuf);
}
if (rbf->ringbuf_data)
{
free(rbf->ringbuf_data);
}
free(rbf);
}
void stm_ringbuf_get_statistics(const struct monitor_ringbuf_wrap *rbf, unsigned long long *push_number, unsigned long long *push_bytes, unsigned long long *pop_number, unsigned long long *pop_bytes)
{
if (NULL == rbf)
{
return;
}
if (push_number)
{
*push_number = rbf->push_number;
}
if (push_bytes)
{
*push_bytes = rbf->push_bytes;
}
if (pop_number)
{
*pop_number = rbf->pop_number;
}
if (pop_bytes)
{
*pop_bytes = rbf->pop_bytes;
}
}

View File

@@ -0,0 +1,30 @@
#pragma once
#include <stddef.h>
#ifdef __cplusplus
extern "C"
{
#endif
#include "ringbuf/ringbuf.h"
/*
Message format:
|| header | payload ....(variable-length) || header | payload ....(variable-length) || ....
*/
struct monitor_ringbuf_wrap;
struct monitor_ringbuf_wrap *stm_ringbuf_wrap_new(int thread_tot_num, size_t ringbuf_size);
int stm_ringbuf_easy_push(int thread_id, struct monitor_ringbuf_wrap *rbf, const void *value, size_t len);
void stm_ringbuf_release(struct monitor_ringbuf_wrap *rbf, int rel_len);
void *stm_ringbuf_pop(struct monitor_ringbuf_wrap *rbf, size_t *pop_len);
void stm_ringbuf_wrap_free(struct monitor_ringbuf_wrap *rbf);
ssize_t stm_ringbuf_stream_start(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t require_size);
int stm_ringbuf_stream_append(int thread_id, struct monitor_ringbuf_wrap *rbf, size_t rbf_offset, const void *value, size_t len);
void stm_ringbuf_stream_finish(int thread_id, struct monitor_ringbuf_wrap *rbf);
void stm_ringbuf_get_statistics(const struct monitor_ringbuf_wrap *rbf, unsigned long long *push_number,
unsigned long long *push_bytes, unsigned long long *pop_number, unsigned long long *pop_bytes);
#ifdef __cplusplus
}
#endif

115
infra/monitor/monitor_rpc.c Normal file
View File

@@ -0,0 +1,115 @@
#include <stddef.h>
#include "stellar/monitor.h"
#include "monitor_private.h"
#include "monitor_rpc.h"
#include "stellar/module.h"
#define RPC_WORKER_THREAD_BUSY 1
#define RPC_WORKER_THREAD_IDLE 0
struct monitor_rpc_msg_hdr
{
unsigned int type;
unsigned int length; // total messaage length, include this header, = payload length + sizeof(struct monitor_rpc_msg_hdr)
char value[0]; // variable-length, continuous
} __attribute__((packed));
enum monitor_rpc_ringbuf_dir
{ // full duplex, dir: 0: worker thread to monitor thread; 1: monitor thread to worker thread
RPC_RINBUG_DIR_W2M = 0,
RPC_RINBUG_DIR_M2W = 1,
RPC_RINBUG_DIR_MAX = 2,
};
struct monitor_rpc
{
volatile long atomic_val;
monitor_rpc_callabck *rpc_cb;
void *rpc_args;
struct iovec rpc_request;
struct iovec rpc_response;
};
struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args)
{
while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY)
{
// wait for the last rpc response, not support concurrent rpc yet!
usleep(1000);
}
rpc_ins->rpc_cb = cb;
rpc_ins->rpc_args = user_args;
rpc_ins->rpc_request = rpc_request;
__sync_fetch_and_or(&rpc_ins->atomic_val, 1);
while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY)
{
// wait for the rpc response...
usleep(1000);
}
return rpc_ins->rpc_response;
}
int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins)
{
if (0 == __sync_or_and_fetch(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE))
{
return 0;
}
rpc_ins->rpc_response = rpc_ins->rpc_cb(thread_idx, rpc_ins->rpc_request, rpc_ins->rpc_args);
__sync_fetch_and_and(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE);
return 1;
}
/*
* Communicate between different threads by ringbuf.
*/
struct iovec monitor_worker_thread_rpc(struct stellar_monitor *stm, int worker_thread_idx, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args)
{
int worker_thread_num = module_manager_get_max_thread_num(stm->mod_mgr_ref);
if (worker_thread_idx >= worker_thread_num)
{
struct iovec response = {0};
return response;
}
struct monitor_rpc *rpc_ins = stm->rpc_ins_array[worker_thread_idx];
return stm_rpc_call(rpc_ins, rpc_request, cb, user_args);
}
__thread long long rpc_idle_num = 0;
void module_rpc_worker_thread_polling_cb(struct module_manager *mod_mgr, void *polling_arg)
{
struct stellar_monitor *stm = (struct stellar_monitor *)polling_arg;
int thread_idx = module_manager_get_thread_id(mod_mgr);
struct monitor_rpc *rpc_ins = stm->rpc_ins_array[thread_idx];
stm_rpc_exec(thread_idx, rpc_ins);
}
struct monitor_rpc *stm_rpc_new(void)
{
struct monitor_rpc *rpc_ins = (struct monitor_rpc *)calloc(1, sizeof(struct monitor_rpc));
return rpc_ins;
}
void stm_rpc_free(struct monitor_rpc *rpc_ins)
{
if (NULL == rpc_ins)
{
return;
}
free(rpc_ins);
}
struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr)
{
module_manager_polling_subscribe(mod_mgr, module_rpc_worker_thread_polling_cb, (void *)stm);
return stm_rpc_new();
}
void monitor_rpc_free(struct monitor_rpc *rpc_ins)
{
stm_rpc_free(rpc_ins);
}

View File

@@ -0,0 +1,10 @@
#pragma once
#include <stdint.h>
#include <stddef.h>
#include "stellar/monitor.h"
#include <bits/types/struct_iovec.h>
struct monitor_rpc;
typedef struct iovec(monitor_rpc_callabck)(int worker_thread_idx, struct iovec user_data, void *user_args);
struct iovec monitor_worker_thread_rpc(struct stellar_monitor *monitor, int worker_thread_idx, struct iovec user_data, monitor_rpc_callabck *cb, void *user_args);

View File

@@ -0,0 +1,591 @@
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stddef.h>
#include <pthread.h>
#include <assert.h>
#include <evhttp.h>
#include <event2/event.h>
#include <event2/keyvalq_struct.h>
#include <sys/queue.h>
#include "stellar/log.h"
#include "monitor_private.h"
#include "monitor_cmd_assistant.h"
#include "monitor/monitor_utils.h"
#include "toml/toml.h"
#include "sds/sds.h"
#include "uthash/utlist.h"
static __thread struct stellar_monitor *__thread_local_stm;
static __thread pthread_t __thread_local_tid;
static __thread pthread_t __stm_libevevt_callback_thread_local_tid;
static __thread struct logger *__stm_thread_local_logger = NULL;
static void stm_save_thread_local_context(struct stellar_monitor *stm)
{
__thread_local_stm = stm;
__thread_local_tid = pthread_self();
__stm_thread_local_logger = stm->logger_ref;
}
static void stm_connection_close_notify(struct stellar_monitor *stm, UNUSED struct evhttp_connection *evconn)
{
struct stm_conn_close_cb_manager *ele, *tmp;
DL_FOREACH_SAFE(stm->conn_close_mgr, ele, tmp)
{
ele->cb(&stm->current_conn, ele->arg);
}
}
static void on_connection_close_cb(UNUSED struct evhttp_connection *ev_conn, UNUSED void *arg)
{
__stm_libevevt_callback_thread_local_tid = pthread_self();
struct stellar_monitor *stm = stellar_monitor_get();
stm->current_conn.current_evconn_ref = ev_conn;
stm_spinlock_lock(stm->lock);
stm_connection_delete(ev_conn);
stm_connection_close_notify(stm, ev_conn);
stm_spinlock_unlock(stm->lock);
char *peer_ip_addr;
uint16_t peer_port;
evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port);
STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "cli connection closed, client %s:%u\n", peer_ip_addr, peer_port);
stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_CLOSE, 1);
stm->current_conn.current_evconn_ref = NULL;
}
static void stm_command_send_reply_by_cstr(struct evhttp_request *request, int http_status_code, const char *reply, UNUSED void *params)
{
struct evbuffer *buffer = evbuffer_new();
evbuffer_add(buffer, reply, strlen(reply));
evhttp_send_reply(request, http_status_code, "OK", buffer);
evbuffer_free(buffer);
}
static void stm_command_send_reply(struct evhttp_request *request, struct monitor_reply *reply)
{
struct evbuffer *buffer = evbuffer_new();
sds reply_str = monitor_reply_to_string(reply);
evbuffer_add(buffer, reply_str, sdslen(reply_str));
evhttp_send_reply(request, reply->http_code, reply->http_reason, buffer);
evbuffer_free(buffer);
sdsfree(reply_str);
monitor_reply_free(reply);
}
static void stm_command_notfound(struct evhttp_request *request, UNUSED void *arg)
{
struct stellar_monitor *stm = stellar_monitor_get();
const char *req_str_uri = evhttp_request_get_uri(request);
struct evkeyvalq headers = {};
evhttp_parse_query(req_str_uri, &headers);
const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY);
stm_command_send_reply(request, monitor_reply_new_error(error_format_unknown_command, raw_cmd_content));
STELLAR_LOG_ERROR(stm->logger_ref, STM_LOG_MODULE_NAME, "invlid http uri: %s\r\n", evhttp_request_get_uri(request));
evhttp_clear_headers(&headers);
}
static void stm_exec_command(struct stellar_monitor *stm, struct evhttp_request *request, const char *cmd_line)
{
stm_spinlock_lock(stm->lock);
monitor_cmd_cb *cmd_cb = stm_cmd_assistant_get_cb(stm->aide, cmd_line);
if (NULL == cmd_cb)
{
stm_command_notfound(request, NULL);
stm_spinlock_unlock(stm->lock);
return;
}
void *cmd_user_arg = stm_cmd_assistant_get_user_arg(stm->aide, cmd_line);
int argc;
sds *cmd_argv = sdssplitargs(cmd_line, &argc);
struct monitor_reply *reply = cmd_cb(stm, argc, cmd_argv, cmd_user_arg);
stm_command_send_reply(request, reply);
sdsfreesplitres(cmd_argv, argc);
stm_spinlock_unlock(stm->lock);
}
static void stm_new_request_cb(struct evhttp_request *request, UNUSED void *privParams)
{
__stm_libevevt_callback_thread_local_tid = pthread_self();
struct stellar_monitor *stm = stellar_monitor_get();
struct evhttp_connection *ev_conn = evhttp_request_get_connection(request);
stm->current_conn.current_evconn_ref = ev_conn;
stm_spinlock_lock(stm->lock);
stm_connection_insert(ev_conn);
stm_spinlock_unlock(stm->lock);
evhttp_connection_set_closecb(ev_conn, on_connection_close_cb, request);
// evhttp_request_set_error_cb(request, on_request_error_cb);
const char *req_str_uri = evhttp_request_get_uri(request);
char *peer_ip_addr;
uint16_t peer_port;
evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port);
STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "new cli request, client:%s:%u, uri: %s\n", peer_ip_addr, peer_port, req_str_uri);
struct evkeyvalq headers = {};
evhttp_parse_query(req_str_uri, &headers);
const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY);
if (NULL == raw_cmd_content)
{
stm_command_send_reply_by_cstr(request, HTTP_BADREQUEST, "http uri syntax error\r\n", NULL);
evhttp_clear_headers(&headers);
return;
}
stm_exec_command(stm, request, raw_cmd_content);
evhttp_clear_headers(&headers);
}
static int stm_event_http_init(struct stellar_monitor *stm)
{
// Create a new event handler
stm->evt_base = event_base_new();
// Create a http server using that handler
stm->evt_http_server = evhttp_new(stm->evt_base);
// Limit serving GET requests
evhttp_set_allowed_methods(stm->evt_http_server, EVHTTP_REQ_GET);
char restful_path[256] = {0}; /* must start with '/' */
snprintf(restful_path, sizeof(restful_path), "/%s/%s", STM_RESTFUL_VERSION, STM_RESTFUL_RESOURCE);
evhttp_set_cb(stm->evt_http_server, restful_path, stm_new_request_cb, stm->evt_base);
// Set the callback for anything not recognized
evhttp_set_gencb(stm->evt_http_server, stm_command_notfound, NULL);
if (evhttp_bind_socket(stm->evt_http_server, stm->config->listen_ipaddr, stm->config->listen_port_host_order) != 0)
{
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "Could not bind to %s:%u\r\n", stm->config->listen_ipaddr, stm->config->listen_port_host_order);
return -1;
}
evhttp_set_timeout(stm->evt_http_server, stm->config->connection_idle_timeout);
STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "accept http uri path: %s\r\n", restful_path);
return 0;
}
static void *stm_event_main_loop(void *arg)
{
struct stellar_monitor *stm = (struct stellar_monitor *)arg;
stm_save_thread_local_context(stm);
event_base_dispatch(stm->evt_base);
return NULL;
}
static void stm_event_http_free(struct stellar_monitor *stm)
{
event_base_loopbreak(stm->evt_base);
pthread_cancel(stm->evt_main_loop_tid);
pthread_join(stm->evt_main_loop_tid, NULL);
evhttp_free(stm->evt_http_server);
// event_free(stm->ev_timeout);
event_base_free(stm->evt_base);
}
static void stm_server_set_default_cfg(struct stellar_monitor_config *config)
{
config->ringbuf_size = STM_RINGBUF_SIZE;
config->connection_idle_timeout = STM_CONNECTION_IDLE_TIMEOUT;
config->cli_request_timeout = STM_REQUEST_TIMEOUT;
config->listen_ipaddr = "0.0.0.0";
config->listen_port_host_order = STM_SERVER_LISTEN_PORT;
config->data_link_bind_port_host_order = STM_TZSP_UDP_PORT;
config->output_interval_ms = STM_STAT_OUTPUT_INTERVAL_MS;
}
int stellar_monitor_set_gettime_callback(struct stellar_monitor *stm, int (*gettime_cb)(struct timeval *tv, struct timezone *tz))
{
if (NULL == gettime_cb)
{
return -1;
}
stm->gettime_cb = gettime_cb;
return 0;
}
struct stellar_monitor_config *stellar_monitor_config_new(const char *toml_file)
{
struct stellar_monitor_config *config = CALLOC(struct stellar_monitor_config, 1);
stm_server_set_default_cfg(config);
int64_t int64_val = 0;
char errbuf[256];
FILE *fp = NULL;
toml_table_t *root = NULL;
toml_table_t *table = NULL;
toml_raw_t ptr = NULL;
fp = fopen(toml_file, "r");
if (fp == NULL)
{
fprintf(stderr, "config file %s open failed, %s", toml_file, strerror(errno));
goto fail_exit;
}
root = toml_parse_file(fp, errbuf, sizeof(errbuf));
if (root == NULL)
{
fprintf(stderr, "config file %s parse failed, %s", toml_file, errbuf);
goto fail_exit;
}
table = toml_table_in(root, "monitor");
if (table == NULL)
{
fprintf(stderr, "config file %s missing [monitor]", toml_file);
goto fail_exit;
}
/* listen_port */
ptr = toml_raw_in(table, "listen_port");
if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
{
if (int64_val < 1 || int64_val > 65535)
{
fprintf(stderr, "invalid monitor.listen_port %ld\n", int64_val);
FREE(config);
goto fail_exit;
}
config->listen_port_host_order = (uint16_t)int64_val;
}
/* data link bind port */
ptr = toml_raw_in(table, "data_link_bind_port");
if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
{
if (int64_val < 1 || int64_val > 65535)
{
fprintf(stderr, "invalid monitor.data_link_bind_port %ld\n", int64_val);
FREE(config);
goto fail_exit;
}
config->data_link_bind_port_host_order = (uint16_t)int64_val;
}
/* connection_idle_timeout */
ptr = toml_raw_in(table, "connection_idle_timeout");
if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
{
if (int64_val < 1 || int64_val > 3600)
{
fprintf(stderr, "invalid monitor.connection_idle_timeout %ld, should be [1, 3600]\n", int64_val);
FREE(config);
goto fail_exit;
}
config->connection_idle_timeout = (int)int64_val;
}
/* cli_request_timeout */
ptr = toml_raw_in(table, "cli_request_timeout");
if (ptr != NULL || toml_rtoi(ptr, &int64_val) == 0)
{
if (int64_val < 1 || int64_val > 360)
{
fprintf(stderr, "invalid monitor.cli_request_timeout %ld, , should be [1, 360]\n", int64_val);
FREE(config);
goto fail_exit;
}
config->cli_request_timeout = (int)int64_val;
}
/* stat */
ptr = toml_raw_in(table, "stat_output_path");
if (ptr == NULL || toml_rtos(ptr, &config->output_path) != 0)
{
config->output_path = strdup(STM_STAT_OUTPUT_PATH);
}
ptr = toml_raw_in(table, "stat_output_interval_ms");
if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
{
if (int64_val < 1000 || int64_val > 1000 * 60)
{
fprintf(stderr, "invalid monitor.stat_output_interval_ms %ld, , should be [1, 600000]\n", int64_val);
FREE(config);
goto fail_exit;
}
config->output_interval_ms = (int)int64_val;
}
fail_exit:
if (root)
{
toml_free(root);
}
if (fp)
{
fclose(fp);
}
return config;
}
struct stellar_monitor *stellar_monitor_get(void)
{
if (pthread_self() != __thread_local_tid)
{
assert(0);
// fprintf(stderr, "ERR stellar_monitor_get() failed, caller must in same thread context!\n");
return NULL;
}
return __thread_local_stm;
}
// support dynamic register command, independent of the order of initialization
int monitor_register_cmd(struct stellar_monitor *stm, const char *cmd, monitor_cmd_cb *cb, const char *flags,
const char *hint, const char *desc, void *arg)
{
stm_spinlock_lock(stm->lock);
int ret = stm_cmd_assistant_register_cmd(stm->aide, cmd, cb, arg, flags, hint, desc);
stm_spinlock_unlock(stm->lock);
return ret;
}
int monitor_register_connection_close_cb(struct stellar_monitor *stm, monitor_connection_close_cb *cb, void *arg)
{
stm_spinlock_lock(stm->lock);
struct stm_conn_close_cb_manager *ele = CALLOC(struct stm_conn_close_cb_manager, 1);
ele->cb = cb;
ele->arg = arg;
DL_APPEND(stm->conn_close_mgr, ele);
stm_spinlock_unlock(stm->lock);
return 0;
}
static struct monitor_reply *monitor_cmd_show_brief_cb(struct stellar_monitor *stm)
{
sds cmd_brief = sdsempty();
cmd_brief = sdscatfmt(cmd_brief, "%s, %s\r\n", "\"command\"", "description");
cmd_brief = sdscatfmt(cmd_brief, "-----------------------------\r\n");
sds cmd_brief_cont = stm_cmd_assistant_list_cmd_brief(stm->aide);
cmd_brief = sdscatsds(cmd_brief, cmd_brief_cont);
struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_brief);
sdsfree(cmd_brief);
sdsfree(cmd_brief_cont);
return reply;
}
static struct monitor_reply *monitor_cmd_show_verbose_cb(struct stellar_monitor *stm)
{
sds cmd_verbose = stm_cmd_assistant_list_cmd_verbose(stm->aide);
struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_verbose);
sdsfree(cmd_verbose);
return reply;
}
static struct monitor_reply *monitor_server_builtin_show_command_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg)
{
if (argc != 3)
{
return monitor_reply_new_error(error_format_wrong_number_of_args, "show command");
}
if (stm_strncasecmp_exactly(argv[2], "brief", 5) == 0)
{
return monitor_cmd_show_brief_cb((struct stellar_monitor *)arg);
}
else if (stm_strncasecmp_exactly(argv[2], "verbose", 7) == 0)
{
return monitor_cmd_show_verbose_cb((struct stellar_monitor *)arg);
}
return monitor_reply_new_error(error_format_unknown_arg, argv[2]);
}
static struct monitor_reply *monitor_server_builtin_ping_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg)
{
if (argc == 1)
{
return monitor_reply_new_string("pong");
}
else if (argc == 2)
{
return monitor_reply_new_string("%s", argv[1]);
}
return monitor_reply_new_error(error_format_wrong_number_of_args, "ping");
}
static struct monitor_reply *monitor_server_builtin_who_cb(struct stellar_monitor *stm, int argc UNUSED, char *argv[] UNUSED, UNUSED void *arg)
{
struct stm_connection_manager *conn_mgr = stm->connection_mgr;
struct stm_connection_manager *ele, *tmp;
sds who = sdsempty();
char timestr[64];
DL_FOREACH_SAFE(conn_mgr, ele, tmp)
{
struct timeval tv = ele->link_start_time;
struct tm *tm = localtime(&tv.tv_sec);
strftime(timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", tm);
who = sdscatprintf(who, "%s %s:%u", timestr, ele->peer_ipaddr, ele->peer_port_host_order);
if (stm->current_conn.current_evconn_ref == ele->conn)
{
who = sdscat(who, "\033[1m [current]\033[0m");
}
who = sdscat(who, "\r\n");
}
sdsIncrLen(who, -2); // delete last \r\n
struct monitor_reply *reply = monitor_reply_new_string("%s", who);
sdsfree(who);
return reply;
}
static int stm_builtin_cmd_register(struct stellar_monitor *stm)
{
int ret = 0;
ret += monitor_register_cmd(stm, "show command", monitor_server_builtin_show_command_cb, "readonly", "[ brief|verbose ]", "show all registered commands info", (void *)stm);
assert(ret == 0);
ret += monitor_register_cmd(stm, "who", monitor_server_builtin_who_cb, "readonly", "<cr>", "show who is logged on", (void *)stm);
assert(ret == 0);
ret += monitor_register_cmd(stm, "ping", monitor_server_builtin_ping_cb, "readonly", "[message]", "ping the server", (void *)stm);
assert(ret == 0);
return ret;
}
struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor)
{
if (__stm_libevevt_callback_thread_local_tid != pthread_self())
{
return NULL;
}
return &monitor->current_conn;
}
int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, uint16_t *peer_port)
{
if (NULL == conn || conn->current_evconn_ref == NULL)
{
if (peer_ip)
{
*peer_ip = NULL;
}
if (peer_port)
{
*peer_port = 0;
}
return -1;
}
evhttp_connection_get_peer(conn->current_evconn_ref, peer_ip, peer_port);
return 0;
}
void monitor_free(struct stellar_monitor *stm)
{
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "free and exit\n");
stm_event_http_free(stm);
stm_stat_free(stm->stat);
stm_cmd_assistant_free(stm->aide);
stm_spinlock_free(stm->lock);
if (stm->rpc_ins_array)
{
for (int tid = 0; tid < stm->worker_thread_num; tid++)
{
monitor_rpc_free(stm->rpc_ins_array[tid]);
}
free(stm->rpc_ins_array);
}
__thread_local_stm = NULL;
FREE(stm->config->output_path);
FREE(stm->config);
FREE(stm);
}
struct stellar_monitor *monitor_module_to_monitor(struct module *monitor_module)
{
if (monitor_module == NULL)
{
return NULL;
}
return (struct stellar_monitor *)module_get_ctx(monitor_module);
}
struct stellar_monitor *stellar_module_get_monitor(struct module_manager *mod_mgr)
{
assert(mod_mgr);
struct module *monitor_mod = module_manager_get_module(mod_mgr, MONITOR_MODULE_NAME);
return monitor_module_to_monitor(monitor_mod);
}
void monitor_on_exit(struct module_manager *mod_mgr __attribute__((unused)), struct module *mod)
{
if (mod)
{
struct stellar_monitor *stm = module_get_ctx(mod);
monitor_free(stm);
module_free(mod);
}
}
struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh)
{
struct stellar_monitor *stm = (struct stellar_monitor *)calloc(1, sizeof(struct stellar_monitor));
stm->logger_ref = logh;
stm->mod_mgr_ref = mod_mgr;
struct stellar_monitor_config *config = stellar_monitor_config_new(toml_file);
if (NULL == config)
{
STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "get config failed!\n");
goto fail_exit;
}
stm->config = config;
stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr);
stm->lock = stm_spinlock_new();
stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr);
assert(stm->worker_thread_num > 0);
stm->gettime_cb = gettimeofday;
stm->aide = stm_cmd_assistant_new();
if (stm_event_http_init(stm) < 0)
{
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "libevent http server init() failed!\n");
goto fail_exit;
}
stm->stat = stm_stat_init(stm);
stm_builtin_cmd_register(stm);
stm_save_thread_local_context(stm);
pthread_create(&stm->evt_main_loop_tid, NULL, stm_event_main_loop, (void *)stm);
sds config_print = stm_config_print(config);
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "config: %s\n", config_print);
sdsfree(config_print);
stm->rpc_ins_array = (struct monitor_rpc **)calloc(stm->worker_thread_num, sizeof(struct monitor_rpc *));
for (int tid = 0; tid < stm->worker_thread_num; tid++)
{
stm->rpc_ins_array[tid] = monitor_rpc_new(stm, mod_mgr);
if (stm->rpc_ins_array[tid] == NULL)
{
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "rpc init failed\n");
goto fail_exit;
}
}
stm_save_thread_local_context(stm);
return stm;
fail_exit:
monitor_free(stm);
return NULL;
}
struct module *monitor_on_init(struct module_manager *mod_mgr)
{
assert(mod_mgr);
const char *toml_file = module_manager_get_toml_path(mod_mgr);
assert(toml_file);
struct logger *logh = module_manager_get_logger(mod_mgr);
assert(logh);
struct stellar_monitor *stm = monitor_new(toml_file, mod_mgr, logh);
struct module *stm_mod = module_new(MONITOR_MODULE_NAME, (void *)stm);
if (stm_mod == NULL)
{
STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "moudule new '%s' fail\n", MONITOR_MODULE_NAME);
monitor_free(stm);
return NULL;
}
// show_session_enforcer_init(mod_mgr, stm);
// stm_pktdump_enforcer_init(mod_mgr, stm);
STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "init success\n");
return stm_mod;
}

View File

@@ -0,0 +1,72 @@
// https://www.cs.utexas.edu/~pingali/CS378/2015sp/lectures/Spinlocks%20and%20Read-Write%20Locks.htm
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stddef.h>
#include <pthread.h>
#if 0 /* use gcc builtin function */
struct stm_spinlock
{
long value;
};
struct stm_spinlock *stm_spinlock_new(void)
{
struct stm_spinlock *splock = (struct stm_spinlock *)calloc(1, sizeof(struct stm_spinlock));
return splock;
}
void stm_spinlock_lock(struct stm_spinlock *splock)
{
while (__sync_lock_test_and_set(&splock->value, 1))
{
}
}
void stm_spinlock_unlock(struct stm_spinlock *splock)
{
__sync_lock_release(&splock->value);
}
void stm_spinlock_free(struct stm_spinlock *splock)
{
if (splock)
{
free(splock);
}
}
#else /* pthread spin lock */
struct stm_spinlock
{
pthread_spinlock_t lock_ins;
};
struct stm_spinlock *stm_spinlock_new(void)
{
struct stm_spinlock *splock = (struct stm_spinlock *)calloc(1, sizeof(struct stm_spinlock));
pthread_spin_init(&splock->lock_ins, PTHREAD_PROCESS_PRIVATE);
return splock;
}
void stm_spinlock_lock(struct stm_spinlock *splock)
{
pthread_spin_lock(&splock->lock_ins);
}
void stm_spinlock_unlock(struct stm_spinlock *splock)
{
pthread_spin_unlock(&splock->lock_ins);
}
void stm_spinlock_free(struct stm_spinlock *splock)
{
if (splock)
{
pthread_spin_destroy(&splock->lock_ins);
free(splock);
}
}
#endif

View File

@@ -0,0 +1,8 @@
#pragma once
struct stm_spinlock;
struct stm_spinlock *stm_spinlock_new(void);
void stm_spinlock_lock(struct stm_spinlock *splock);
void stm_spinlock_unlock(struct stm_spinlock *splock);
void stm_spinlock_free(struct stm_spinlock *splock);

View File

@@ -0,0 +1,76 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stddef.h>
#include <arpa/inet.h>
#include <getopt.h>
#include <evhttp.h>
#include "monitor_private.h"
#include <fieldstat/fieldstat_easy.h>
static const char *stm_stat_field_name[] = {
"connection_new",
"connection_close",
"request_succ",
"request_err",
"response_succ",
"response_err",
NULL,
};
long long stm_get_stat_count(struct stm_stat *stat, enum stm_stat_type type)
{
if ((int)type < 0 || type >= STM_STAT_MAX)
{
return 0;
}
return stat->counters[type].count;
}
long long stm_get_stat_bytes(struct stm_stat *stat, enum stm_stat_type type)
{
if ((int)type < 0 || type >= STM_STAT_MAX)
{
return 0;
}
return stat->counters[type].bytes;
}
void stm_stat_update(struct stm_stat *stat, int thread_idx, enum stm_stat_type type, long long value)
{
if ((int)type < 0 || type >= STM_STAT_MAX)
{
return;
}
fieldstat_easy_counter_incrby(stat->fs4_ins, thread_idx, stat->counters[type].counter_id, NULL, 0, value);
}
struct stm_stat *stm_stat_init(struct stellar_monitor *stm)
{
const struct stellar_monitor_config *config = stm->config;
assert(sizeof(stm_stat_field_name) / sizeof(stm_stat_field_name[0]) == STM_STAT_MAX + 1);
struct stm_stat *stat = CALLOC(struct stm_stat, 1);
/* worker thread count + 1, reserved for libevent callback thread context */
stat->fs4_ins = fieldstat_easy_new(stm->worker_thread_num + 1, "monitor", NULL, 0);
for (int i = 0; stm_stat_field_name[i] != NULL; i++)
{
stat->counters[i].counter_id = fieldstat_easy_register_counter(stat->fs4_ins, stm_stat_field_name[i]);
}
fieldstat_easy_enable_auto_output(stat->fs4_ins, config->output_path, MAX(config->output_interval_ms / 1000, 1));
return stat;
}
void stm_stat_free(struct stm_stat *stat)
{
if (NULL == stat)
{
return;
}
if (stat->fs4_ins)
{
fieldstat_easy_free(stat->fs4_ins);
}
FREE(stat);
}

View File

@@ -0,0 +1,83 @@
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stddef.h>
#include <pthread.h>
#include "uthash/utlist.h"
#include "monitor_private.h"
/******************************************** connection manager *****************************************/
struct stm_connection_manager *stm_connection_insert(struct evhttp_connection *evconn)
{
struct stellar_monitor *stm = stellar_monitor_get();
struct stm_connection_manager *conn_mgr = stm->connection_mgr;
struct stm_connection_manager *ele, *tmp;
DL_FOREACH_SAFE(conn_mgr, ele, tmp) // check if current connection already exist
{
if (ele->conn == evconn)
{
stm->gettime_cb(&ele->last_active_time, NULL);
return ele;
}
}
stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_NEW, 1);
struct stm_connection_manager *new_conn = (struct stm_connection_manager *)calloc(1, sizeof(struct stm_connection_manager));
char *tmp_ip_addr;
uint16_t tmp_port;
evhttp_connection_get_peer(evconn, &tmp_ip_addr, &tmp_port);
if (tmp_ip_addr)
{
strncpy(new_conn->peer_ipaddr, tmp_ip_addr, INET6_ADDRSTRLEN - 1);
}
new_conn->peer_port_host_order = tmp_port;
stm->gettime_cb(&new_conn->link_start_time, NULL);
new_conn->last_active_time = new_conn->link_start_time;
new_conn->conn = evconn;
DL_APPEND(stm->connection_mgr, new_conn);
return new_conn;
}
void stm_connection_delete(struct evhttp_connection *evconn)
{
struct stellar_monitor *stm = stellar_monitor_get();
struct stm_connection_manager *conn_mgr = stm->connection_mgr;
struct stm_connection_manager *ele, *tmp;
DL_FOREACH_SAFE(conn_mgr, ele, tmp)
{
if (ele->conn == evconn)
{
DL_DELETE(conn_mgr, ele);
FREE(ele);
}
}
stm->connection_mgr = conn_mgr;
}
void stm_connection_update(struct stm_connection_manager *conn_mgr, const struct evhttp_connection *evconn)
{
struct stellar_monitor *stm = stellar_monitor_get();
struct stm_connection_manager *ele, *tmp;
DL_FOREACH_SAFE(conn_mgr, ele, tmp)
{
if (ele->conn == evconn)
{
stm->gettime_cb(&ele->last_active_time, NULL);
}
}
}
const struct stm_connection_manager *stm_connection_search(const struct stm_connection_manager *conn_mgr_head, const struct evhttp_connection *evconn)
{
const struct stm_connection_manager *ele, *tmp;
DL_FOREACH_SAFE(conn_mgr_head, ele, tmp)
{
if (ele->conn == evconn)
{
return ele;
}
}
return NULL;
}

View File

@@ -0,0 +1,672 @@
#include "sds/sds.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/in.h>
#include <threads.h>
#ifdef __cplusplus
extern "C"
{
#endif
#include "monitor_private.h"
#include "stellar/session.h"
#include "tuple/tuple.h"
struct stm_key_value *stm_cmd_key_value_new(void)
{
struct stm_key_value *kv = (struct stm_key_value *)calloc(1, sizeof(struct stm_key_value));
kv->tuple_num = 0;
kv->tuple = NULL;
return kv;
}
void stm_cmd_key_value_append(struct stm_key_value **kv, const char *key, const char *value)
{
if (NULL == *kv)
{
*kv = stm_cmd_key_value_new();
}
struct stm_key_value *new_kv = *kv;
new_kv->tuple = (struct stm_key_value_tuple *)realloc(new_kv->tuple, (new_kv->tuple_num + 1) * sizeof(struct stm_key_value_tuple));
new_kv->tuple[new_kv->tuple_num].key = strdup(key);
new_kv->tuple[new_kv->tuple_num].value = strdup(value);
new_kv->tuple_num++;
*kv = new_kv;
}
void stm_cmd_key_value_free(struct stm_key_value *kv)
{
if (NULL == kv)
{
return;
}
for (int i = 0; i < kv->tuple_num; i++)
{
FREE(kv->tuple[i].key);
FREE(kv->tuple[i].value);
}
FREE(kv->tuple);
FREE(kv);
}
void stm_cmd_request_free(struct stm_cmd_request *req)
{
FREE(req);
}
int stm_strncasecmp_exactly(const char *s1, const char *s2, size_t n)
{
if (NULL == s1 || NULL == s2)
{
return -1;
}
size_t len1 = strlen(s1);
size_t len2 = strlen(s2);
if (len1 != len2 || len1 != n)
{
return -1;
}
return strncasecmp(s1, s2, n);
}
const char *stm_session_state_ntop(enum session_state state)
{
switch (state)
{
case SESSION_STATE_OPENING:
return "opening";
break;
case SESSION_STATE_ACTIVE:
return "active";
break;
case SESSION_STATE_CLOSING:
return "closing";
break;
default:
break;
}
return "unknown";
}
const char *stm_session_flow_dir_ntop(uint32_t dir)
{
if (SESSION_SEEN_C2S_FLOW == dir)
{
return "C2S";
}
else if (SESSION_SEEN_S2C_FLOW == dir)
{
return "S2C";
}
else if ((SESSION_SEEN_C2S_FLOW | SESSION_SEEN_S2C_FLOW) == dir)
{
return "BIDIRECTION";
}
return "UNKNOWN";
}
int stm_time_range_pton(const char *time_str, time_t now, time_t time_range[2])
{
const char *delim = "-";
char *save_ptr;
const char *time_str_min_example = "last-1-days";
if (NULL == time_str || NULL == time_range)
{
return -1;
}
if (strlen(time_str) < strlen(time_str_min_example))
{
return -1;
}
char *local_time_str = strdup(time_str);
char *fix_prefix = strtok_r(local_time_str, delim, &save_ptr);
if (stm_strncasecmp_exactly(fix_prefix, "last", 4) != 0)
{
free(local_time_str);
return -1;
}
char *number_string = strtok_r(NULL, delim, &save_ptr);
if (NULL == number_string)
{
free(local_time_str);
return -1;
}
long number = strtol(number_string, NULL, 10);
if (number <= 0 || number > 3600)
{
free(local_time_str);
return -1;
}
char *unit_string = strtok_r(NULL, delim, &save_ptr);
if (NULL == unit_string)
{
free(local_time_str);
return -1;
}
long long multiple_second = 1;
if (stm_strncasecmp_exactly(unit_string, "seconds", 7) == 0)
{
multiple_second = 1;
}
else if (stm_strncasecmp_exactly(unit_string, "minutes", 7) == 0)
{
multiple_second = 60;
}
else if (stm_strncasecmp_exactly(unit_string, "hours", 5) == 0)
{
multiple_second = 60 * 60;
}
else if (stm_strncasecmp_exactly(unit_string, "days", 4) == 0)
{
multiple_second = 60 * 60 * 24;
}
else
{
free(local_time_str);
return -1;
}
if ((long long)now < number * multiple_second)
{
time_range[0] = 0;
}
else
{
time_range[0] = now - number * multiple_second;
}
time_range[1] = now;
while (strtok_r(NULL, delim, &save_ptr))
{
}
free(local_time_str);
return 0;
}
int stm_time_in_range(time_t t, const time_t time_range[2])
{
if (NULL == time_range)
{
return 0;
}
if (t >= time_range[0] && t <= time_range[1])
{
return 1;
}
return 0;
}
uint32_t stm_inet_pton(const char *ipstr, void *ipv4_value, void *ipv6_value)
{
unsigned int tmp_ipv4;
struct in6_addr tmp_ipv6;
if (NULL == ipstr || NULL == ipv4_value || NULL == ipv6_value)
{
return 0;
}
if (inet_pton(AF_INET, ipstr, &tmp_ipv4) == 1)
{
memcpy(ipv4_value, &tmp_ipv4, sizeof(int));
return AF_INET;
}
if (inet_pton(AF_INET6, ipstr, &tmp_ipv6) == 1)
{
memcpy(ipv6_value, &tmp_ipv6, sizeof(struct in6_addr));
return AF_INET6;
}
return 0;
}
/**
* Calculate a 128-bit mask given a network prefix.
*/
void in6_addr_mask(struct in6_addr *mask, uint8_t bits)
{
for (uint8_t i = 0; i < sizeof(mask->s6_addr); i++)
{
mask->s6_addr[i] = bits ? (uint8_t)0xFF << (8 - (bits > 8 ? 8 : bits)) : 0;
if (bits < 8)
bits = 0;
else
bits -= 8;
}
}
/**
* Calculate the first address in a network given a mask.
*/
void in6_addr_network(struct in6_addr *network, const struct in6_addr *addr, const struct in6_addr *mask)
{
for (uint8_t i = 0; i < sizeof(network->s6_addr); i++)
{
network->s6_addr[i] = addr->s6_addr[i] & mask->s6_addr[i];
}
}
/**
* Calculate the last address in a network given a mask.
*/
void in6_addr_end(struct in6_addr *end, const struct in6_addr *addr, const struct in6_addr *mask)
{
for (uint8_t i = 0; i < sizeof(end->s6_addr); i++)
{
end->s6_addr[i] = (addr->s6_addr[i] & mask->s6_addr[i]) | ~mask->s6_addr[i];
}
}
int stm_ipv4_cidr_to_range(uint32_t ipaddr, uint32_t ipmask, uint32_t iprange[2])
{
uint32_t network_addr = ipaddr & ipmask;
uint32_t broadcast_addr = (ipaddr & ipmask) | ~ipmask;
iprange[0] = network_addr;
iprange[1] = broadcast_addr;
return 0;
}
int stm_ipv6_cidr_to_range(const struct in6_addr *ipaddr, const struct in6_addr *ipmask, struct in6_addr iprange[2])
{
struct in6_addr network_addr = {};
struct in6_addr broadcast_addr = {};
in6_addr_network(&network_addr, ipaddr, ipmask);
in6_addr_end(&broadcast_addr, ipaddr, ipmask);
memcpy(&iprange[0], &network_addr, sizeof(struct in6_addr));
memcpy(&iprange[1], &broadcast_addr, sizeof(struct in6_addr));
return 0;
}
uint32_t stm_ip_cidr_pton(const char *ipcidr, void *ipv4_value, void *ipv4_mask, void *ipv6_value, void *ipv6_mask)
{
int ipver = 0;
const char *delim = "/";
char *save_ptr;
if (NULL == ipcidr || NULL == ipv4_value || NULL == ipv4_mask || NULL == ipv6_value || NULL == ipv6_mask)
{
return 0;
}
if (strchr(ipcidr, '/') == NULL)
{
return 0;
}
char *local_ip_cidr = strdup(ipcidr);
char *ipaddr = strtok_r(local_ip_cidr, delim, &save_ptr);
ipver = stm_inet_pton(ipaddr, ipv4_value, ipv6_value);
if (ipver != AF_INET && ipver != AF_INET6)
{
free(local_ip_cidr);
return 0;
}
char *cidr = strtok_r(NULL, delim, &save_ptr);
if (NULL == cidr)
{
free(local_ip_cidr);
return 0;
}
int cidr_num = atoi(cidr);
if (ipver == AF_INET)
{
if (cidr_num <= 0 || cidr_num > 32)
{
free(local_ip_cidr);
return 0;
}
uint32_t mask = 0;
for (int i = 0; i < cidr_num; i++)
{
mask |= (uint32_t)1 << (31 - i);
}
mask = ntohl(mask);
memcpy(ipv4_mask, &mask, sizeof(int));
}
else if (ipver == AF_INET6)
{
if (cidr_num <= 0 || cidr_num > 128)
{
free(local_ip_cidr);
return -1;
}
struct in6_addr mask = {};
for (int i = 0; i < cidr_num; i++)
{
mask.s6_addr[i / 8] |= 1 << (7 - i % 8);
}
memcpy(ipv6_mask, &mask, sizeof(struct in6_addr));
}
while (strtok_r(NULL, delim, &save_ptr))
;
free(local_ip_cidr);
return ipver;
}
void stm_mem_fill_rand(char *buf, size_t len, unsigned char range_min, unsigned char range_max)
{
unsigned char *p = (unsigned char *)buf;
for (size_t i = 0; i < len; i++)
{
p[i] = (rand() % range_min + 1) + (range_max - range_min);
}
}
int stm_string_isdigit(const char *str)
{
if (NULL == str || strlen(str) == 0)
{
return 0;
}
for (size_t i = 0; i < strlen(str); i++)
{
if (!isdigit(str[i]))
{
return 0;
}
}
return 1;
}
/*
* input("hello, world!", "hello", "hi", &output) -> output = "hi, world!"
* return: the number of sub string have been replaced.
*/
int stm_replace_str(const char *raw, const char *search, const char *replace, char **outline)
{
const char *p = NULL;
int search_len = strlen(search);
int replace_len = strlen(replace);
int raw_len = strlen(raw);
p = strstr(raw, search);
if (p == NULL)
{
return 0;
}
const char *remain_ptr = p + search_len;
char *new_line = (char *)calloc(1, replace_len + (raw_len - search_len) + 1);
strcpy(new_line, replace);
strcpy(new_line + replace_len, remain_ptr);
*outline = new_line;
return 1;
}
int stm_timeout(struct timeval start, struct timeval end, int timeout_sec)
{
return (end.tv_sec * 1000 * 1000 + end.tv_usec) - (start.tv_sec * 1000 * 1000 + start.tv_usec) >= timeout_sec * 1000 * 1000;
}
struct monitor_reply *monitor_reply_nil(void)
{
struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
reply->type = MONITOR_REPLY_NIL;
reply->http_code = HTTP_OK;
reply->http_reason = "OK";
return reply;
}
/* string should without "\r\n", will add "\r\n" in monitor_reply_to_string() */
struct monitor_reply *monitor_reply_new_string(const char *format, ...)
{
struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
reply->type = MONITOR_REPLY_STRING;
reply->http_code = HTTP_OK;
reply->http_reason = "OK";
va_list ap;
va_start(ap, format);
reply->str = sdscatvprintf(sdsempty(), format, ap);
reply->len = strlen(reply->str);
va_end(ap);
return reply;
}
/* string should without "\r\n", will add "\r\n" in monitor_reply_to_string() */
struct monitor_reply *monitor_reply_new_error(const char *format, ...)
{
struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
reply->type = MONITOR_REPLY_ERROR;
reply->http_code = HTTP_BADREQUEST;
reply->http_reason = "ERROR";
va_list ap;
va_start(ap, format);
reply->str = sdscatvprintf(sdsempty(), format, ap);
reply->len = strlen(reply->str);
va_end(ap);
return reply;
}
struct monitor_reply *monitor_reply_new_status(const char *format, ...)
{
struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
reply->type = MONITOR_REPLY_STATUS;
reply->http_code = HTTP_OK;
reply->http_reason = "OK";
va_list ap;
va_start(ap, format);
reply->str = sdscatvprintf(sdsempty(), format, ap);
reply->len = strlen(reply->str);
va_end(ap);
return reply;
}
struct monitor_reply *monitor_reply_new_integer(long long integer)
{
struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
reply->type = MONITOR_REPLY_INTEGER;
reply->http_code = HTTP_OK;
reply->http_reason = "OK";
reply->integer = integer;
return reply;
}
struct monitor_reply *monitor_reply_new_double(double dval)
{
struct monitor_reply *reply = CALLOC(struct monitor_reply, 1);
reply->type = MONITOR_REPLY_DOUBLE;
reply->http_code = HTTP_OK;
reply->http_reason = "OK";
reply->dval = dval;
return reply;
}
sds monitor_reply_to_string(const struct monitor_reply *reply)
{
sds res = sdsempty();
switch (reply->type)
{
case MONITOR_REPLY_INTEGER:
res = sdscatprintf(res, "(integer) %lld\r\n", reply->integer);
break;
case MONITOR_REPLY_DOUBLE:
res = sdscatprintf(res, "(double) %f\r\n", reply->dval);
break;
case MONITOR_REPLY_STRING:
case MONITOR_REPLY_STATUS:
res = sdscatlen(res, reply->str, reply->len);
res = sdscat(res, "\r\n");
break;
case MONITOR_REPLY_NIL:
res = sdscat(res, "(nil)\r\n");
break;
case MONITOR_REPLY_ERROR:
res = sdscatprintf(res, "(error) %s\r\n", reply->str);
break;
default:
break;
}
return res;
}
void monitor_reply_free(struct monitor_reply *reply)
{
switch (reply->type)
{
case MONITOR_REPLY_STRING:
case MONITOR_REPLY_ERROR:
case MONITOR_REPLY_STATUS:
sdsfree(reply->str);
reply->str = NULL;
break;
default:
break;
}
free(reply);
}
static struct monitor_cli_args *monitor_util_search_cmd_args(const char *opt_name, const struct monitor_cli_args cli_args[], size_t cli_args_array_size)
{
if (NULL == cli_args)
{
return NULL;
}
for (size_t i = 0; i < cli_args_array_size; i++)
{
if (stm_strncasecmp_exactly(opt_name, cli_args[i].short_opt, strlen(cli_args[i].short_opt)) == 0 || stm_strncasecmp_exactly(opt_name, cli_args[i].long_opt, strlen(cli_args[i].long_opt)) == 0)
{
return (struct monitor_cli_args *)&cli_args[i];
}
}
return NULL;
}
sds monitor_util_copy_arg_value(int argc, const char *argv[], int *copy_args_num)
{
sds res = sdsempty();
int num = 0;
for (int i = 0; i < argc; i++)
{
if ((strncmp(argv[i], "-", 1) == 0) || (strncmp(argv[i], "--", 2) == 0))
{
break;
}
else
{
res = sdscat(res, argv[i]);
if ((i + 1 < argc) && (strncmp(argv[i + 1], "-", 1) != 0) && (strncmp(argv[i + 1], "--", 2) != 0)) // not the last one
{
res = sdscat(res, " ");
}
num++;
}
}
*copy_args_num = num;
return res;
}
int monitor_util_parse_cmd_args(int argc, const char *argv[], struct monitor_cli_args cli_args[], size_t cli_args_array_size)
{
if (argc <= 0 || NULL == argv || NULL == cli_args)
{
return -1;
}
int parse_stage = 0; // 0: arg; 1: value
struct monitor_cli_args *one_arg = NULL;
for (int i = 1; i < argc;) // skip program self name
{
if (0 == parse_stage)
{
one_arg = monitor_util_search_cmd_args(argv[i], cli_args, cli_args_array_size);
if (NULL == one_arg)
{
fprintf(stderr, "unknown option: %s\n", argv[i]);
return -1;
}
if (one_arg->require_arg_value && (i + 1 >= argc))
{
fprintf(stderr, "option requires value: %s\n", argv[i]);
return -1;
}
parse_stage = 1;
i += 1;
}
else
{
if (NULL == one_arg)
{
fprintf(stderr, "unknown option: %s\n", argv[i]);
return -1;
}
int copy_args_num = 0;
if (one_arg->value_is_multi_words)
{
one_arg->value = monitor_util_copy_arg_value(argc - i, &argv[i], &copy_args_num);
}
else
{
one_arg->value = sdsnew(argv[i]);
copy_args_num = 1;
}
i += copy_args_num;
parse_stage = 0;
one_arg = NULL;
}
}
return 0;
}
sds stm_config_print(const struct stellar_monitor_config *config)
{
sds res = sdsempty();
res = sdscatprintf(res, "cli_request_timeout: %ds, ", config->cli_request_timeout);
res = sdscatprintf(res, "connection_idle_timeout: %ds, ", config->connection_idle_timeout);
res = sdscatprintf(res, "listen_ip_addr: %s, ", config->listen_ipaddr);
res = sdscatprintf(res, "listen_port %u, ", config->listen_port_host_order);
res = sdscatprintf(res, "data_link_bind_port: %u, ", config->data_link_bind_port_host_order);
res = sdscatprintf(res, "stat_output_path: %s, ", config->output_path);
res = sdscatprintf(res, "stat_output_interval_ms: %dms ", config->output_interval_ms);
return res;
}
char *stm_http_url_encode(const char *originalText)
{
// allocate memory for the worst possible case (all characters need to be encoded)
char *encodedText = (char *)malloc(sizeof(char) * strlen(originalText) * 3 + 1);
const char *hex = "0123456789abcdef";
int pos = 0;
for (size_t i = 0; i < strlen(originalText); i++)
{
if (('a' <= originalText[i] && originalText[i] <= 'z') || ('A' <= originalText[i] && originalText[i] <= 'Z') || ('0' <= originalText[i] && originalText[i] <= '9'))
{
encodedText[pos++] = originalText[i];
}
else if (originalText[i] == ' ')
{
encodedText[pos++] = '%';
encodedText[pos++] = hex[originalText[i] >> 4];
encodedText[pos++] = hex[originalText[i] & 15];
}
/* todo: other characters ? */
else
{
encodedText[pos++] = originalText[i];
}
}
encodedText[pos] = '\0';
return encodedText;
}
const char *stm_get0_readable_session_addr(const struct tuple6 *addr, char *addr_buf, size_t max_buf_len)
{
tuple4_to_str((struct tuple4 *)addr, addr_buf, max_buf_len);
return addr_buf;
}
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,30 @@
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <time.h>
#include <netinet/in.h>
#include "stellar/session.h"
#include "tuple/tuple.h"
#ifdef __cplusplus
extern "C"
{
#endif
int stm_replace_str(const char *raw, const char *search, const char *replace, char **outline);
int stm_strncasecmp_exactly(const char *s1, const char *s2, size_t n);
struct stm_cmd_reply *stm_build_reply_error(const char *format, ...);
struct stm_cmd_reply *stm_cmd_reply_dup(const struct stm_cmd_reply *src);
const char *stm_session_flow_dir_ntop(uint32_t dir);
int stm_time_range_pton(const char *time_str, time_t now, time_t time_range[2]);
int stm_time_in_range(time_t t, const time_t time_range[2]);
uint32_t stm_inet_pton(const char *ipstr, void *ipv4_value, void *ipv6_value);
uint32_t stm_ip_cidr_pton(const char *ipcidr, void *ipv4_value, void *ipv4_mask, void *ipv6_value, void *ipv6_mask);
int stm_ipv4_cidr_to_range(uint32_t ipaddr, uint32_t ipmask, uint32_t iprange[2]);
int stm_ipv6_cidr_to_range(const struct in6_addr *ipaddr, const struct in6_addr *ipmask, struct in6_addr iprange[2]);
void stm_mem_fill_rand(char *buf, size_t len, unsigned char range_min, unsigned char range_max);
int stm_string_isdigit(const char *str);
int stm_timeout(struct timeval start, struct timeval end, int timeout_sec);
const char *stm_session_state_ntop(enum session_state state);
const char *stm_get0_readable_session_addr(const struct tuple6 *addr, char *addr_buf, size_t max_buf_len);
#ifdef __cplusplus
}
#endif

10
infra/monitor/version.map Normal file
View File

@@ -0,0 +1,10 @@
VERS_3.0{
global:
extern "C" {
stellar_monitor_*;
monitor_*;
stm_*;
stellar_module_get_monitor;
};
local: *;
};

View File

@@ -8,7 +8,7 @@ extern "C"
#include <stdbool.h>
#include <sys/queue.h>
#include "tuple.h"
#include "tuple/tuple.h"
#include "stellar/packet.h"
#define PACKET_MAX_LAYERS 32

View File

@@ -593,5 +593,15 @@ void session_manager_on_thread_exit(struct module_manager *mod_mgr, int thread_i
{
struct session_manager *sess_mgr = module_get_ctx(mod);
assert(sess_mgr);
assert(thread_id < (int)sess_mgr->cfg->thread_num);
session_manager_clean(sess_mgr, thread_id);
}
// temp add for show session command
struct session_manager_rte *session_manager_get_runtime(struct session_manager *sess_mgr, uint16_t thread_id)
{
assert(sess_mgr);
assert(thread_id < sess_mgr->cfg->thread_num);
return sess_mgr->rte[thread_id];
}

View File

@@ -7,6 +7,7 @@ extern "C"
#include "tuple.h"
#include "stellar/session.h"
#include "session_manager_cfg.h"
enum session_scan_flags
{

View File

@@ -74,5 +74,7 @@ global:
lpi_plus_exit;
lpi_plus_appid_subscribe;
monitor_on_init;
monitor_on_exit;
local: *;
};