diff --git a/conf/stellar.toml b/conf/stellar.toml index 8303360..fdd7ba9 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -72,6 +72,11 @@ cli_request_timeout = 3 # second pktdump_task_max_num = 3 +[[module]] + path = "" + init = "monitor_on_init" + exit = "monitor_on_exit" + [[module]] path = "" init = "packet_manager_on_init" @@ -87,6 +92,6 @@ thread_exit = "session_manager_on_thread_exit" [[module]] - path="" - init="monitor_on_init" - exit="monitor_on_exit" + path = "" + init = "session_monitor_on_init" + exit = "session_monitor_on_exit" diff --git a/infra/session_manager/CMakeLists.txt b/infra/session_manager/CMakeLists.txt index 0fa2260..ae68fdc 100644 --- a/infra/session_manager/CMakeLists.txt +++ b/infra/session_manager/CMakeLists.txt @@ -3,8 +3,9 @@ add_library(session_manager session_pool.c session_table.c session_timer.c - session_filter.c + session_dabloom.c session_transition.c + session_monitor.c session_manager.c session_manager_cfg.c session_manager_rte.c @@ -13,6 +14,6 @@ add_library(session_manager target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata fieldstat4) +target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata fieldstat4 monitor) add_subdirectory(test) \ No newline at end of file diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 3691805..89496c9 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -598,10 +598,15 @@ void session_manager_on_thread_exit(struct module_manager *mod_mgr, int thread_i session_manager_clean(sess_mgr, thread_id); } -// temp add for show session command -struct session_manager_rte *session_manager_get_runtime(struct session_manager *sess_mgr, uint16_t thread_id) +struct session_manager_rte *session_manager_get_rte(struct session_manager *sess_mgr, uint16_t thread_id) { assert(sess_mgr); assert(thread_id < sess_mgr->cfg->thread_num); return sess_mgr->rte[thread_id]; +} + +struct session_manager_cfg *session_manager_get_cfg(struct session_manager *sess_mgr) +{ + assert(sess_mgr); + return sess_mgr->cfg; } \ No newline at end of file diff --git a/infra/session_manager/session_manager.h b/infra/session_manager/session_manager.h new file mode 100644 index 0000000..724f019 --- /dev/null +++ b/infra/session_manager/session_manager.h @@ -0,0 +1,16 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include + +struct session_manager; +struct session_manager_rte *session_manager_get_rte(struct session_manager *sess_mgr, uint16_t thread_id); +struct session_manager_cfg *session_manager_get_cfg(struct session_manager *sess_mgr); + +#ifdef __cplusplus +} +#endif diff --git a/infra/session_manager/session_manager_cfg.c b/infra/session_manager/session_manager_cfg.c index 5542831..84280e6 100644 --- a/infra/session_manager/session_manager_cfg.c +++ b/infra/session_manager/session_manager_cfg.c @@ -36,9 +36,9 @@ struct session_manager_cfg *session_manager_cfg_new(const char *toml_file) ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", &sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000); ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", &sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000); ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", &sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000); - ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", &sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", &sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 600000); ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", &sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000); - ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", &sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", &sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 600000); // UDP timeout ret += load_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", &sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000); diff --git a/infra/session_manager/session_manager_rte.c b/infra/session_manager/session_manager_rte.c index e424faf..56e229b 100644 --- a/infra/session_manager/session_manager_rte.c +++ b/infra/session_manager/session_manager_rte.c @@ -1,11 +1,11 @@ #include #include "packet_helper.h" -#include "packet_filter.h" +#include "packet_dabloom.h" #include "session_pool.h" #include "session_table.h" #include "session_timer.h" -#include "session_filter.h" +#include "session_dabloom.h" #include "session_internal.h" #include "session_transition.h" #include "session_manager_log.h" @@ -26,8 +26,8 @@ struct session_manager_rte struct session_table *udp_table; struct session_timer *sess_timer; - struct packet_filter *dup_pkt_filter; - struct session_filter *evc_sess_filter; + struct packet_dabloom *dup_pkt_dab; + struct session_dabloom *evc_sess_dab; struct session_manager_cfg cfg; struct session_manager_stat stat; @@ -420,7 +420,7 @@ static int session_manager_rte_bypass_packet_on_udp_table_limit(struct session_m static int session_manager_rte_bypass_packet_on_session_evicted(struct session_manager_rte *sess_mgr_rte, const struct tuple6 *key) { - if (sess_mgr_rte->cfg.evicted_session_bloom_filter.enable && session_filter_lookup(sess_mgr_rte->evc_sess_filter, key, sess_mgr_rte->now_ms)) + if (sess_mgr_rte->cfg.evicted_session_bloom_filter.enable && session_dabloom_lookup(sess_mgr_rte->evc_sess_dab, key, sess_mgr_rte->now_ms)) { sess_mgr_rte->stat.udp_pkts_bypass_session_evicted++; return 1; @@ -439,7 +439,7 @@ static int session_manager_rte_bypass_duplicated_packet(struct session_manager_r enum flow_type type = identify_flow_type_by_history(sess, key); if (session_get_stat(sess, type, STAT_RAW_PACKETS_RECEIVED) < 3 || session_has_duplicate_traffic(sess)) { - if (packet_filter_lookup(sess_mgr_rte->dup_pkt_filter, pkt, sess_mgr_rte->now_ms)) + if (packet_dabloom_lookup(sess_mgr_rte->dup_pkt_dab, pkt, sess_mgr_rte->now_ms)) { session_inc_stat(sess, type, STAT_DUPLICATE_PACKETS_BYPASS, 1); session_inc_stat(sess, type, STAT_DUPLICATE_BYTES_BYPASS, packet_get_raw_len(pkt)); @@ -463,7 +463,7 @@ static int session_manager_rte_bypass_duplicated_packet(struct session_manager_r } else { - packet_filter_add(sess_mgr_rte->dup_pkt_filter, pkt, sess_mgr_rte->now_ms); + packet_dabloom_add(sess_mgr_rte->dup_pkt_dab, pkt, sess_mgr_rte->now_ms); return 0; } } @@ -514,7 +514,7 @@ static void session_manager_rte_evicte_session(struct session_manager_rte *sess_ session_table_del(sess_mgr_rte->udp_table, sess); if (sess_mgr_rte->cfg.evicted_session_bloom_filter.enable) { - session_filter_add(sess_mgr_rte->evc_sess_filter, session_get_tuple6(sess), sess_mgr_rte->now_ms); + session_dabloom_add(sess_mgr_rte->evc_sess_dab, session_get_tuple6(sess), sess_mgr_rte->now_ms); } SESS_MGR_STAT_UPDATE(&sess_mgr_rte->stat, curr_state, next_state, udp); sess_mgr_rte->stat.udp_sess_evicted++; @@ -609,7 +609,7 @@ static struct session *session_manager_rte_new_tcp_session(struct session_manage if (sess_mgr_rte->cfg.duplicated_packet_bloom_filter.enable) { - packet_filter_add(sess_mgr_rte->dup_pkt_filter, pkt, sess_mgr_rte->now_ms); + packet_dabloom_add(sess_mgr_rte->dup_pkt_dab, pkt, sess_mgr_rte->now_ms); } SESS_MGR_STAT_INC(&sess_mgr_rte->stat, next_state, tcp); @@ -783,20 +783,20 @@ struct session_manager_rte *session_manager_rte_new(const struct session_manager } if (sess_mgr_rte->cfg.evicted_session_bloom_filter.enable) { - sess_mgr_rte->evc_sess_filter = session_filter_new(sess_mgr_rte->cfg.evicted_session_bloom_filter.capacity, - sess_mgr_rte->cfg.evicted_session_bloom_filter.time_window_ms, - sess_mgr_rte->cfg.evicted_session_bloom_filter.error_rate, now_ms); - if (sess_mgr_rte->evc_sess_filter == NULL) + sess_mgr_rte->evc_sess_dab = session_dabloom_new(sess_mgr_rte->cfg.evicted_session_bloom_filter.capacity, + sess_mgr_rte->cfg.evicted_session_bloom_filter.time_window_ms, + sess_mgr_rte->cfg.evicted_session_bloom_filter.error_rate, now_ms); + if (sess_mgr_rte->evc_sess_dab == NULL) { goto error; } } if (sess_mgr_rte->cfg.duplicated_packet_bloom_filter.enable) { - sess_mgr_rte->dup_pkt_filter = packet_filter_new(sess_mgr_rte->cfg.duplicated_packet_bloom_filter.capacity, - sess_mgr_rte->cfg.duplicated_packet_bloom_filter.time_window_ms, - sess_mgr_rte->cfg.duplicated_packet_bloom_filter.error_rate, now_ms); - if (sess_mgr_rte->dup_pkt_filter == NULL) + sess_mgr_rte->dup_pkt_dab = packet_dabloom_new(sess_mgr_rte->cfg.duplicated_packet_bloom_filter.capacity, + sess_mgr_rte->cfg.duplicated_packet_bloom_filter.time_window_ms, + sess_mgr_rte->cfg.duplicated_packet_bloom_filter.error_rate, now_ms); + if (sess_mgr_rte->dup_pkt_dab == NULL) { goto error; } @@ -842,11 +842,11 @@ void session_manager_rte_free(struct session_manager_rte *sess_mgr_rte) } if (sess_mgr_rte->cfg.evicted_session_bloom_filter.enable) { - session_filter_free(sess_mgr_rte->evc_sess_filter); + session_dabloom_free(sess_mgr_rte->evc_sess_dab); } if (sess_mgr_rte->cfg.duplicated_packet_bloom_filter.enable) { - packet_filter_free(sess_mgr_rte->dup_pkt_filter); + packet_dabloom_free(sess_mgr_rte->dup_pkt_dab); } snowflake_free(sess_mgr_rte->sf); session_timer_free(sess_mgr_rte->sess_timer); @@ -1113,7 +1113,7 @@ uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_ return cleaned_sess_num; } -uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_scan_opts *opts, uint64_t mached_sess_id[], uint64_t array_size) +uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size) { uint64_t capacity = 0; uint64_t max_loop = 0; @@ -1121,22 +1121,22 @@ uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_r const struct session *sess = NULL; const struct tuple6 *tuple = NULL; - if (sess_mgr_rte == NULL || opts == NULL || mached_sess_id == NULL || array_size == 0) + if (sess_mgr_rte == NULL || filter == NULL || mached_sess_id == NULL || array_size == 0) { return mached_sess_num; } - if (opts->count == 0) + if (filter->count == 0) { return mached_sess_num; } capacity = sess_mgr_rte->cfg.tcp_session_max + sess_mgr_rte->cfg.udp_session_max; - if (opts->cursor >= capacity) + if (filter->cursor >= capacity) { return mached_sess_num; } - max_loop = MIN(capacity, opts->cursor + opts->count); - for (uint64_t i = opts->cursor; i < max_loop; i++) + max_loop = MIN(capacity, filter->cursor + filter->count); + for (uint64_t i = filter->cursor; i < max_loop; i++) { sess = session_pool_get0(sess_mgr_rte->sess_pool, i); tuple = session_get_tuple6(sess); @@ -1145,60 +1145,56 @@ uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_r continue; } - if ((opts->flags & SESSION_SCAN_TYPE) && opts->type != session_get_type(sess)) + if (filter->type && filter->type != session_get_type(sess)) { continue; } - if ((opts->flags & SESSION_SCAN_STATE) && opts->state != session_get_current_state(sess)) + if (filter->state && filter->state != session_get_current_state(sess)) { continue; } - if ((opts->flags & SESSION_SCAN_CREATE_TIME) && - (session_get_timestamp(sess, SESSION_TIMESTAMP_START) < opts->create_time_ms[0] || - session_get_timestamp(sess, SESSION_TIMESTAMP_START) > opts->create_time_ms[1])) + if (filter->sess_created_ts_in_ms && session_get_timestamp(sess, SESSION_TIMESTAMP_START) < filter->sess_created_ts_in_ms) { continue; } - if ((opts->flags & SESSION_SCAN_LASPKT_TIME) && - (session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) < opts->laspkt_time_ms[0] || - session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) > opts->laspkt_time_ms[1])) + if (filter->pkt_received_ts_in_ms && session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) < filter->pkt_received_ts_in_ms) { continue; } - if ((opts->flags & SESSION_SCAN_SPORT) && opts->src_port != tuple->src_port) + if (filter->src_port && filter->src_port != tuple->src_port) { continue; } - if ((opts->flags & SESSION_SCAN_DPORT) && opts->dst_port != tuple->dst_port) + if (filter->dst_port && filter->dst_port != tuple->dst_port) { continue; } - if (opts->flags & SESSION_SCAN_SIP) + if (filter->src_family) { - if (opts->addr_family != tuple->addr_family) + if (filter->src_family != tuple->addr_family) { continue; } - if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->src_addr.v4, &opts->src_addr[0].v4, &opts->src_addr[1].v4)) + if ((filter->src_family == AF_INET) && !ipv4_in_range(&tuple->src_addr.v4, &filter->src_addr_range[0].v4, &filter->src_addr_range[1].v4)) { continue; } - if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->src_addr.v6, &opts->src_addr[0].v6, &opts->src_addr[1].v6)) + if ((filter->src_family == AF_INET6) && !ipv6_in_range(&tuple->src_addr.v6, &filter->src_addr_range[0].v6, &filter->src_addr_range[1].v6)) { continue; } } - if (opts->flags & SESSION_SCAN_DIP) + if (filter->dst_family) { - if (opts->addr_family != tuple->addr_family) + if (filter->dst_family != tuple->addr_family) { continue; } - if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->dst_addr.v4, &opts->dst_addr[0].v4, &opts->dst_addr[1].v4)) + if ((filter->dst_family == AF_INET) && !ipv4_in_range(&tuple->dst_addr.v4, &filter->dst_addr_range[0].v4, &filter->dst_addr_range[1].v4)) { continue; } - if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->dst_addr.v6, &opts->dst_addr[0].v6, &opts->dst_addr[1].v6)) + if ((filter->dst_family == AF_INET6) && !ipv6_in_range(&tuple->dst_addr.v6, &filter->dst_addr_range[0].v6, &filter->dst_addr_range[1].v6)) { continue; } @@ -1211,7 +1207,7 @@ uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_r } } - SESSION_MANAGER_LOG_DEBUG("session scan => cursor: %lu, count: %lu, mached_sess_num: %lu", opts->cursor, opts->count, mached_sess_num); + SESSION_MANAGER_LOG_INFO("session scan => cursor: %lu, count: %lu, mached_sess_num: %lu", filter->cursor, filter->count, mached_sess_num); return mached_sess_num; } @@ -1219,7 +1215,7 @@ void session_manager_rte_record_duplicated_packet(struct session_manager_rte *se { if (sess_mgr_rte->cfg.duplicated_packet_bloom_filter.enable) { - packet_filter_add(sess_mgr_rte->dup_pkt_filter, pkt, sess_mgr_rte->now_ms); + packet_dabloom_add(sess_mgr_rte->dup_pkt_dab, pkt, sess_mgr_rte->now_ms); } } diff --git a/infra/session_manager/session_manager_rte.h b/infra/session_manager/session_manager_rte.h index f04dceb..8adae88 100644 --- a/infra/session_manager/session_manager_rte.h +++ b/infra/session_manager/session_manager_rte.h @@ -9,37 +9,24 @@ extern "C" #include "stellar/session.h" #include "session_manager_cfg.h" -enum session_scan_flags +struct session_filter { - SESSION_SCAN_TYPE = 1 << 0, - SESSION_SCAN_STATE = 1 << 1, - SESSION_SCAN_SIP = 1 << 2, - SESSION_SCAN_DIP = 1 << 3, - SESSION_SCAN_SPORT = 1 << 4, - SESSION_SCAN_DPORT = 1 << 5, - SESSION_SCAN_CREATE_TIME = 1 << 6, - SESSION_SCAN_LASPKT_TIME = 1 << 7, -}; + uint64_t cursor; + uint64_t count; + uint64_t limit; -struct session_scan_opts -{ - // required - uint32_t flags; - uint32_t cursor; - uint32_t count; - - // optional enum session_type type; enum session_state state; - uint32_t addr_family; // AF_INET or AF_INET6 - union ip_address src_addr[2]; // network byte order - union ip_address dst_addr[2]; // network byte order - uint16_t src_port; // network byte order - uint16_t dst_port; // network byte order + uint32_t src_family; // AF_INET or AF_INET6 + uint32_t dst_family; // AF_INET or AF_INET6 + union ip_address src_addr_range[2]; // network byte order + union ip_address dst_addr_range[2]; // network byte order + uint16_t src_port; // network byte order + uint16_t dst_port; // network byte order - uint64_t create_time_ms[2]; // session create time range - uint64_t laspkt_time_ms[2]; // last packet time range + uint64_t sess_created_ts_in_ms; + uint64_t pkt_received_ts_in_ms; }; struct session_manager_rte; @@ -57,7 +44,7 @@ struct session *session_manager_rte_get_expired_session(struct session_manager_r struct session *session_manager_rte_get_evicted_session(struct session_manager_rte *sess_mgr_rte); uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms, struct session *cleaned_sess_ptr[], uint64_t array_size); -uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_scan_opts *opts, uint64_t mached_sess_id[], uint64_t array_size); +uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size); void session_manager_rte_record_duplicated_packet(struct session_manager_rte *sess_mgr_rte, const struct packet *pkt); diff --git a/infra/session_manager/session_monitor.c b/infra/session_manager/session_monitor.c new file mode 100644 index 0000000..5fe68a0 --- /dev/null +++ b/infra/session_manager/session_monitor.c @@ -0,0 +1,1064 @@ +#include +#include +#include + +#include "tuple.h" +#include "log_internal.h" +#include "utils_internal.h" +#include "session_internal.h" +#include "session_manager.h" +#include "session_manager_cfg.h" +#include "session_manager_rte.h" +#include "session_manager_stat.h" + +#include "sds/sds.h" +#include "monitor/monitor_rpc.h" +#include "stellar/monitor.h" +#include "stellar/module.h" + +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wunused-function" + +#define DISPLAY_SESSION_DEFAULT_COUNT 10 +#define DISPLAY_SESSION_MAX_COUNT 1000 +#define SCAN_SESSION_DEFAULT_COUNT 1000 +#define SESSION_MONITOR_MODULE_NAME "session_monitor_module" + +#define SESSION_MONITOR_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "session monitor", format, ##__VA_ARGS__) +#define SESSION_MONITOR_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session monitor", format, ##__VA_ARGS__) + +struct session_monitor +{ + uint64_t thread_num; + uint64_t capacity; // per thread session table capacity + + struct logger *logger; + struct session_manager *manager; + struct stellar_monitor *monitor; +}; + +/****************************************************************************** + * parse utils + ******************************************************************************/ + +/* + * https://docs.paloaltonetworks.com/pan-os/11-1/pan-os-cli-quick-start/cli-command-hierarchy/pan-os-11-1-cli-ops-command-hierarchy + * https://docs.paloaltonetworks.com/content/dam/techdocs/en_US/pdf/eol/pan-os-panorama-50-cli.pdf + * + * Syntax + * + * show session + * { + * all | + * { + * thread | + * cursor | + * count | + * state | + * type | + * saddr | + * daddr | + * sport | + * dport | + * stime | + * ptime + * } + * id + * { + * thread + * } + * info + * { + * thread + * } + * } + */ + +struct cmd_opts +{ + uint64_t sess_id; + int thread_id; // -1 for all + struct session_filter filter; +}; + +enum pcode +{ + PCODE_OK, + PCODE_HELP, + PCODE_ERR_INVALID_ARGS, + PCODE_ERR_INVALID_SESSID, + PCODE_ERR_INVALID_THREAD, + PCODE_ERR_INVALID_CURSOR, + PCODE_ERR_INVALID_COUNT, + PCODE_ERR_INVALID_DISPLAY, + PCODE_ERR_INVALID_TYPE, + PCODE_ERR_INVALID_STATE, + PCODE_ERR_INVALID_SPORT, + PCODE_ERR_INVALID_DPORT, + PCODE_ERR_INVALID_SADDR, + PCODE_ERR_INVALID_DADDR, + PCODE_ERR_INVALID_STIME, + PCODE_ERR_INVALID_PTIME, +}; + +typedef enum pcode parse_func(struct cmd_opts *opts, const char *val, void *ctx); + +struct parser +{ + const char *key; + parse_func *parse; +}; + +static const char *pcode_to_str(enum pcode code) +{ + switch (code) + { + case PCODE_OK: + return "ok"; + case PCODE_HELP: + return "help"; + case PCODE_ERR_INVALID_ARGS: + return "invalid args"; + case PCODE_ERR_INVALID_SESSID: + return "invalid session id"; + case PCODE_ERR_INVALID_THREAD: + return "invalid thread id"; + case PCODE_ERR_INVALID_CURSOR: + return "invalid cursor"; + case PCODE_ERR_INVALID_COUNT: + return "invalid count"; + case PCODE_ERR_INVALID_DISPLAY: + return "invalid display"; + case PCODE_ERR_INVALID_TYPE: + return "invalid type"; + case PCODE_ERR_INVALID_STATE: + return "invalid state"; + case PCODE_ERR_INVALID_SPORT: + return "invalid sport"; + case PCODE_ERR_INVALID_DPORT: + return "invalid dport"; + case PCODE_ERR_INVALID_SADDR: + return "invalid saddr"; + case PCODE_ERR_INVALID_DADDR: + return "invalid daddr"; + case PCODE_ERR_INVALID_STIME: + return "invalid stime"; + case PCODE_ERR_INVALID_PTIME: + return "invalid ptime"; + default: + return "unknown"; + } +} + +static int is_digit_str(const char *val) +{ + for (size_t i = 0; i < strlen(val); i++) + { + if (!isdigit(val[i])) + { + return 0; + } + } + + return 1; +} + +// return 0 on success +// return -1 on failure +// val format: N[s|m|h|d] +static int time_to_ms(const char *val, uint64_t *ts) +{ + if (val == NULL) + { + return -1; + } + + int len = strlen(val); + if (len > 21) // 18446744073709551615 in string + { + return -1; + } + + char buff[32] = {0}; + memcpy(buff, val, len); + char unit = buff[len - 1]; + if (unit != 's' && unit != 'm' && unit != 'h' && unit != 'd') + { + return -1; + } + + buff[len - 1] = '\0'; + if (is_digit_str(buff) == 0) + { + return -1; + } + + if (atoll(buff) < 1) + { + return -1; + } + + switch (unit) + { + case 's': + *ts = atoll(buff) * 1000; + break; + case 'm': + *ts = atoll(buff) * 1000 * 60; + break; + case 'h': + *ts = atoll(buff) * 1000 * 60 * 60; + break; + case 'd': + *ts = atoll(buff) * 1000 * 60 * 60 * 24; + break; + default: + return -1; + } + + return 0; +} + +// return AF_INET/AF_INET6 on success +// return 0 on failure +// val format: [/mask] +static int cidr_to_range(const char *val, union ip_address addr[2]) +{ + int mask = 0; + char buff[64] = {0}; + char *delimit = NULL; + uint32_t ip4_addr = {0}; + uint32_t ip4_mask = {0}; + struct in6_addr ip6_addr = {0}; + struct in6_addr ip6_mask = {0}; + + if (val == NULL) + { + return 0; + } + + memcpy(buff, val, strlen(val)); + delimit = strchr(buff, '/'); + if (delimit == NULL) + { + if (inet_pton(AF_INET, buff, &ip4_addr) == 1) + { + memcpy(&addr[0], &ip4_addr, sizeof(struct in_addr)); + memcpy(&addr[1], &ip4_addr, sizeof(struct in_addr)); + return AF_INET; + } + + if (inet_pton(AF_INET6, buff, &ip6_addr) == 1) + { + memcpy(&addr[0], &ip6_addr, sizeof(struct in6_addr)); + memcpy(&addr[1], &ip6_addr, sizeof(struct in6_addr)); + return AF_INET6; + } + } + else + { + *delimit = '\0'; + delimit++; + if (!is_digit_str(delimit)) + { + return 0; + } + + mask = atoi(delimit); + if (inet_pton(AF_INET, buff, &ip4_addr) == 1) + { + if (mask <= 0 || mask > 32) + { + return 0; + } + + for (int i = 0; i < mask; i++) + { + ip4_mask |= (uint32_t)1 << (31 - i); + } + ip4_mask = ntohl(ip4_mask); + + addr[0].v4.s_addr = (uint32_t)(ip4_addr & ip4_mask); + addr[1].v4.s_addr = (uint32_t)((ip4_addr & ip4_mask) | ~ip4_mask); + + return AF_INET; + } + + if (inet_pton(AF_INET6, buff, &ip6_addr) == 1) + { + if (mask <= 0 || mask > 128) + { + return 0; + } + + for (int i = 0; i < mask; i++) + { + ip6_mask.s6_addr[i / 8] |= 1 << (7 - i % 8); + } + + for (int i = 0; i < 16; i++) + { + addr[0].v6.s6_addr[i] = (ip6_addr.s6_addr[i] & ip6_mask.s6_addr[i]); + addr[1].v6.s6_addr[i] = (ip6_addr.s6_addr[i] & ip6_mask.s6_addr[i]) | ~ip6_mask.s6_addr[i]; + } + + return AF_INET6; + } + } + + return 0; +} + +static enum pcode parse_help(struct cmd_opts *opts, const char *val, void *ctx) +{ + return PCODE_HELP; +} + +static enum pcode parse_id(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL || is_digit_str(val) == 0) + { + return PCODE_ERR_INVALID_SESSID; + } + + opts->sess_id = atoll(val); + return PCODE_OK; +} + +static enum pcode parse_thread(struct cmd_opts *opts, const char *val, void *ctx) +{ + struct session_monitor *mnt = (struct session_monitor *)ctx; + + if (val == NULL) + { + return PCODE_ERR_INVALID_THREAD; + } + + if (strcasecmp(val, "all") == 0) + { + opts->thread_id = -1; + return PCODE_OK; + } + + if (is_digit_str(val) == 0) + { + return PCODE_ERR_INVALID_THREAD; + } + + if (atoll(val) < 0 || (uint64_t)atoll(val) >= mnt->thread_num) + { + return PCODE_ERR_INVALID_THREAD; + } + + opts->thread_id = atoll(val); + return PCODE_OK; +} + +static enum pcode parse_cursor(struct cmd_opts *opts, const char *val, void *ctx) +{ + struct session_monitor *mnt = (struct session_monitor *)ctx; + + if (val == NULL || is_digit_str(val) == 0) + { + return PCODE_ERR_INVALID_CURSOR; + } + + if ((uint64_t)atoll(val) >= mnt->capacity) + { + return PCODE_ERR_INVALID_CURSOR; + } + + opts->filter.cursor = atoll(val); + return PCODE_OK; +} + +static enum pcode parse_count(struct cmd_opts *opts, const char *val, void *ctx) +{ + struct session_monitor *mnt = (struct session_monitor *)ctx; + + if (val == NULL || is_digit_str(val) == 0) + { + return PCODE_ERR_INVALID_COUNT; + } + + if (atoll(val) < 1 || (uint64_t)atoll(val) > mnt->capacity) + { + return PCODE_ERR_INVALID_COUNT; + } + + opts->filter.count = atoll(val); + return PCODE_OK; +} + +static enum pcode parse_display(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL || is_digit_str(val) == 0) + { + return PCODE_ERR_INVALID_COUNT; + } + + if (atoll(val) < 1 || atoll(val) > DISPLAY_SESSION_MAX_COUNT) + { + return PCODE_ERR_INVALID_COUNT; + } + + opts->filter.limit = atoll(val); + return PCODE_OK; +} + +static enum pcode parse_type(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL) + { + return PCODE_ERR_INVALID_TYPE; + } + + if (strcasecmp(val, "tcp") == 0) + { + opts->filter.type = SESSION_TYPE_TCP; + return PCODE_OK; + } + else if (strcasecmp(val, "udp") == 0) + { + opts->filter.type = SESSION_TYPE_UDP; + return PCODE_OK; + } + else + { + return PCODE_ERR_INVALID_TYPE; + } +} + +static enum pcode parse_state(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL) + { + return PCODE_ERR_INVALID_STATE; + } + + if (strcasecmp(val, "opening") == 0) + { + opts->filter.state = SESSION_STATE_OPENING; + return PCODE_OK; + } + else if (strcasecmp(val, "active") == 0) + { + opts->filter.state = SESSION_STATE_ACTIVE; + return PCODE_OK; + } + else if (strcasecmp(val, "closing") == 0) + { + opts->filter.state = SESSION_STATE_CLOSING; + return PCODE_OK; + } + else if (strcasecmp(val, "discard") == 0) + { + opts->filter.state = SESSION_STATE_DISCARD; + return PCODE_OK; + } + else if (strcasecmp(val, "closed") == 0) + { + opts->filter.state = SESSION_STATE_CLOSED; + return PCODE_OK; + } + else + { + return PCODE_ERR_INVALID_STATE; + } +} + +static enum pcode parse_sport(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL || is_digit_str(val) == 0) + { + return PCODE_ERR_INVALID_SPORT; + } + + if (atoi(val) < 1 || atoi(val) > 65535) + { + return PCODE_ERR_INVALID_SPORT; + } + + opts->filter.src_port = htons(atoi(val)); + return PCODE_OK; +} + +static enum pcode parse_dport(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL || is_digit_str(val) == 0) + { + return PCODE_ERR_INVALID_DPORT; + } + + if (atoi(val) < 1 || atoi(val) > 65535) + { + return PCODE_ERR_INVALID_DPORT; + } + + opts->filter.dst_port = htons(atoi(val)); + return PCODE_OK; +} + +static enum pcode parse_sadd(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL) + { + return PCODE_ERR_INVALID_SADDR; + } + + opts->filter.src_family = cidr_to_range(val, opts->filter.src_addr_range); + if (opts->filter.src_family != AF_INET && opts->filter.src_family != AF_INET6) + { + return PCODE_ERR_INVALID_SADDR; + } + + return PCODE_OK; +} + +static enum pcode parse_dadd(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL) + { + return PCODE_ERR_INVALID_DADDR; + } + + opts->filter.dst_family = cidr_to_range(val, opts->filter.dst_addr_range); + if (opts->filter.dst_family != AF_INET && opts->filter.dst_family != AF_INET6) + { + return PCODE_ERR_INVALID_DADDR; + } + + return PCODE_OK; +} + +static enum pcode parse_stime(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL) + { + return PCODE_ERR_INVALID_STIME; + } + + uint64_t ts_ms; + uint64_t now_ms = clock_get_real_time_ms(); + if (time_to_ms(val, &ts_ms) != 0) + { + return PCODE_ERR_INVALID_STIME; + } + + opts->filter.sess_created_ts_in_ms = now_ms - ts_ms; + return PCODE_OK; +} + +static enum pcode parse_ptime(struct cmd_opts *opts, const char *val, void *ctx) +{ + if (val == NULL) + { + return PCODE_ERR_INVALID_PTIME; + } + + uint64_t ts_ms; + uint64_t now_ms = clock_get_real_time_ms(); + if (time_to_ms(val, &ts_ms) != 0) + { + return PCODE_ERR_INVALID_PTIME; + } + + opts->filter.pkt_received_ts_in_ms = now_ms - ts_ms; + return PCODE_OK; +} + +static enum pcode cmd_opts_parse(struct parser parses[], int max_parser, struct cmd_opts *opts, int argc, char **argv, void *ctx) +{ + memset(opts, 0, sizeof(struct cmd_opts)); + opts->thread_id = 0; + opts->filter.cursor = 0; + opts->filter.count = SCAN_SESSION_DEFAULT_COUNT; + opts->filter.limit = DISPLAY_SESSION_DEFAULT_COUNT; + + int j = 0; + for (int i = 0; i < argc; i = i + 2) + { + char *key = argv[i]; + char *val = i + 1 < argc ? argv[i + 1] : NULL; + + if (val == NULL) + { + return PCODE_ERR_INVALID_ARGS; + } + + for (j = 0; j < max_parser; j++) + { + if (strcasecmp(parses[j].key, key) != 0) + { + continue; + } + + enum pcode ret = parses[j].parse(opts, val, ctx); + if (ret == PCODE_OK) + { + break; + } + else + { + return ret; + } + } + if (j == max_parser) + { + return PCODE_ERR_INVALID_ARGS; + } + } + + return PCODE_OK; +} + +/****************************************************************************** + * session monitor + ******************************************************************************/ + +static char *show_session_id_usage(struct session_monitor *mnt) +{ + static char buff[2048] = {0}; + snprintf(buff, sizeof(buff), "Usage: show session id [options]\n" + "Options:\n" + " help -- Display help info\n" + " thread -- Thread index [0, %lu] or 'all', default: 0\n", + mnt->thread_num - 1); + + return buff; +} + +static char *show_session_all_usage(struct session_monitor *mnt) +{ + static char buff[2048] = {0}; + snprintf(buff, sizeof(buff), "Usage: show session all [options]\n" + "Options:\n" + " help -- Display help info\n" + " thread -- Thread index [0, %lu] or 'all', default: 0\n" + " cursor -- Start from the cursor [0, %lu], default: 0\n" + " count -- Scan N sessions [1, %lu], default: %d\n" + " display -- Display N matched sessions [1, %d], default: %d\n" + " state -- Session state\n" + " type -— Session type\n" + " saddr -- Source IP address\n" + " daddr -- Destination IP address\n" + " sport -- Source port [1, 65535]\n" + " dport -- Destination port [1, 65535]\n" + " stime -- Session created in the last N seconds/minutes/hours/days [1, 2^64-1]\n" + " ptime -- Packet received in the last N seconds/minutes/hours/days [1, 2^64-1]\n", + mnt->thread_num - 1, + mnt->capacity - 1, + mnt->capacity, SCAN_SESSION_DEFAULT_COUNT, + DISPLAY_SESSION_MAX_COUNT, DISPLAY_SESSION_DEFAULT_COUNT); + + return buff; +} + +static char *show_session_info_usage(struct session_monitor *mnt) +{ + static char buff[2048] = {0}; + snprintf(buff, sizeof(buff), "Usage: show session info [options]\n" + "Options:\n" + " help -- Display help info\n" + " thread -- Thread index [0, %lu] or 'all', default: 0\n", + mnt->thread_num - 1); + + return buff; +} + +struct show_sess_id_ctx +{ + uint64_t sess_id; + struct session matched_sess; + uint64_t find; +}; + +struct show_sess_all_ctx +{ + struct session_filter filter; + uint64_t matched_ids[DISPLAY_SESSION_MAX_COUNT]; + uint64_t size; + uint64_t used; +}; + +struct show_sess_info_ctx +{ + struct session_manager_stat stat; +}; + +static struct iovec worker_thread_lookup_session(int thread_idx, struct iovec req, void *args) +{ + struct session_monitor *mnt = (struct session_monitor *)args; + struct show_sess_id_ctx *ctx = (struct show_sess_id_ctx *)req.iov_base; + struct session_manager_rte *rte = session_manager_get_rte(mnt->manager, thread_idx); + if (rte == NULL) + { + return req; + } + + struct session *sess = session_manager_rte_lookup_session_by_id(rte, ctx->sess_id); + if (sess) + { + memcpy(&ctx->matched_sess, sess, sizeof(struct session)); + ctx->find = 1; + } + + return req; +} + +static struct iovec worker_thread_scan_session(int thread_idx, struct iovec req, void *args) +{ + struct session_monitor *mnt = (struct session_monitor *)args; + struct show_sess_all_ctx *ctx = (struct show_sess_all_ctx *)req.iov_base; + struct session_manager_rte *rte = session_manager_get_rte(mnt->manager, thread_idx); + if (rte == NULL) + { + return req; + } + + ctx->used += session_manager_rte_scan_session(rte, &ctx->filter, &ctx->matched_ids[ctx->used], ctx->size - ctx->used); + + return req; +} + +static struct iovec worker_thread_stat_session(int thread_idx, struct iovec req, void *args) +{ + struct session_monitor *mnt = (struct session_monitor *)args; + struct show_sess_info_ctx *ctx = (struct show_sess_info_ctx *)req.iov_base; + struct session_manager_rte *rte = session_manager_get_rte(mnt->manager, thread_idx); + if (rte == NULL) + { + return req; + } + + ctx->stat = *session_manager_rte_get_stat(rte); + + return req; +} + +static sds append_session_detail(sds ss, const struct session *sess, int thread_id) +{ + ss = sdscatprintf(ss, "thread[%d]\n", thread_id); + ss = sdscatprintf(ss, " id : %ld\n", session_get_id(sess)); + ss = sdscatprintf(ss, " type : %s\n", session_type_to_str(session_get_type(sess))); + ss = sdscatprintf(ss, " state : %s\n", session_state_to_str(session_get_current_state(sess))); + ss = sdscatprintf(ss, " tuple : %s\n", session_get_readable_addr(sess)); + ss = sdscatprintf(ss, " session created timestamp : %ld (ms)\n", session_get_timestamp(sess, SESSION_TIMESTAMP_START)); + ss = sdscatprintf(ss, " last packet received timestamp : %ld (ms)\n", session_get_timestamp(sess, SESSION_TIMESTAMP_LAST)); + ss = sdscatprintf(ss, " C2S received packets : %ld\n", session_get_stat(sess, FLOW_TYPE_C2S, STAT_RAW_PACKETS_RECEIVED)); + ss = sdscatprintf(ss, " S2C received packets : %ld\n", session_get_stat(sess, FLOW_TYPE_S2C, STAT_RAW_PACKETS_RECEIVED)); + ss = sdscatprintf(ss, " C2S received bytes : %ld\n", session_get_stat(sess, FLOW_TYPE_C2S, STAT_RAW_BYTES_RECEIVED)); + ss = sdscatprintf(ss, " S2C received bytes : %ld\n", session_get_stat(sess, FLOW_TYPE_S2C, STAT_RAW_BYTES_RECEIVED)); + return ss; +} + +static sds append_session_brief(sds ss, const struct session *sess, int thread_id) +{ + if (sdslen(ss) == 0) + { + ss = sdscatprintf(ss, " %5s", "thread"); + ss = sdscatprintf(ss, " %12s", "session_id"); + ss = sdscatprintf(ss, " %8s", "type"); + ss = sdscatprintf(ss, " %6s", "state"); + ss = sdscatprintf(ss, " %40s", "saddr sport"); + ss = sdscatprintf(ss, " %40s", "daddr dport"); + ss = sdscatprintf(ss, " %8s\n", "domain"); + ss = sdscatprintf(ss, "--------------------------------------------------------------------------------------------------------------------------------------\n"); + } + char src_addr[INET6_ADDRSTRLEN] = {0}; + char dst_addr[INET6_ADDRSTRLEN] = {0}; + const struct tuple6 *tuple = session_get_tuple6(sess); + if (tuple->addr_family == AF_INET) + { + inet_ntop(AF_INET, &tuple->src_addr.v4, src_addr, INET6_ADDRSTRLEN); + inet_ntop(AF_INET, &tuple->dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); + } + else + { + inet_ntop(AF_INET6, &tuple->src_addr.v6, src_addr, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &tuple->dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); + } + ss = sdscatprintf(ss, " %3d", thread_id); + ss = sdscatprintf(ss, " %18ld", session_get_id(sess)); + ss = sdscatprintf(ss, " %4s", session_type_to_str(session_get_type(sess))); + ss = sdscatprintf(ss, " %8s", session_state_to_str(session_get_current_state(sess))); + ss = sdscatprintf(ss, " %33s %5d", src_addr, ntohs(tuple->src_port)); + ss = sdscatprintf(ss, " %33s %5d", dst_addr, ntohs(tuple->dst_port)); + ss = sdscatprintf(ss, " %8ld\n", tuple->domain); + + return ss; +} + +static sds append_session_stat(sds ss, const struct session_manager_stat *stat, int thread_id) +{ + ss = sdscatprintf(ss, "thread[%d]\n", thread_id); + ss = sdscatprintf(ss, " history_tcp_sessions : %ld\n", stat->history_tcp_sessions); + ss = sdscatprintf(ss, " tcp_sess_used : %ld\n", stat->tcp_sess_used); + ss = sdscatprintf(ss, " tcp_sess_opening : %ld\n", stat->tcp_sess_opening); + ss = sdscatprintf(ss, " tcp_sess_active : %ld\n", stat->tcp_sess_active); + ss = sdscatprintf(ss, " tcp_sess_closing : %ld\n", stat->tcp_sess_closing); + ss = sdscatprintf(ss, " tcp_sess_discard : %ld\n", stat->tcp_sess_discard); + ss = sdscatprintf(ss, " tcp_sess_closed : %ld\n\n", stat->tcp_sess_closed); + ss = sdscatprintf(ss, " history_udp_sessions : %ld\n", stat->history_udp_sessions); + ss = sdscatprintf(ss, " udp_sess_used : %ld\n", stat->udp_sess_used); + ss = sdscatprintf(ss, " udp_sess_opening : %ld\n", stat->udp_sess_opening); + ss = sdscatprintf(ss, " udp_sess_active : %ld\n", stat->udp_sess_active); + ss = sdscatprintf(ss, " udp_sess_closing : %ld\n", stat->udp_sess_closing); + ss = sdscatprintf(ss, " udp_sess_discard : %ld\n", stat->udp_sess_discard); + ss = sdscatprintf(ss, " udp_sess_closed : %ld\n\n", stat->udp_sess_closed); + ss = sdscatprintf(ss, " tcp_sess_evicted : %ld\n", stat->tcp_sess_evicted); + ss = sdscatprintf(ss, " udp_sess_evicted : %ld\n\n", stat->udp_sess_evicted); + return ss; +} + +static sds rpc_show_session_id(struct session_monitor *mnt, int thread_id, uint64_t sess_id, sds ss) +{ + struct show_sess_id_ctx lookup_ctx = {.sess_id = sess_id, .matched_sess = {0}, .find = 0}; + struct iovec lookup_ctx_iov = {.iov_base = &lookup_ctx, .iov_len = sizeof(struct show_sess_id_ctx)}; + monitor_worker_thread_rpc(mnt->monitor, thread_id, lookup_ctx_iov, worker_thread_lookup_session, mnt); + + if (lookup_ctx.find == 1) + { + ss = append_session_detail(ss, &lookup_ctx.matched_sess, thread_id); + } + + return ss; +} + +static sds rpc_show_session_all(struct session_monitor *mnt, int thread_id, struct show_sess_all_ctx *scan_ctx, sds ss) +{ + struct iovec scan_ctx_iov = {.iov_base = scan_ctx, .iov_len = sizeof(struct show_sess_all_ctx)}; + uint64_t start = scan_ctx->used; + monitor_worker_thread_rpc(mnt->monitor, thread_id, scan_ctx_iov, worker_thread_scan_session, mnt); + + for (uint64_t j = start; j < scan_ctx->used; j++) + { + struct show_sess_id_ctx lookup_ctx = {.sess_id = scan_ctx->matched_ids[j], .matched_sess = {0}, .find = 0}; + struct iovec lookup_ctx_iov = {.iov_base = &lookup_ctx, .iov_len = sizeof(struct show_sess_id_ctx)}; + monitor_worker_thread_rpc(mnt->monitor, thread_id, lookup_ctx_iov, worker_thread_lookup_session, mnt); + + if (lookup_ctx.find == 1) + { + ss = append_session_brief(ss, &lookup_ctx.matched_sess, thread_id); + } + } + + return ss; +} + +static sds rpc_show_session_info(struct session_monitor *mnt, int thread_id, sds ss) +{ + struct show_sess_info_ctx ctx = {0}; + struct iovec ctx_iov = {.iov_base = &ctx, .iov_len = sizeof(struct show_sess_info_ctx)}; + monitor_worker_thread_rpc(mnt->monitor, thread_id, ctx_iov, worker_thread_stat_session, mnt); + ss = append_session_stat(ss, &ctx.stat, thread_id); + + return ss; +} + +static struct monitor_reply *monitor_show_session_id(struct stellar_monitor *monitor, int argc, char *argv[], void *ctx) +{ + struct session_monitor *mnt = (struct session_monitor *)ctx; + + // show session id [options] + if (argc < 3 || strcasecmp(argv[0], "show") != 0 || strcasecmp(argv[1], "session") != 0 || strcasecmp(argv[2], "id") != 0) + { + return monitor_reply_new_string(show_session_id_usage(mnt)); + } + + struct parser parsers[] = { + {"help", parse_help}, + {"id", parse_id}, + {"thread", parse_thread}, + }; + struct cmd_opts opts; + enum pcode ret = cmd_opts_parse(parsers, sizeof(parsers) / sizeof(parsers[0]), &opts, argc - 2, argv + 2, mnt); + if (ret == PCODE_HELP) + { + return monitor_reply_new_string(show_session_id_usage(mnt)); + } + if (ret != PCODE_OK) + { + return monitor_reply_new_string("%s\n%s", pcode_to_str(ret), show_session_id_usage(mnt)); + } + + sds ss = sdsempty(); + if (opts.thread_id == -1) // all thread + { + for (uint64_t i = 0; i < mnt->thread_num; i++) + { + ss = rpc_show_session_id(mnt, i, opts.sess_id, ss); + if (sdslen(ss)) + { + break; + } + } + } + else + { + ss = rpc_show_session_id(mnt, opts.thread_id, opts.sess_id, ss); + } + + if (sdslen(ss) == 0) + { + ss = sdscatprintf(ss, "no found"); + } + + struct monitor_reply *reply = monitor_reply_new_string(ss); + sdsfree(ss); + + return reply; +} + +static struct monitor_reply *monitor_show_session_all(struct stellar_monitor *monitor, int argc, char *argv[], void *ctx) +{ + struct session_monitor *mnt = (struct session_monitor *)ctx; + + // show session all [options] + if (argc < 3 || strcasecmp(argv[0], "show") != 0 || strcasecmp(argv[1], "session") != 0 || strcasecmp(argv[2], "all") != 0) + { + return monitor_reply_new_string(show_session_all_usage(mnt)); + } + + struct parser parsers[] = { + {"help", parse_help}, + {"thread", parse_thread}, + {"cursor", parse_cursor}, + {"count", parse_count}, + {"display", parse_display}, + {"type", parse_type}, + {"state", parse_state}, + {"sport", parse_sport}, + {"dport", parse_dport}, + {"saddr", parse_sadd}, + {"daddr", parse_dadd}, + {"stime", parse_stime}, + {"ptime", parse_ptime}, + }; + + struct cmd_opts opts; + enum pcode ret = cmd_opts_parse(parsers, sizeof(parsers) / sizeof(parsers[0]), &opts, argc - 3, argv + 3, mnt); + if (ret == PCODE_HELP || ret == PCODE_ERR_INVALID_ARGS) + { + return monitor_reply_new_string(show_session_all_usage(mnt)); + } + if (ret != PCODE_OK) + { + return monitor_reply_new_string("%s\n%s", pcode_to_str(ret), show_session_all_usage(mnt)); + } + + sds ss = sdsempty(); + struct show_sess_all_ctx scan_ctx = {.filter = opts.filter, .matched_ids = {0}, .size = opts.filter.limit, .used = 0}; + if (opts.thread_id == -1) // all thread + { + for (uint16_t i = 0; i < mnt->thread_num; i++) + { + ss = rpc_show_session_all(mnt, i, &scan_ctx, ss); + if (scan_ctx.used >= scan_ctx.size) + { + break; + } + } + } + else + { + ss = rpc_show_session_all(mnt, opts.thread_id, &scan_ctx, ss); + } + struct monitor_reply *reply = monitor_reply_new_string(ss); + sdsfree(ss); + + return reply; +} + +static struct monitor_reply *monitor_show_session_info(struct stellar_monitor *monitor, int argc, char *argv[], void *ctx) +{ + struct session_monitor *mnt = (struct session_monitor *)ctx; + + // show session info [options] + if (argc < 3 || strcasecmp(argv[0], "show") != 0 || strcasecmp(argv[1], "session") != 0 || strcasecmp(argv[2], "info") != 0) + { + return monitor_reply_new_string(show_session_info_usage(mnt)); + } + + struct parser parsers[] = { + {"help", parse_help}, + {"thread", parse_thread}, + }; + struct cmd_opts opts; + enum pcode ret = cmd_opts_parse(parsers, sizeof(parsers) / sizeof(parsers[0]), &opts, argc - 3, argv + 3, mnt); + if (ret != PCODE_OK) + { + return monitor_reply_new_string(show_session_info_usage(mnt)); + } + + sds ss = sdsempty(); + if (opts.thread_id == -1) // all thread + { + for (uint16_t i = 0; i < mnt->thread_num; i++) + { + ss = rpc_show_session_info(mnt, i, ss); + } + } + else + { + ss = rpc_show_session_info(mnt, opts.thread_id, ss); + } + + struct monitor_reply *reply = monitor_reply_new_string(ss); + sdsfree(ss); + + return reply; +} + +struct module *session_monitor_on_init(struct module_manager *mod_mgr) +{ + assert(mod_mgr); + struct session_manager *manager = module_to_session_manager(module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME)); + assert(manager); + struct stellar_monitor *monitor = monitor_module_to_monitor(module_manager_get_module(mod_mgr, MONITOR_MODULE_NAME)); + assert(monitor); + struct logger *logger = module_manager_get_logger(mod_mgr); + assert(logger); + int thread_num = module_manager_get_max_thread_num(mod_mgr); + + struct session_monitor *mnt = (struct session_monitor *)calloc(1, sizeof(struct session_monitor)); + if (mnt == NULL) + { + SESSION_MONITOR_LOG_ERROR("failed to alloc session_monitor"); + return NULL; + } + + struct session_manager_cfg *cfg = session_manager_get_cfg(manager); + mnt->thread_num = thread_num; + mnt->capacity = cfg->tcp_session_max + cfg->udp_session_max; + mnt->monitor = monitor; + mnt->manager = manager; + mnt->logger = logger; + + monitor_register_cmd(mnt->monitor, "show session id", monitor_show_session_id, "readonly", "", "", mnt); + monitor_register_cmd(mnt->monitor, "show session all", monitor_show_session_all, "readonly", "", "", mnt); + monitor_register_cmd(mnt->monitor, "show session info", monitor_show_session_info, "readonly", "", "", mnt); + + struct module *sess_mnt_mod = module_new(SESSION_MONITOR_MODULE_NAME, NULL); + if (sess_mnt_mod == NULL) + { + SESSION_MONITOR_LOG_ERROR("failed to create session_monitor"); + free(mnt); + return NULL; + } + module_set_ctx(sess_mnt_mod, mnt); + + SESSION_MONITOR_LOG_FATAL("session_monitor init"); + return sess_mnt_mod; +} + +void session_monitor_on_exit(struct module_manager *mod_mgr, struct module *mod) +{ + if (mod) + { + struct session_monitor *mnt = module_get_ctx(mod); + free(mnt); + module_free(mod); + SESSION_MONITOR_LOG_FATAL("session_monitor exit"); + } +} \ No newline at end of file diff --git a/infra/session_manager/test/gtest_sess_mgr_scan.cpp b/infra/session_manager/test/gtest_sess_mgr_scan.cpp index f3300b9..d6e68b4 100644 --- a/infra/session_manager/test/gtest_sess_mgr_scan.cpp +++ b/infra/session_manager/test/gtest_sess_mgr_scan.cpp @@ -108,224 +108,333 @@ TEST(SESS_MGR_SCAN, OPTS) sess = session_manager_rte_new_session(sess_mgr_rte, &pkt, 4); EXPECT_TRUE(sess); - struct session_scan_opts scan = {}; - // scan.flags = SESSION_SCAN_TYPE | SESSION_SCAN_STATE | SESSION_SCAN_SIP | SESSION_SCAN_DIP | SESSION_SCAN_SPORT | SESSION_SCAN_DPORT | SESSION_SCAN_CREATE_TIME | SESSION_SCAN_LASPKT_TIME; - scan.cursor = 0; - scan.count = 1460; + struct session_filter filter = {}; - scan.laspkt_time_ms[0] = 0; - scan.laspkt_time_ms[1] = UINT64_MAX; + /************************************************************************** + * scan session type + **************************************************************************/ - // SESSION_SCAN_TYPE - scan.flags = SESSION_SCAN_TYPE; + // TCP + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.type = SESSION_TYPE_TCP; - scan.type = SESSION_TYPE_TCP; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_TYPE: (TCP)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session type: TCP", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.type = SESSION_TYPE_UDP; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // UDP + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.type = SESSION_TYPE_UDP; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_TYPE: (UDP)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session type: UDP", sess_mgr_rte, mached_sess_id, mached_sess_num); - // SESSION_SCAN_STATE - scan.flags = SESSION_SCAN_STATE; + /************************************************************************** + * scan session state + **************************************************************************/ - scan.state = SESSION_STATE_OPENING; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // OPENING + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.state = SESSION_STATE_OPENING; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 4); - mached_session_print("SESSION_SCAN_STATE: (OPENING)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session state: OPENING", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.state = SESSION_STATE_ACTIVE; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // ACTIVE + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.state = SESSION_STATE_ACTIVE; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_STATE: (ACTIVE)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session state: ACTIVE", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.state = SESSION_STATE_CLOSING; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // CLOSING + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.state = SESSION_STATE_CLOSING; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_STATE: (CLOSING)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session state: CLOSING", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.state = SESSION_STATE_DISCARD; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // DISCARD + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.state = SESSION_STATE_DISCARD; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_STATE: (DISCARD)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session state: DISCARD", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.state = SESSION_STATE_CLOSED; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // CLOSED + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.state = SESSION_STATE_CLOSED; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_STATE: (CLOSED)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session state: CLOSED", sess_mgr_rte, mached_sess_id, mached_sess_num); - // SESSION_SCAN_SIP - scan.flags = SESSION_SCAN_SIP; + /************************************************************************** + * scan source address + **************************************************************************/ - scan.addr_family = AF_INET; - scan.src_addr[0].v4 = v4_src_addr1; - scan.src_addr[1].v4 = v4_src_addr1; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv4 + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_family = AF_INET; + filter.src_addr_range[0].v4 = v4_src_addr1; + filter.src_addr_range[1].v4 = v4_src_addr1; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_SIP: (IPv4)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source address: IPv4", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET; - scan.src_addr[0].v4 = v4_src_subnet_beg; - scan.src_addr[1].v4 = v4_src_subnet_end; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv4 subnet + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_family = AF_INET; + filter.src_addr_range[0].v4 = v4_src_subnet_beg; + filter.src_addr_range[1].v4 = v4_src_subnet_end; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_SIP: (IPv4 SUBNET)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source address: IPv4 subnet", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET; - scan.src_addr[0].v4 = v4_min_addr; - scan.src_addr[1].v4 = v4_max_addr; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv4 min max + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_family = AF_INET; + filter.src_addr_range[0].v4 = v4_min_addr; + filter.src_addr_range[1].v4 = v4_max_addr; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_SIP: (IPv4 MIN MAX)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source address: IPv4 min max", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET6; - memcpy(&scan.src_addr[0].v6, &v6_src_addr, sizeof(v6_src_addr)); - memcpy(&scan.src_addr[1].v6, &v6_src_addr, sizeof(v6_src_addr)); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv6 + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_family = AF_INET6; + memcpy(&filter.src_addr_range[0].v6, &v6_src_addr, sizeof(v6_src_addr)); + memcpy(&filter.src_addr_range[1].v6, &v6_src_addr, sizeof(v6_src_addr)); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_SIP: (IPv6)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source address: IPv6", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET6; - memcpy(&scan.src_addr[0].v6, &v6_src_subnet_beg, sizeof(v6_src_subnet_beg)); - memcpy(&scan.src_addr[1].v6, &v6_src_subnet_end, sizeof(v6_src_subnet_end)); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv6 subnet + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_family = AF_INET6; + memcpy(&filter.src_addr_range[0].v6, &v6_src_subnet_beg, sizeof(v6_src_subnet_beg)); + memcpy(&filter.src_addr_range[1].v6, &v6_src_subnet_end, sizeof(v6_src_subnet_end)); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_SIP: (IPv6 SUBNET)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source address: IPv6 subnet", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET6; - memcpy(&scan.src_addr[0].v6, &v6_min_addr, sizeof(v6_min_addr)); - memcpy(&scan.src_addr[1].v6, &v6_max_addr, sizeof(v6_max_addr)); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv6 min max + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_family = AF_INET6; + memcpy(&filter.src_addr_range[0].v6, &v6_min_addr, sizeof(v6_min_addr)); + memcpy(&filter.src_addr_range[1].v6, &v6_max_addr, sizeof(v6_max_addr)); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_SIP: (IPv6 MIN MAX)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source address: IPv6 min max", sess_mgr_rte, mached_sess_id, mached_sess_num); - // SESSION_SCAN_DIP - scan.flags = SESSION_SCAN_DIP; + /************************************************************************** + * scan destination address + **************************************************************************/ - scan.addr_family = AF_INET; - scan.dst_addr[0].v4 = v4_dst_addr; - scan.dst_addr[1].v4 = v4_dst_addr; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv4 + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_family = AF_INET; + filter.dst_addr_range[0].v4 = v4_dst_addr; + filter.dst_addr_range[1].v4 = v4_dst_addr; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_DIP: (IPv4)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination address: IPv4", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET; - scan.dst_addr[0].v4 = v4_dst_subnet_beg; - scan.dst_addr[1].v4 = v4_dst_subnet_end; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv4 subnet + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_family = AF_INET; + filter.dst_addr_range[0].v4 = v4_dst_subnet_beg; + filter.dst_addr_range[1].v4 = v4_dst_subnet_end; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_DIP: (IPv4 SUBNET)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination address: IPv4 subnet", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET; - scan.dst_addr[0].v4 = v4_min_addr; - scan.dst_addr[1].v4 = v4_max_addr; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv4 min max + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_family = AF_INET; + filter.dst_addr_range[0].v4 = v4_min_addr; + filter.dst_addr_range[1].v4 = v4_max_addr; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_DIP: (IPv4 MIN MAX)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination address: IPv4 min max", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET6; - memcpy(&scan.dst_addr[0].v6, &v6_dst_addr, sizeof(v6_dst_addr)); - memcpy(&scan.dst_addr[1].v6, &v6_dst_addr, sizeof(v6_dst_addr)); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv6 + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_family = AF_INET6; + memcpy(&filter.dst_addr_range[0].v6, &v6_dst_addr, sizeof(v6_dst_addr)); + memcpy(&filter.dst_addr_range[1].v6, &v6_dst_addr, sizeof(v6_dst_addr)); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_DIP: (IPv6)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination address: IPv6", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET6; - memcpy(&scan.dst_addr[0].v6, &v6_dst_subnet_beg, sizeof(v6_dst_subnet_beg)); - memcpy(&scan.dst_addr[1].v6, &v6_dst_subnet_end, sizeof(v6_dst_subnet_end)); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv6 subnet + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_family = AF_INET6; + memcpy(&filter.dst_addr_range[0].v6, &v6_dst_subnet_beg, sizeof(v6_dst_subnet_beg)); + memcpy(&filter.dst_addr_range[1].v6, &v6_dst_subnet_end, sizeof(v6_dst_subnet_end)); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_DIP: (IPv6 SUBNET)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination address: IPv6 subnet", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.addr_family = AF_INET6; - memcpy(&scan.dst_addr[0].v6, &v6_min_addr, sizeof(v6_min_addr)); - memcpy(&scan.dst_addr[1].v6, &v6_max_addr, sizeof(v6_max_addr)); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // IPv6 min max + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_family = AF_INET6; + memcpy(&filter.dst_addr_range[0].v6, &v6_min_addr, sizeof(v6_min_addr)); + memcpy(&filter.dst_addr_range[1].v6, &v6_max_addr, sizeof(v6_max_addr)); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 1); - mached_session_print("SESSION_SCAN_DIP: (IPv6 MIN MAX)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination address: IPv6 min max", sess_mgr_rte, mached_sess_id, mached_sess_num); - // SESSION_SCAN_SPORT - scan.flags = SESSION_SCAN_SPORT; + /************************************************************************** + * scan source port + **************************************************************************/ - scan.src_port = htons(60111); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // hit + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_port = htons(60111); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_SPORT: (HIT)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source port: hit", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.src_port = htons(60110); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // miss + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.src_port = htons(60110); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_SPORT: (MISS)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan source port: miss", sess_mgr_rte, mached_sess_id, mached_sess_num); - // SESSION_SCAN_DPORT - scan.flags = SESSION_SCAN_DPORT; + /************************************************************************** + * scan destination port + **************************************************************************/ - scan.dst_port = htons(80); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // hit + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_port = htons(80); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 3); - mached_session_print("SESSION_SCAN_DPORT: (HIT)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination port: hit", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.dst_port = htons(81); - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // miss + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.dst_port = htons(81); + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_DPORT: (MISS)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan destination port: miss", sess_mgr_rte, mached_sess_id, mached_sess_num); - // SESSION_SCAN_CREATE_TIME - scan.flags = SESSION_SCAN_CREATE_TIME; + /************************************************************************** + * scan session create time + **************************************************************************/ - scan.create_time_ms[0] = 0; - scan.create_time_ms[1] = UINT64_MAX; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + // hit + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.sess_created_ts_in_ms = 1; + + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 4); - mached_session_print("SESSION_SCAN_CREATE_TIME: (HIT)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session create time: hit", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.create_time_ms[0] = 1; - scan.create_time_ms[1] = 2; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); - EXPECT_TRUE(mached_sess_num == 2); - mached_session_print("SESSION_SCAN_CREATE_TIME: (HIT)", sess_mgr_rte, mached_sess_id, mached_sess_num); + // miss + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.sess_created_ts_in_ms = 5; - scan.create_time_ms[0] = 0; - scan.create_time_ms[1] = 0; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_CREATE_TIME: (MISS)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan session create time: miss", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.create_time_ms[0] = UINT64_MAX; - scan.create_time_ms[1] = UINT64_MAX; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); - EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_CREATE_TIME: (MISS)", sess_mgr_rte, mached_sess_id, mached_sess_num); + /************************************************************************** + * scan last packet receive time + **************************************************************************/ - // SESSION_SCAN_LASPKT_TIME - scan.flags = SESSION_SCAN_LASPKT_TIME; + // hit + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.pkt_received_ts_in_ms = 1; - scan.laspkt_time_ms[0] = 0; - scan.laspkt_time_ms[1] = UINT64_MAX; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 4); - mached_session_print("SESSION_SCAN_LASPKT_TIME: (HIT)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan last packet receive time: hit", sess_mgr_rte, mached_sess_id, mached_sess_num); - scan.laspkt_time_ms[0] = 1; - scan.laspkt_time_ms[1] = 2; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); - EXPECT_TRUE(mached_sess_num == 2); - mached_session_print("SESSION_SCAN_LASPKT_TIME: (HIT)", sess_mgr_rte, mached_sess_id, mached_sess_num); + // miss + memset(&filter, 0, sizeof(filter)); + filter.cursor = 0; + filter.count = 1460; + filter.pkt_received_ts_in_ms = 5; - scan.laspkt_time_ms[0] = 0; - scan.laspkt_time_ms[1] = 0; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); + mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &filter, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_LASPKT_TIME: (MISS)", sess_mgr_rte, mached_sess_id, mached_sess_num); - - scan.laspkt_time_ms[0] = UINT64_MAX; - scan.laspkt_time_ms[1] = UINT64_MAX; - mached_sess_num = session_manager_rte_scan_session(sess_mgr_rte, &scan, mached_sess_id, sizeof(mached_sess_id) / sizeof(mached_sess_id[0])); - EXPECT_TRUE(mached_sess_num == 0); - mached_session_print("SESSION_SCAN_LASPKT_TIME: (MISS)", sess_mgr_rte, mached_sess_id, mached_sess_num); + mached_session_print("scan last packet receive time: miss", sess_mgr_rte, mached_sess_id, mached_sess_num); session_manager_rte_free(sess_mgr_rte); } diff --git a/infra/version.map b/infra/version.map index 616f18b..385a2a2 100644 --- a/infra/version.map +++ b/infra/version.map @@ -50,6 +50,9 @@ global: session_manager_subscribe_control_packet; session_manager_subscribe_tcp_stream; + session_monitor_on_init; + session_monitor_on_exit; + exdata_*; mq_*; module_*;