This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/infra/session_manager/session_monitor.c

1064 lines
32 KiB
C

#include <ctype.h>
#include <assert.h>
#include <stdlib.h>
#include "tuple.h"
#include "log_internal.h"
#include "utils_internal.h"
#include "session_internal.h"
#include "session_manager.h"
#include "session_manager_cfg.h"
#include "session_manager_rte.h"
#include "session_manager_stat.h"
#include "sds/sds.h"
#include "monitor/monitor_rpc.h"
#include "stellar/monitor.h"
#include "stellar/module.h"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wunused-function"
#define DISPLAY_SESSION_DEFAULT_COUNT 10
#define DISPLAY_SESSION_MAX_COUNT 1000
#define SCAN_SESSION_DEFAULT_COUNT 1000
#define SESSION_MONITOR_MODULE_NAME "session_monitor_module"
#define SESSION_MONITOR_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "session monitor", format, ##__VA_ARGS__)
#define SESSION_MONITOR_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session monitor", format, ##__VA_ARGS__)
struct session_monitor
{
uint64_t thread_num;
uint64_t capacity; // per thread session table capacity
struct logger *logger;
struct session_manager *manager;
struct stellar_monitor *monitor;
};
/******************************************************************************
* parse utils
******************************************************************************/
/*
* https://docs.paloaltonetworks.com/pan-os/11-1/pan-os-cli-quick-start/cli-command-hierarchy/pan-os-11-1-cli-ops-command-hierarchy
* https://docs.paloaltonetworks.com/content/dam/techdocs/en_US/pdf/eol/pan-os-panorama-50-cli.pdf
*
* Syntax
*
* show session
* {
* all |
* {
* thread <number> |
* cursor <value> |
* count <value> |
* state <opening|active|closing|discard|closed> |
* type <tcp|udp> |
* saddr <ip_address[/mask]> |
* daddr <ip_address[/mask]> |
* sport <port_number> |
* dport <port_number> |
* stime <N[s|m|h|d]> |
* ptime <N[s|m|h|d]>
* }
* id <number>
* {
* thread <number>
* }
* info
* {
* thread <number>
* }
* }
*/
struct cmd_opts
{
uint64_t sess_id;
int thread_id; // -1 for all
struct session_filter filter;
};
enum pcode
{
PCODE_OK,
PCODE_HELP,
PCODE_ERR_INVALID_ARGS,
PCODE_ERR_INVALID_SESSID,
PCODE_ERR_INVALID_THREAD,
PCODE_ERR_INVALID_CURSOR,
PCODE_ERR_INVALID_COUNT,
PCODE_ERR_INVALID_DISPLAY,
PCODE_ERR_INVALID_TYPE,
PCODE_ERR_INVALID_STATE,
PCODE_ERR_INVALID_SPORT,
PCODE_ERR_INVALID_DPORT,
PCODE_ERR_INVALID_SADDR,
PCODE_ERR_INVALID_DADDR,
PCODE_ERR_INVALID_STIME,
PCODE_ERR_INVALID_PTIME,
};
typedef enum pcode parse_func(struct cmd_opts *opts, const char *val, void *ctx);
struct parser
{
const char *key;
parse_func *parse;
};
static const char *pcode_to_str(enum pcode code)
{
switch (code)
{
case PCODE_OK:
return "ok";
case PCODE_HELP:
return "help";
case PCODE_ERR_INVALID_ARGS:
return "invalid args";
case PCODE_ERR_INVALID_SESSID:
return "invalid session id";
case PCODE_ERR_INVALID_THREAD:
return "invalid thread id";
case PCODE_ERR_INVALID_CURSOR:
return "invalid cursor";
case PCODE_ERR_INVALID_COUNT:
return "invalid count";
case PCODE_ERR_INVALID_DISPLAY:
return "invalid display";
case PCODE_ERR_INVALID_TYPE:
return "invalid type";
case PCODE_ERR_INVALID_STATE:
return "invalid state";
case PCODE_ERR_INVALID_SPORT:
return "invalid sport";
case PCODE_ERR_INVALID_DPORT:
return "invalid dport";
case PCODE_ERR_INVALID_SADDR:
return "invalid saddr";
case PCODE_ERR_INVALID_DADDR:
return "invalid daddr";
case PCODE_ERR_INVALID_STIME:
return "invalid stime";
case PCODE_ERR_INVALID_PTIME:
return "invalid ptime";
default:
return "unknown";
}
}
static int is_digit_str(const char *val)
{
for (size_t i = 0; i < strlen(val); i++)
{
if (!isdigit(val[i]))
{
return 0;
}
}
return 1;
}
// return 0 on success
// return -1 on failure
// val format: N[s|m|h|d]
static int time_to_ms(const char *val, uint64_t *ts)
{
if (val == NULL)
{
return -1;
}
int len = strlen(val);
if (len > 21) // 18446744073709551615 in string
{
return -1;
}
char buff[32] = {0};
memcpy(buff, val, len);
char unit = buff[len - 1];
if (unit != 's' && unit != 'm' && unit != 'h' && unit != 'd')
{
return -1;
}
buff[len - 1] = '\0';
if (is_digit_str(buff) == 0)
{
return -1;
}
if (atoll(buff) < 1)
{
return -1;
}
switch (unit)
{
case 's':
*ts = atoll(buff) * 1000;
break;
case 'm':
*ts = atoll(buff) * 1000 * 60;
break;
case 'h':
*ts = atoll(buff) * 1000 * 60 * 60;
break;
case 'd':
*ts = atoll(buff) * 1000 * 60 * 60 * 24;
break;
default:
return -1;
}
return 0;
}
// return AF_INET/AF_INET6 on success
// return 0 on failure
// val format: <IPv4_address|IPv6_address>[/mask]
static int cidr_to_range(const char *val, union ip_address addr[2])
{
int mask = 0;
char buff[64] = {0};
char *delimit = NULL;
uint32_t ip4_addr = {0};
uint32_t ip4_mask = {0};
struct in6_addr ip6_addr = {0};
struct in6_addr ip6_mask = {0};
if (val == NULL)
{
return 0;
}
memcpy(buff, val, strlen(val));
delimit = strchr(buff, '/');
if (delimit == NULL)
{
if (inet_pton(AF_INET, buff, &ip4_addr) == 1)
{
memcpy(&addr[0], &ip4_addr, sizeof(struct in_addr));
memcpy(&addr[1], &ip4_addr, sizeof(struct in_addr));
return AF_INET;
}
if (inet_pton(AF_INET6, buff, &ip6_addr) == 1)
{
memcpy(&addr[0], &ip6_addr, sizeof(struct in6_addr));
memcpy(&addr[1], &ip6_addr, sizeof(struct in6_addr));
return AF_INET6;
}
}
else
{
*delimit = '\0';
delimit++;
if (!is_digit_str(delimit))
{
return 0;
}
mask = atoi(delimit);
if (inet_pton(AF_INET, buff, &ip4_addr) == 1)
{
if (mask <= 0 || mask > 32)
{
return 0;
}
for (int i = 0; i < mask; i++)
{
ip4_mask |= (uint32_t)1 << (31 - i);
}
ip4_mask = ntohl(ip4_mask);
addr[0].v4.s_addr = (uint32_t)(ip4_addr & ip4_mask);
addr[1].v4.s_addr = (uint32_t)((ip4_addr & ip4_mask) | ~ip4_mask);
return AF_INET;
}
if (inet_pton(AF_INET6, buff, &ip6_addr) == 1)
{
if (mask <= 0 || mask > 128)
{
return 0;
}
for (int i = 0; i < mask; i++)
{
ip6_mask.s6_addr[i / 8] |= 1 << (7 - i % 8);
}
for (int i = 0; i < 16; i++)
{
addr[0].v6.s6_addr[i] = (ip6_addr.s6_addr[i] & ip6_mask.s6_addr[i]);
addr[1].v6.s6_addr[i] = (ip6_addr.s6_addr[i] & ip6_mask.s6_addr[i]) | ~ip6_mask.s6_addr[i];
}
return AF_INET6;
}
}
return 0;
}
static enum pcode parse_help(struct cmd_opts *opts, const char *val, void *ctx)
{
return PCODE_HELP;
}
static enum pcode parse_id(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL || is_digit_str(val) == 0)
{
return PCODE_ERR_INVALID_SESSID;
}
opts->sess_id = atoll(val);
return PCODE_OK;
}
static enum pcode parse_thread(struct cmd_opts *opts, const char *val, void *ctx)
{
struct session_monitor *mnt = (struct session_monitor *)ctx;
if (val == NULL)
{
return PCODE_ERR_INVALID_THREAD;
}
if (strcasecmp(val, "all") == 0)
{
opts->thread_id = -1;
return PCODE_OK;
}
if (is_digit_str(val) == 0)
{
return PCODE_ERR_INVALID_THREAD;
}
if (atoll(val) < 0 || (uint64_t)atoll(val) >= mnt->thread_num)
{
return PCODE_ERR_INVALID_THREAD;
}
opts->thread_id = atoll(val);
return PCODE_OK;
}
static enum pcode parse_cursor(struct cmd_opts *opts, const char *val, void *ctx)
{
struct session_monitor *mnt = (struct session_monitor *)ctx;
if (val == NULL || is_digit_str(val) == 0)
{
return PCODE_ERR_INVALID_CURSOR;
}
if ((uint64_t)atoll(val) >= mnt->capacity)
{
return PCODE_ERR_INVALID_CURSOR;
}
opts->filter.cursor = atoll(val);
return PCODE_OK;
}
static enum pcode parse_count(struct cmd_opts *opts, const char *val, void *ctx)
{
struct session_monitor *mnt = (struct session_monitor *)ctx;
if (val == NULL || is_digit_str(val) == 0)
{
return PCODE_ERR_INVALID_COUNT;
}
if (atoll(val) < 1 || (uint64_t)atoll(val) > mnt->capacity)
{
return PCODE_ERR_INVALID_COUNT;
}
opts->filter.count = atoll(val);
return PCODE_OK;
}
static enum pcode parse_display(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL || is_digit_str(val) == 0)
{
return PCODE_ERR_INVALID_COUNT;
}
if (atoll(val) < 1 || atoll(val) > DISPLAY_SESSION_MAX_COUNT)
{
return PCODE_ERR_INVALID_COUNT;
}
opts->filter.limit = atoll(val);
return PCODE_OK;
}
static enum pcode parse_type(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL)
{
return PCODE_ERR_INVALID_TYPE;
}
if (strcasecmp(val, "tcp") == 0)
{
opts->filter.type = SESSION_TYPE_TCP;
return PCODE_OK;
}
else if (strcasecmp(val, "udp") == 0)
{
opts->filter.type = SESSION_TYPE_UDP;
return PCODE_OK;
}
else
{
return PCODE_ERR_INVALID_TYPE;
}
}
static enum pcode parse_state(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL)
{
return PCODE_ERR_INVALID_STATE;
}
if (strcasecmp(val, "opening") == 0)
{
opts->filter.state = SESSION_STATE_OPENING;
return PCODE_OK;
}
else if (strcasecmp(val, "active") == 0)
{
opts->filter.state = SESSION_STATE_ACTIVE;
return PCODE_OK;
}
else if (strcasecmp(val, "closing") == 0)
{
opts->filter.state = SESSION_STATE_CLOSING;
return PCODE_OK;
}
else if (strcasecmp(val, "discard") == 0)
{
opts->filter.state = SESSION_STATE_DISCARD;
return PCODE_OK;
}
else if (strcasecmp(val, "closed") == 0)
{
opts->filter.state = SESSION_STATE_CLOSED;
return PCODE_OK;
}
else
{
return PCODE_ERR_INVALID_STATE;
}
}
static enum pcode parse_sport(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL || is_digit_str(val) == 0)
{
return PCODE_ERR_INVALID_SPORT;
}
if (atoi(val) < 1 || atoi(val) > 65535)
{
return PCODE_ERR_INVALID_SPORT;
}
opts->filter.src_port = htons(atoi(val));
return PCODE_OK;
}
static enum pcode parse_dport(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL || is_digit_str(val) == 0)
{
return PCODE_ERR_INVALID_DPORT;
}
if (atoi(val) < 1 || atoi(val) > 65535)
{
return PCODE_ERR_INVALID_DPORT;
}
opts->filter.dst_port = htons(atoi(val));
return PCODE_OK;
}
static enum pcode parse_sadd(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL)
{
return PCODE_ERR_INVALID_SADDR;
}
opts->filter.src_family = cidr_to_range(val, opts->filter.src_addr_range);
if (opts->filter.src_family != AF_INET && opts->filter.src_family != AF_INET6)
{
return PCODE_ERR_INVALID_SADDR;
}
return PCODE_OK;
}
static enum pcode parse_dadd(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL)
{
return PCODE_ERR_INVALID_DADDR;
}
opts->filter.dst_family = cidr_to_range(val, opts->filter.dst_addr_range);
if (opts->filter.dst_family != AF_INET && opts->filter.dst_family != AF_INET6)
{
return PCODE_ERR_INVALID_DADDR;
}
return PCODE_OK;
}
static enum pcode parse_stime(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL)
{
return PCODE_ERR_INVALID_STIME;
}
uint64_t ts_ms;
uint64_t now_ms = clock_get_real_time_ms();
if (time_to_ms(val, &ts_ms) != 0)
{
return PCODE_ERR_INVALID_STIME;
}
opts->filter.sess_created_ts_in_ms = now_ms - ts_ms;
return PCODE_OK;
}
static enum pcode parse_ptime(struct cmd_opts *opts, const char *val, void *ctx)
{
if (val == NULL)
{
return PCODE_ERR_INVALID_PTIME;
}
uint64_t ts_ms;
uint64_t now_ms = clock_get_real_time_ms();
if (time_to_ms(val, &ts_ms) != 0)
{
return PCODE_ERR_INVALID_PTIME;
}
opts->filter.pkt_received_ts_in_ms = now_ms - ts_ms;
return PCODE_OK;
}
static enum pcode cmd_opts_parse(struct parser parses[], int max_parser, struct cmd_opts *opts, int argc, char **argv, void *ctx)
{
memset(opts, 0, sizeof(struct cmd_opts));
opts->thread_id = 0;
opts->filter.cursor = 0;
opts->filter.count = SCAN_SESSION_DEFAULT_COUNT;
opts->filter.limit = DISPLAY_SESSION_DEFAULT_COUNT;
int j = 0;
for (int i = 0; i < argc; i = i + 2)
{
char *key = argv[i];
char *val = i + 1 < argc ? argv[i + 1] : NULL;
if (val == NULL)
{
return PCODE_ERR_INVALID_ARGS;
}
for (j = 0; j < max_parser; j++)
{
if (strcasecmp(parses[j].key, key) != 0)
{
continue;
}
enum pcode ret = parses[j].parse(opts, val, ctx);
if (ret == PCODE_OK)
{
break;
}
else
{
return ret;
}
}
if (j == max_parser)
{
return PCODE_ERR_INVALID_ARGS;
}
}
return PCODE_OK;
}
/******************************************************************************
* session monitor
******************************************************************************/
static char *show_session_id_usage(struct session_monitor *mnt)
{
static char buff[2048] = {0};
snprintf(buff, sizeof(buff), "Usage: show session id <id> [options]\n"
"Options:\n"
" help -- Display help info\n"
" thread <val> -- Thread index [0, %lu] or 'all', default: 0\n",
mnt->thread_num - 1);
return buff;
}
static char *show_session_all_usage(struct session_monitor *mnt)
{
static char buff[2048] = {0};
snprintf(buff, sizeof(buff), "Usage: show session all [options]\n"
"Options:\n"
" help -- Display help info\n"
" thread <val> -- Thread index [0, %lu] or 'all', default: 0\n"
" cursor <val> -- Start from the cursor [0, %lu], default: 0\n"
" count <N> -- Scan N sessions [1, %lu], default: %d\n"
" display <N> -- Display N matched sessions [1, %d], default: %d\n"
" state <opening|active|closing|discard|closed> -- Session state\n"
" type <tcp|udp> -— Session type\n"
" saddr <address[/mask]> -- Source IP address\n"
" daddr <address[/mask]> -- Destination IP address\n"
" sport <port_number> -- Source port [1, 65535]\n"
" dport <port_number> -- Destination port [1, 65535]\n"
" stime <N[s|m|h|d]> -- Session created in the last N seconds/minutes/hours/days [1, 2^64-1]\n"
" ptime <N[s|m|h|d]> -- Packet received in the last N seconds/minutes/hours/days [1, 2^64-1]\n",
mnt->thread_num - 1,
mnt->capacity - 1,
mnt->capacity, SCAN_SESSION_DEFAULT_COUNT,
DISPLAY_SESSION_MAX_COUNT, DISPLAY_SESSION_DEFAULT_COUNT);
return buff;
}
static char *show_session_info_usage(struct session_monitor *mnt)
{
static char buff[2048] = {0};
snprintf(buff, sizeof(buff), "Usage: show session info [options]\n"
"Options:\n"
" help -- Display help info\n"
" thread <val> -- Thread index [0, %lu] or 'all', default: 0\n",
mnt->thread_num - 1);
return buff;
}
struct show_sess_id_ctx
{
uint64_t sess_id;
struct session matched_sess;
uint64_t find;
};
struct show_sess_all_ctx
{
struct session_filter filter;
uint64_t matched_ids[DISPLAY_SESSION_MAX_COUNT];
uint64_t size;
uint64_t used;
};
struct show_sess_info_ctx
{
struct session_manager_stat stat;
};
static struct iovec worker_thread_lookup_session(int thread_idx, struct iovec req, void *args)
{
struct session_monitor *mnt = (struct session_monitor *)args;
struct show_sess_id_ctx *ctx = (struct show_sess_id_ctx *)req.iov_base;
struct session_manager_rte *rte = session_manager_get_rte(mnt->manager, thread_idx);
if (rte == NULL)
{
return req;
}
struct session *sess = session_manager_rte_lookup_session_by_id(rte, ctx->sess_id);
if (sess)
{
memcpy(&ctx->matched_sess, sess, sizeof(struct session));
ctx->find = 1;
}
return req;
}
static struct iovec worker_thread_scan_session(int thread_idx, struct iovec req, void *args)
{
struct session_monitor *mnt = (struct session_monitor *)args;
struct show_sess_all_ctx *ctx = (struct show_sess_all_ctx *)req.iov_base;
struct session_manager_rte *rte = session_manager_get_rte(mnt->manager, thread_idx);
if (rte == NULL)
{
return req;
}
ctx->used += session_manager_rte_scan_session(rte, &ctx->filter, &ctx->matched_ids[ctx->used], ctx->size - ctx->used);
return req;
}
static struct iovec worker_thread_stat_session(int thread_idx, struct iovec req, void *args)
{
struct session_monitor *mnt = (struct session_monitor *)args;
struct show_sess_info_ctx *ctx = (struct show_sess_info_ctx *)req.iov_base;
struct session_manager_rte *rte = session_manager_get_rte(mnt->manager, thread_idx);
if (rte == NULL)
{
return req;
}
ctx->stat = *session_manager_rte_get_stat(rte);
return req;
}
static sds append_session_detail(sds ss, const struct session *sess, int thread_id)
{
ss = sdscatprintf(ss, "thread[%d]\n", thread_id);
ss = sdscatprintf(ss, " id : %ld\n", session_get_id(sess));
ss = sdscatprintf(ss, " type : %s\n", session_type_to_str(session_get_type(sess)));
ss = sdscatprintf(ss, " state : %s\n", session_state_to_str(session_get_current_state(sess)));
ss = sdscatprintf(ss, " tuple : %s\n", session_get_readable_addr(sess));
ss = sdscatprintf(ss, " session created timestamp : %ld (ms)\n", session_get_timestamp(sess, SESSION_TIMESTAMP_START));
ss = sdscatprintf(ss, " last packet received timestamp : %ld (ms)\n", session_get_timestamp(sess, SESSION_TIMESTAMP_LAST));
ss = sdscatprintf(ss, " C2S received packets : %ld\n", session_get_stat(sess, FLOW_TYPE_C2S, STAT_RAW_PACKETS_RECEIVED));
ss = sdscatprintf(ss, " S2C received packets : %ld\n", session_get_stat(sess, FLOW_TYPE_S2C, STAT_RAW_PACKETS_RECEIVED));
ss = sdscatprintf(ss, " C2S received bytes : %ld\n", session_get_stat(sess, FLOW_TYPE_C2S, STAT_RAW_BYTES_RECEIVED));
ss = sdscatprintf(ss, " S2C received bytes : %ld\n", session_get_stat(sess, FLOW_TYPE_S2C, STAT_RAW_BYTES_RECEIVED));
return ss;
}
static sds append_session_brief(sds ss, const struct session *sess, int thread_id)
{
if (sdslen(ss) == 0)
{
ss = sdscatprintf(ss, " %5s", "thread");
ss = sdscatprintf(ss, " %12s", "session_id");
ss = sdscatprintf(ss, " %8s", "type");
ss = sdscatprintf(ss, " %6s", "state");
ss = sdscatprintf(ss, " %40s", "saddr sport");
ss = sdscatprintf(ss, " %40s", "daddr dport");
ss = sdscatprintf(ss, " %8s\n", "domain");
ss = sdscatprintf(ss, "--------------------------------------------------------------------------------------------------------------------------------------\n");
}
char src_addr[INET6_ADDRSTRLEN] = {0};
char dst_addr[INET6_ADDRSTRLEN] = {0};
const struct tuple6 *tuple = session_get_tuple6(sess);
if (tuple->addr_family == AF_INET)
{
inet_ntop(AF_INET, &tuple->src_addr.v4, src_addr, INET6_ADDRSTRLEN);
inet_ntop(AF_INET, &tuple->dst_addr.v4, dst_addr, INET6_ADDRSTRLEN);
}
else
{
inet_ntop(AF_INET6, &tuple->src_addr.v6, src_addr, INET6_ADDRSTRLEN);
inet_ntop(AF_INET6, &tuple->dst_addr.v6, dst_addr, INET6_ADDRSTRLEN);
}
ss = sdscatprintf(ss, " %3d", thread_id);
ss = sdscatprintf(ss, " %18ld", session_get_id(sess));
ss = sdscatprintf(ss, " %4s", session_type_to_str(session_get_type(sess)));
ss = sdscatprintf(ss, " %8s", session_state_to_str(session_get_current_state(sess)));
ss = sdscatprintf(ss, " %33s %5d", src_addr, ntohs(tuple->src_port));
ss = sdscatprintf(ss, " %33s %5d", dst_addr, ntohs(tuple->dst_port));
ss = sdscatprintf(ss, " %8ld\n", tuple->domain);
return ss;
}
static sds append_session_stat(sds ss, const struct session_manager_stat *stat, int thread_id)
{
ss = sdscatprintf(ss, "thread[%d]\n", thread_id);
ss = sdscatprintf(ss, " history_tcp_sessions : %ld\n", stat->history_tcp_sessions);
ss = sdscatprintf(ss, " tcp_sess_used : %ld\n", stat->tcp_sess_used);
ss = sdscatprintf(ss, " tcp_sess_opening : %ld\n", stat->tcp_sess_opening);
ss = sdscatprintf(ss, " tcp_sess_active : %ld\n", stat->tcp_sess_active);
ss = sdscatprintf(ss, " tcp_sess_closing : %ld\n", stat->tcp_sess_closing);
ss = sdscatprintf(ss, " tcp_sess_discard : %ld\n", stat->tcp_sess_discard);
ss = sdscatprintf(ss, " tcp_sess_closed : %ld\n\n", stat->tcp_sess_closed);
ss = sdscatprintf(ss, " history_udp_sessions : %ld\n", stat->history_udp_sessions);
ss = sdscatprintf(ss, " udp_sess_used : %ld\n", stat->udp_sess_used);
ss = sdscatprintf(ss, " udp_sess_opening : %ld\n", stat->udp_sess_opening);
ss = sdscatprintf(ss, " udp_sess_active : %ld\n", stat->udp_sess_active);
ss = sdscatprintf(ss, " udp_sess_closing : %ld\n", stat->udp_sess_closing);
ss = sdscatprintf(ss, " udp_sess_discard : %ld\n", stat->udp_sess_discard);
ss = sdscatprintf(ss, " udp_sess_closed : %ld\n\n", stat->udp_sess_closed);
ss = sdscatprintf(ss, " tcp_sess_evicted : %ld\n", stat->tcp_sess_evicted);
ss = sdscatprintf(ss, " udp_sess_evicted : %ld\n\n", stat->udp_sess_evicted);
return ss;
}
static sds rpc_show_session_id(struct session_monitor *mnt, int thread_id, uint64_t sess_id, sds ss)
{
struct show_sess_id_ctx lookup_ctx = {.sess_id = sess_id, .matched_sess = {0}, .find = 0};
struct iovec lookup_ctx_iov = {.iov_base = &lookup_ctx, .iov_len = sizeof(struct show_sess_id_ctx)};
monitor_worker_thread_rpc(mnt->monitor, thread_id, lookup_ctx_iov, worker_thread_lookup_session, mnt);
if (lookup_ctx.find == 1)
{
ss = append_session_detail(ss, &lookup_ctx.matched_sess, thread_id);
}
return ss;
}
static sds rpc_show_session_all(struct session_monitor *mnt, int thread_id, struct show_sess_all_ctx *scan_ctx, sds ss)
{
struct iovec scan_ctx_iov = {.iov_base = scan_ctx, .iov_len = sizeof(struct show_sess_all_ctx)};
uint64_t start = scan_ctx->used;
monitor_worker_thread_rpc(mnt->monitor, thread_id, scan_ctx_iov, worker_thread_scan_session, mnt);
for (uint64_t j = start; j < scan_ctx->used; j++)
{
struct show_sess_id_ctx lookup_ctx = {.sess_id = scan_ctx->matched_ids[j], .matched_sess = {0}, .find = 0};
struct iovec lookup_ctx_iov = {.iov_base = &lookup_ctx, .iov_len = sizeof(struct show_sess_id_ctx)};
monitor_worker_thread_rpc(mnt->monitor, thread_id, lookup_ctx_iov, worker_thread_lookup_session, mnt);
if (lookup_ctx.find == 1)
{
ss = append_session_brief(ss, &lookup_ctx.matched_sess, thread_id);
}
}
return ss;
}
static sds rpc_show_session_info(struct session_monitor *mnt, int thread_id, sds ss)
{
struct show_sess_info_ctx ctx = {0};
struct iovec ctx_iov = {.iov_base = &ctx, .iov_len = sizeof(struct show_sess_info_ctx)};
monitor_worker_thread_rpc(mnt->monitor, thread_id, ctx_iov, worker_thread_stat_session, mnt);
ss = append_session_stat(ss, &ctx.stat, thread_id);
return ss;
}
static struct monitor_reply *monitor_show_session_id(struct stellar_monitor *monitor, int argc, char *argv[], void *ctx)
{
struct session_monitor *mnt = (struct session_monitor *)ctx;
// show session id <val> [options]
if (argc < 3 || strcasecmp(argv[0], "show") != 0 || strcasecmp(argv[1], "session") != 0 || strcasecmp(argv[2], "id") != 0)
{
return monitor_reply_new_string(show_session_id_usage(mnt));
}
struct parser parsers[] = {
{"help", parse_help},
{"id", parse_id},
{"thread", parse_thread},
};
struct cmd_opts opts;
enum pcode ret = cmd_opts_parse(parsers, sizeof(parsers) / sizeof(parsers[0]), &opts, argc - 2, argv + 2, mnt);
if (ret == PCODE_HELP)
{
return monitor_reply_new_string(show_session_id_usage(mnt));
}
if (ret != PCODE_OK)
{
return monitor_reply_new_string("%s\n%s", pcode_to_str(ret), show_session_id_usage(mnt));
}
sds ss = sdsempty();
if (opts.thread_id == -1) // all thread
{
for (uint64_t i = 0; i < mnt->thread_num; i++)
{
ss = rpc_show_session_id(mnt, i, opts.sess_id, ss);
if (sdslen(ss))
{
break;
}
}
}
else
{
ss = rpc_show_session_id(mnt, opts.thread_id, opts.sess_id, ss);
}
if (sdslen(ss) == 0)
{
ss = sdscatprintf(ss, "no found");
}
struct monitor_reply *reply = monitor_reply_new_string(ss);
sdsfree(ss);
return reply;
}
static struct monitor_reply *monitor_show_session_all(struct stellar_monitor *monitor, int argc, char *argv[], void *ctx)
{
struct session_monitor *mnt = (struct session_monitor *)ctx;
// show session all [options]
if (argc < 3 || strcasecmp(argv[0], "show") != 0 || strcasecmp(argv[1], "session") != 0 || strcasecmp(argv[2], "all") != 0)
{
return monitor_reply_new_string(show_session_all_usage(mnt));
}
struct parser parsers[] = {
{"help", parse_help},
{"thread", parse_thread},
{"cursor", parse_cursor},
{"count", parse_count},
{"display", parse_display},
{"type", parse_type},
{"state", parse_state},
{"sport", parse_sport},
{"dport", parse_dport},
{"saddr", parse_sadd},
{"daddr", parse_dadd},
{"stime", parse_stime},
{"ptime", parse_ptime},
};
struct cmd_opts opts;
enum pcode ret = cmd_opts_parse(parsers, sizeof(parsers) / sizeof(parsers[0]), &opts, argc - 3, argv + 3, mnt);
if (ret == PCODE_HELP || ret == PCODE_ERR_INVALID_ARGS)
{
return monitor_reply_new_string(show_session_all_usage(mnt));
}
if (ret != PCODE_OK)
{
return monitor_reply_new_string("%s\n%s", pcode_to_str(ret), show_session_all_usage(mnt));
}
sds ss = sdsempty();
struct show_sess_all_ctx scan_ctx = {.filter = opts.filter, .matched_ids = {0}, .size = opts.filter.limit, .used = 0};
if (opts.thread_id == -1) // all thread
{
for (uint16_t i = 0; i < mnt->thread_num; i++)
{
ss = rpc_show_session_all(mnt, i, &scan_ctx, ss);
if (scan_ctx.used >= scan_ctx.size)
{
break;
}
}
}
else
{
ss = rpc_show_session_all(mnt, opts.thread_id, &scan_ctx, ss);
}
struct monitor_reply *reply = monitor_reply_new_string(ss);
sdsfree(ss);
return reply;
}
static struct monitor_reply *monitor_show_session_info(struct stellar_monitor *monitor, int argc, char *argv[], void *ctx)
{
struct session_monitor *mnt = (struct session_monitor *)ctx;
// show session info [options]
if (argc < 3 || strcasecmp(argv[0], "show") != 0 || strcasecmp(argv[1], "session") != 0 || strcasecmp(argv[2], "info") != 0)
{
return monitor_reply_new_string(show_session_info_usage(mnt));
}
struct parser parsers[] = {
{"help", parse_help},
{"thread", parse_thread},
};
struct cmd_opts opts;
enum pcode ret = cmd_opts_parse(parsers, sizeof(parsers) / sizeof(parsers[0]), &opts, argc - 3, argv + 3, mnt);
if (ret != PCODE_OK)
{
return monitor_reply_new_string(show_session_info_usage(mnt));
}
sds ss = sdsempty();
if (opts.thread_id == -1) // all thread
{
for (uint16_t i = 0; i < mnt->thread_num; i++)
{
ss = rpc_show_session_info(mnt, i, ss);
}
}
else
{
ss = rpc_show_session_info(mnt, opts.thread_id, ss);
}
struct monitor_reply *reply = monitor_reply_new_string(ss);
sdsfree(ss);
return reply;
}
struct module *session_monitor_on_init(struct module_manager *mod_mgr)
{
assert(mod_mgr);
struct session_manager *manager = module_to_session_manager(module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME));
assert(manager);
struct stellar_monitor *monitor = monitor_module_to_monitor(module_manager_get_module(mod_mgr, MONITOR_MODULE_NAME));
assert(monitor);
struct logger *logger = module_manager_get_logger(mod_mgr);
assert(logger);
int thread_num = module_manager_get_max_thread_num(mod_mgr);
struct session_monitor *mnt = (struct session_monitor *)calloc(1, sizeof(struct session_monitor));
if (mnt == NULL)
{
SESSION_MONITOR_LOG_ERROR("failed to alloc session_monitor");
return NULL;
}
struct session_manager_cfg *cfg = session_manager_get_cfg(manager);
mnt->thread_num = thread_num;
mnt->capacity = cfg->tcp_session_max + cfg->udp_session_max;
mnt->monitor = monitor;
mnt->manager = manager;
mnt->logger = logger;
monitor_register_cmd(mnt->monitor, "show session id", monitor_show_session_id, "readonly", "", "", mnt);
monitor_register_cmd(mnt->monitor, "show session all", monitor_show_session_all, "readonly", "", "", mnt);
monitor_register_cmd(mnt->monitor, "show session info", monitor_show_session_info, "readonly", "", "", mnt);
struct module *sess_mnt_mod = module_new(SESSION_MONITOR_MODULE_NAME, NULL);
if (sess_mnt_mod == NULL)
{
SESSION_MONITOR_LOG_ERROR("failed to create session_monitor");
free(mnt);
return NULL;
}
module_set_ctx(sess_mnt_mod, mnt);
SESSION_MONITOR_LOG_FATAL("session_monitor init");
return sess_mnt_mod;
}
void session_monitor_on_exit(struct module_manager *mod_mgr, struct module *mod)
{
if (mod)
{
struct session_monitor *mnt = module_get_ctx(mod);
free(mnt);
module_free(mod);
SESSION_MONITOR_LOG_FATAL("session_monitor exit");
}
}