remove session_manager_subscribe_xxx() API; convert session closing events to pseudo packets

when a session is closed, use the packet manager to create a pseudo packet,
    set the session to be closed as packet Exdata, and schedule it to the packet forwarding stage.
    when the pseudo packet free, the session will be free.
This commit is contained in:
luwenpeng
2024-11-18 18:33:19 +08:00
parent 746d7fca06
commit c2e9275a9e
25 changed files with 210 additions and 378 deletions

View File

@@ -89,4 +89,5 @@ add_subdirectory(tools)
add_subdirectory(test)
install(DIRECTORY DESTINATION log COMPONENT PROGRAM)
install(DIRECTORY DESTINATION metrics COMPONENT PROGRAM)
install(DIRECTORY DESTINATION module COMPONENT PROGRAM)

View File

@@ -7,6 +7,7 @@
dev_symbol = "nf_0_fw"
pcap_path = "/tmp/test.pcap"
pcap_done_exit = 1 # range: [0, 1]
pcap_queue_size = 1 # range: [1, 4294967295]
thread_num = 1 # range: [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]
idle_yield_ms = 900 # range: [0, 60000] (ms)

View File

@@ -1,4 +1,4 @@
add_subdirectory(lpi_plus)
#add_subdirectory(lpi_plus)
#add_subdirectory(http)
#add_subdirectory(socks)
#add_subdirectory(stratum)

View File

@@ -227,6 +227,8 @@ struct packet *packet_manager_build_udp_packet(struct packet_manager *pkt_mgr, c
const char *udp_payload, uint16_t udp_payload_len);
struct packet *packet_manager_build_l3_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt,
uint8_t ip_proto, const char *l3_payload, uint16_t l3_payload_len);
struct packet *packet_manager_dup_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt);
void packet_manager_free_packet(struct packet_manager *pkt_mgr, struct packet *pkt);
#ifdef __cplusplus
}

View File

@@ -130,7 +130,6 @@ int session_has_duplicate_traffic(const struct session *sess);
enum session_type session_get_type(const struct session *sess);
enum session_state session_get_current_state(const struct session *sess);
const struct packet *session_get_current_packet(const struct session *sess);
enum closing_reason session_get_closing_reason(const struct session *sess);
enum session_direction session_get_direction(const struct session *sess);
@@ -152,15 +151,7 @@ struct session_manager;
struct session_manager *module_to_session_manager(struct module *mod);
int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg);
// When the state is SESSION_STATE_CLOSED, the packet is NULL, and the session will be destroyed.
typedef void on_session_message_callback(struct session *sess, enum session_state state, struct packet *pkt, void *args);
// When the state is SESSION_STATE_CLOSED, the tcp_payload is NULL, and the session will be destroyed.
typedef void on_tcp_payload_callback(struct session *sess, enum session_state state, const char *tcp_payload, uint32_t tcp_payload_len, void *args);
int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args);
int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args);
int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args);
int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args);
struct session *packet_exdata_to_session(struct packet *pkt);
#ifdef __cplusplus
}

View File

@@ -1,6 +1,6 @@
set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml ringbuf)
set(DECODERS lpi_plus)
#set(DECODERS lpi_plus)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})
set(LIBS fieldstat4)

View File

@@ -161,7 +161,7 @@ struct packet_io *packet_io_new(const char *toml_file)
PACKET_IO_LOG_ERROR("failed to create fieldstat_easy");
goto error_out;
}
if (fieldstat_easy_enable_auto_output(pkt_io->fs, "packet_io.fs4", 2) != 0)
if (fieldstat_easy_enable_auto_output(pkt_io->fs, "metrics/packet_io.json", 2) != 0)
{
PACKET_IO_LOG_ERROR("failed to enable auto output for fieldstat_easy");
goto error_out;

View File

@@ -3,6 +3,7 @@
#include <limits.h>
#include <pthread.h>
#include <pcap/pcap.h>
#include <sys/prctl.h>
#include "pcap_io.h"
#include "packet_dump.h"
@@ -11,8 +12,6 @@
#include "packet_internal.h"
#include "utils_internal.h"
#define RING_BUFFER_MAX_SIZE (4096 * 1000)
struct pcap_pkt
{
char *data;
@@ -24,8 +23,9 @@ struct pcap_io_cfg
{
char mode[16]; // pcapfile, pcaplist
char pcap_path[PATH_MAX];
uint64_t pcap_done_exit; // range [0, 1]
uint64_t thread_num; // range [1, MAX_THREAD_NUM]
uint64_t pcap_done_exit; // range [0, 1]
uint64_t pcap_queue_size; // range [1, 4294967295]
uint64_t thread_num; // range [1, MAX_THREAD_NUM]
// packet pool
uint64_t capacity; // range: [1, 4294967295]
@@ -140,6 +140,7 @@ static struct pcap_io_cfg *pcap_io_cfg_new(const char *toml_file)
ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode);
ret += load_toml_str_config(toml_file, "packet_io.pcap_path", cfg->pcap_path);
ret += load_toml_integer_config(toml_file, "packet_io.pcap_done_exit", &cfg->pcap_done_exit, 0, 1);
ret += load_toml_integer_config(toml_file, "packet_io.pcap_queue_size", &cfg->pcap_queue_size, 1, 4294967295);
ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM);
ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295);
if (strcmp(cfg->mode, "pcapfile") != 0 && strcmp(cfg->mode, "pcaplist") != 0)
@@ -249,12 +250,23 @@ static int pcap_io_handler(struct pcap_io *pcap_io, const char *pcap_file)
static int all_packet_consumed(struct pcap_io *pcap_io)
{
uint64_t consumed_pkts = 0;
uint64_t total_tx_pkts = 0;
uint64_t total_dropped_pkts = 0;
uint64_t total_injected_pkts = 0;
uint64_t total_user_freed_pkts = 0;
uint64_t read_pcap_pkts = ATOMIC_READ(&pcap_io->read_pcap_pkts);
for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++)
{
consumed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_rx);
total_tx_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_tx);
total_dropped_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_dropped);
total_injected_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_injected);
total_user_freed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_user_freed);
}
if (consumed_pkts == read_pcap_pkts)
consumed_pkts = total_tx_pkts + total_dropped_pkts + total_user_freed_pkts - total_injected_pkts;
if (consumed_pkts >= read_pcap_pkts)
{
return 1;
}
@@ -269,6 +281,8 @@ static void *pcap_io_thread(void *arg)
struct pcap_io *pcap_io = (struct pcap_io *)arg;
__thread_local_logger = pcap_io->logger;
prctl(PR_SET_NAME, "stellar:pcap", NULL, NULL, NULL);
ATOMIC_SET(&pcap_io->io_thread_is_runing, 1);
PACKET_IO_LOG_FATAL("pcap io thread is running");
@@ -373,7 +387,7 @@ void *pcap_io_new(const char *toml_file)
for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++)
{
pcap_io->ring[i] = ring_buffer_new(RING_BUFFER_MAX_SIZE);
pcap_io->ring[i] = ring_buffer_new(pcap_io->cfg->pcap_queue_size);
if (pcap_io->ring[i] == NULL)
{
PACKET_IO_LOG_ERROR("unable to create ring buffer");

View File

@@ -5,4 +5,5 @@ include(GoogleTest)
gtest_discover_tests(gtest_packet_io)
file(COPY ./conf/ DESTINATION ./conf/)
file(COPY ./pcap/ DESTINATION ./pcap/)
file(COPY ./pcap/ DESTINATION ./pcap/)
file(COPY ./metrics/ DESTINATION ./metrics/)

View File

@@ -4,6 +4,7 @@
dev_symbol = "nf_0_fw"
pcap_path = "./pcap/IPv4_frags_UDP.pcap"
pcap_done_exit = 1 # range: [0, 1]
pcap_queue_size = 1024 # range: [1, 4294967295]
thread_num = 1 # range: [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]
idle_yield_ms = 900 # range: [0, 60000] (ms)

View File

@@ -227,7 +227,7 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_sche, uint16_t th
PACKET_MANAGER_LOG_ERROR("failed to create fieldstat_easy");
goto error_out;
}
if (fieldstat_easy_enable_auto_output(pkt_mgr->fs, "packet_manager.fs4", 2) != 0)
if (fieldstat_easy_enable_auto_output(pkt_mgr->fs, "metrics/packet_manager.json", 2) != 0)
{
PACKET_MANAGER_LOG_ERROR("failed to enable auto output for fieldstat_easy");
goto error_out;
@@ -477,6 +477,20 @@ struct packet *packet_manager_build_l3_packet(struct packet_manager *pkt_mgr, co
return pkt;
}
struct packet *packet_manager_dup_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt)
{
struct packet *pkt = packet_dup(origin_pkt);
if (pkt == NULL)
{
return NULL;
}
struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche);
packet_set_user_data(pkt, ex_rte);
return pkt;
}
void packet_manager_free_packet(struct packet_manager *pkt_mgr __attribute__((unused)), struct packet *pkt)
{
if (pkt)
@@ -581,8 +595,9 @@ struct module *packet_manager_on_thread_init(struct module_manager *mod_mgr, int
void packet_manager_on_thread_exit(struct module_manager *mod_mgr __attribute__((unused)), int thread_id, struct module *mod)
{
struct packet_manager *pkt_mgr = module_get_ctx(mod);
assert(pkt_mgr);
assert(thread_id < pkt_mgr->thread_num);
packet_manager_clean(pkt_mgr, thread_id);
if (pkt_mgr)
{
assert(thread_id < pkt_mgr->thread_num);
packet_manager_clean(pkt_mgr, thread_id);
}
}

View File

@@ -84,4 +84,5 @@ gtest_discover_tests(gtest_packet_ldbc)
gtest_discover_tests(gtest_packet_pool)
gtest_discover_tests(gtest_packet_manager)
file(COPY ../../../conf/ DESTINATION ./conf/)
file(COPY ../../../conf/ DESTINATION ./conf/)
file(COPY ./metrics/ DESTINATION ./metrics/)

View File

@@ -48,7 +48,6 @@ struct session
uint64_t timestamps[MAX_TIMESTAMP]; // realtime msec
struct tcp_half tcp_halfs[MAX_FLOW_TYPE];
struct timeout timeout;
struct tcp_segment empty_seg;
TAILQ_ENTRY(session) lru_tqe;
TAILQ_ENTRY(session) free_tqe;
TAILQ_ENTRY(session) evc_tqe;
@@ -120,7 +119,7 @@ void session_set_first_packet(struct session *sess, enum flow_type type, const s
// const struct packet *session_get_first_packet(const struct session *sess, enum flow_type type);
void session_set_current_packet(struct session *sess, const struct packet *pkt);
// const struct packet *session_get_current_packet(const struct session *sess);
const struct packet *session_get_current_packet(const struct session *sess);
// int session_is_symmetric(const struct session *sess, unsigned char *flag);

View File

@@ -2,6 +2,7 @@
#include "utils_internal.h"
#include "session_internal.h"
#include "session_manager.h"
#include "session_manager_log.h"
#include "session_manager_cfg.h"
#include "session_manager_rte.h"
@@ -12,89 +13,36 @@
#pragma GCC diagnostic ignored "-Wunused-parameter"
struct session_manager_sche
{
int pkt_ex_id;
int sess_msg_id_tcp;
int sess_msg_id_udp;
int sess_msg_id_ctrl;
int sess_msg_id_stream;
struct mq_schema *mq_sche;
struct exdata_schema *ex_sche;
};
struct session_manager
{
int pkt_ex_to_get_sess;
int pkt_ex_to_free_sess;
struct exdata_schema *ex_sche;
int stat_idx[SESS_MGR_STAT_MAX];
struct fieldstat_easy *fs;
struct session_manager_cfg *cfg;
struct session_manager_sche *sche;
struct session_manager_rte *rte[MAX_THREAD_NUM];
struct mq_runtime *mq[MAX_THREAD_NUM];
struct module_manager *mod_mgr;
struct packet_manager *pkt_mgr;
};
__thread int __thread_pkt_ex_to_get_sess = 0;
/******************************************************************************
* session manager sche
******************************************************************************/
static void clean_closed_session(struct session_manager *sess_mgr, uint16_t thread_id, uint64_t now_ms)
static void free_session(int idx, void *ex_ptr, void *arg)
{
struct session *sess = NULL;
struct session *cleaned[CLEAN_SESSION_BURST] = {NULL};
struct mq_runtime *mq_rte = sess_mgr->mq[thread_id];
struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
struct session *sess = (struct session *)ex_ptr;
struct session_manager *sess_mgr = (struct session_manager *)arg;
assert(idx == sess_mgr->pkt_ex_to_free_sess);
uint64_t used = session_manager_rte_clean_session(sess_mgr_rte, now_ms, cleaned, CLEAN_SESSION_BURST);
for (uint64_t i = 0; i < used; i++)
if (sess && session_get_current_state(sess) == SESSION_STATE_CLOSED)
{
sess = cleaned[i];
assert(session_get_current_state(sess) == SESSION_STATE_CLOSED);
session_set_current_packet(sess, NULL);
session_set_flow_type(sess, FLOW_TYPE_NONE);
if (session_get_type(sess) == SESSION_TYPE_TCP)
{
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_stream, &sess->empty_seg);
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess);
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_tcp, sess);
}
else
{
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess);
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_udp, sess);
}
}
}
static void on_sess_msg_dispatch(int sess_msg_id, void *msg, on_msg_cb_func *msg_cb, void *msg_cb_args, void *dispatch_args)
{
struct session *sess = (struct session *)msg;
struct packet *pkt = (struct packet *)session_get_current_packet(sess);
enum session_state state = session_get_current_state(sess);
((on_session_message_callback *)(void *)msg_cb)(sess, state, pkt, msg_cb_args);
}
static void on_tcp_payload_msg_dispatch(int sess_msg_id, void *msg, on_msg_cb_func *msg_cb, void *msg_cb_args, void *dispatch_args)
{
struct tcp_segment *seg = (struct tcp_segment *)msg;
struct session *sess = (struct session *)seg->user_data;
enum session_state state = session_get_current_state(sess);
((on_tcp_payload_callback *)(void *)msg_cb)(sess, state, seg->data, seg->len, msg_cb_args);
}
static void on_sess_msg_free(void *msg, void *args)
{
struct session *sess = (struct session *)msg;
if (session_get_current_state(sess) == SESSION_STATE_CLOSED)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr);
struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
char buffer[4096] = {0};
session_to_str(sess, 0, buffer, sizeof(buffer));
@@ -106,20 +54,34 @@ static void on_sess_msg_free(void *msg, void *args)
}
}
static void on_tcp_payload_msg_free(void *msg, void *args)
static void notify_sess_closed_by_pseudo_pkt(struct session_manager *sess_mgr, int thread_id, struct session *sess)
{
struct tcp_segment *seg = (struct tcp_segment *)msg;
struct session *sess = (struct session *)seg->user_data;
struct packet *pseudo = NULL;
struct packet_manager *pkt_mgr = sess_mgr->pkt_mgr;
assert(session_get_current_state(sess) == SESSION_STATE_CLOSED);
session_free_tcp_segment(sess, seg);
if (session_get_first_packet(sess, FLOW_TYPE_C2S))
{
pseudo = packet_manager_dup_packet(pkt_mgr, session_get_first_packet(sess, FLOW_TYPE_C2S));
}
else
{
pseudo = packet_manager_dup_packet(pkt_mgr, session_get_first_packet(sess, FLOW_TYPE_S2C));
}
assert(pseudo);
packet_set_type(pseudo, PACKET_TYPE_PSEUDO);
packet_set_action(pseudo, PACKET_ACTION_DROP);
packet_set_exdata(pseudo, sess_mgr->pkt_ex_to_free_sess, sess);
packet_manager_schedule_packet(pkt_mgr, thread_id, pseudo, PACKET_STAGE_FORWARD);
SESSION_MANAGER_LOG_INFO("notify session %lu %s closed by pseudo packet: %p", session_get_id(sess), session_get_readable_addr(sess), pseudo);
}
static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr);
struct mq_runtime *mq_rte = sess_mgr->mq[thread_id];
struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
/*
* We use the system's real time instead of monotonic time for the following reasons:
@@ -132,86 +94,62 @@ static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void
uint64_t now_ms = clock_get_real_time_ms();
struct tuple6 key;
struct tcp_segment *seg = NULL;
struct session *sess = session_manager_rte_lookup_session_by_packet(sess_mgr_rte, pkt);
if (sess == NULL)
{
if (packet_get_type(pkt) == PACKET_TYPE_PSEUDO)
if (packet_get_type(pkt) == PACKET_TYPE_RAW)
{
goto fast_path;
}
sess = session_manager_rte_new_session(sess_mgr_rte, pkt, now_ms);
if (sess == NULL)
{
goto fast_path;
sess = session_manager_rte_new_session(sess_mgr_rte, pkt, now_ms);
if (sess)
{
session_set_user_data(sess, exdata_runtime_new(sess_mgr->ex_sche));
}
}
else
{
session_set_user_data(sess, exdata_runtime_new(sess_mgr->sche->ex_sche));
goto slow_path;
// TODO new session by pseudo packet
}
}
else
{
if (packet_get_type(pkt) == PACKET_TYPE_PSEUDO)
{
goto ctrl_path;
}
if (session_manager_rte_update_session(sess_mgr_rte, sess, pkt, now_ms) == -1)
{
goto fast_path;
session_set_current_packet(sess, pkt);
packet_get_innermost_tuple6(pkt, &key);
if (tuple6_cmp(session_get_tuple6(sess), &key) == 0)
{
session_set_flow_type(sess, FLOW_TYPE_C2S);
sess->stats[FLOW_TYPE_C2S][STAT_PSEUDO_PACKETS_RECEIVED]++;
sess->stats[FLOW_TYPE_C2S][STAT_PSEUDO_BYTES_RECEIVED] += packet_get_raw_len(pkt);
}
else
{
session_set_flow_type(sess, FLOW_TYPE_S2C);
sess->stats[FLOW_TYPE_S2C][STAT_PSEUDO_PACKETS_RECEIVED]++;
sess->stats[FLOW_TYPE_S2C][STAT_PSEUDO_BYTES_RECEIVED] += packet_get_raw_len(pkt);
}
}
else
{
goto slow_path;
session_manager_rte_update_session(sess_mgr_rte, sess, pkt, now_ms);
}
}
ctrl_path:
session_set_current_packet(sess, pkt);
packet_get_innermost_tuple6(pkt, &key);
if (tuple6_cmp(session_get_tuple6(sess), &key) == 0)
{
session_set_flow_type(sess, FLOW_TYPE_C2S);
}
else
{
session_set_flow_type(sess, FLOW_TYPE_S2C);
}
packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, sess);
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess);
return;
packet_set_exdata(pkt, sess_mgr->pkt_ex_to_get_sess, sess);
slow_path:
if (session_get_type(sess) == SESSION_TYPE_TCP)
while ((sess = session_manager_rte_get_evicted_session(sess_mgr_rte)))
{
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_tcp, sess);
while ((seg = session_get_tcp_segment(sess)))
{
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_stream, seg);
}
notify_sess_closed_by_pseudo_pkt(sess_mgr, thread_id, sess);
}
else
{
mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_udp, sess);
}
packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, sess);
return;
fast_path:
packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, NULL);
return;
}
static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr);
struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->sche->pkt_ex_id);
struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->pkt_ex_to_get_sess);
if (sess)
{
struct tuple6 key;
@@ -255,18 +193,35 @@ static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *
static void on_polling(struct module_manager *mod_mgr, void *args)
{
struct session *sess = NULL;
uint64_t now_ms = clock_get_real_time_ms();
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(mod_mgr);
struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
struct session_manager_stat *sess_mgr_stat = session_manager_rte_get_stat(sess_mgr_rte);
struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
clean_closed_session(sess_mgr, thread_id, now_ms);
static __thread uint64_t last_clean_expired_sess_ts = 0;
if (now_ms - last_clean_expired_sess_ts > sess_mgr->cfg->expire_period_ms)
{
for (uint64_t i = 0; i < sess_mgr->cfg->expire_batch_max; i++)
{
sess = session_manager_rte_get_expired_session(sess_mgr_rte, now_ms);
if (sess)
{
notify_sess_closed_by_pseudo_pkt(sess_mgr, thread_id, sess);
}
else
{
break;
}
}
last_clean_expired_sess_ts = now_ms;
}
static __thread uint64_t last_sync_stat_ms = 0;
static __thread struct session_manager_stat last_stat = {0};
if (now_ms - last_sync_stat_ms >= SYNC_STAT_INTERVAL_MS)
{
struct session_manager_stat *sess_mgr_stat = session_manager_rte_get_stat(sess_mgr_rte);
for (int i = 0; i < SESS_MGR_STAT_MAX; i++)
{
uint64_t val = session_manager_stat_get(sess_mgr_stat, i) - session_manager_stat_get(&last_stat, i);
@@ -277,100 +232,6 @@ static void on_polling(struct module_manager *mod_mgr, void *args)
}
}
static void session_manager_sche_free(struct session_manager_sche *sess_mgr_schema)
{
if (sess_mgr_schema)
{
if (sess_mgr_schema->mq_sche)
{
mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_tcp);
mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_udp);
mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_ctrl);
mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_stream);
}
exdata_schema_free(sess_mgr_schema->ex_sche);
free(sess_mgr_schema);
sess_mgr_schema = NULL;
}
}
static struct session_manager_sche *session_manager_sche_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_sche, void *sess_mgr)
{
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD");
return NULL;
}
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT");
return NULL;
}
struct session_manager_sche *sess_mgr_schema = calloc(1, sizeof(struct session_manager_sche));
if (sess_mgr_schema == NULL)
{
SESSION_MANAGER_LOG_ERROR("failed to allocate memory for session_manager_sche");
return NULL;
}
sess_mgr_schema->ex_sche = exdata_schema_new();
if (sess_mgr_schema->ex_sche == NULL)
{
SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema");
goto error_out;
}
sess_mgr_schema->mq_sche = mq_sche;
sess_mgr_schema->pkt_ex_id = packet_manager_new_packet_exdata_index(pkt_mgr, "session_manager", NULL, NULL);
if (sess_mgr_schema->pkt_ex_id == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index");
goto error_out;
}
/*
* Publish session closed messages to multiple topics.
* Each topic has its own session message free callback.
* To prevent the same session from being freeed multiple times,
* only TCP/UDP topics register session message free callbacks,
* and other topics do not register session message callbacks;
*
* Restriction: MQ ensures that the session message free order is consistent with the publishing order
*/
sess_mgr_schema->sess_msg_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_TCP", &on_sess_msg_dispatch, NULL, &on_sess_msg_free, sess_mgr);
if (sess_mgr_schema->sess_msg_id_tcp == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_FREE");
goto error_out;
}
sess_mgr_schema->sess_msg_id_udp = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_UDP", &on_sess_msg_dispatch, NULL, &on_sess_msg_free, sess_mgr);
if (sess_mgr_schema->sess_msg_id_udp == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_UDP");
goto error_out;
}
sess_mgr_schema->sess_msg_id_ctrl = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_CTRL_PKT", &on_sess_msg_dispatch, NULL, NULL, NULL);
if (sess_mgr_schema->sess_msg_id_ctrl == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_CTRL_PKT");
goto error_out;
}
sess_mgr_schema->sess_msg_id_stream = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_TCP_STREAM", &on_tcp_payload_msg_dispatch, NULL, &on_tcp_payload_msg_free, sess_mgr);
if (sess_mgr_schema->sess_msg_id_stream == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_TCP_STREAM");
goto error_out;
}
return sess_mgr_schema;
error_out:
session_manager_sche_free(sess_mgr_schema);
return NULL;
}
/******************************************************************************
* session manager
******************************************************************************/
@@ -379,9 +240,9 @@ void session_manager_free(struct session_manager *sess_mgr)
{
if (sess_mgr)
{
if (sess_mgr->sche)
if (sess_mgr->ex_sche)
{
session_manager_sche_free(sess_mgr->sche);
exdata_schema_free(sess_mgr->ex_sche);
}
if (sess_mgr->fs)
{
@@ -395,7 +256,7 @@ void session_manager_free(struct session_manager *sess_mgr)
}
}
static struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file)
static struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, const char *toml_file)
{
struct session_manager *sess_mgr = calloc(1, sizeof(struct session_manager));
if (sess_mgr == NULL)
@@ -422,18 +283,45 @@ static struct session_manager *session_manager_new(struct packet_manager *pkt_mg
{
sess_mgr->stat_idx[i] = fieldstat_easy_register_counter(sess_mgr->fs, sess_mgr_stat_str[i]);
}
if (fieldstat_easy_enable_auto_output(sess_mgr->fs, "session_manager.fs4", 2) != 0)
if (fieldstat_easy_enable_auto_output(sess_mgr->fs, "metrics/session_manager.json", 2) != 0)
{
SESSION_MANAGER_LOG_ERROR("failed to enable auto output");
goto error_out;
}
sess_mgr->sche = session_manager_sche_new(pkt_mgr, mq_schema, sess_mgr);
if (sess_mgr->sche == NULL)
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD");
goto error_out;
}
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT");
goto error_out;
}
sess_mgr->ex_sche = exdata_schema_new();
if (sess_mgr->ex_sche == NULL)
{
SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema");
goto error_out;
}
sess_mgr->pkt_ex_to_get_sess = packet_manager_new_packet_exdata_index(pkt_mgr, "pkt_ex_key_for_get_sess", NULL, NULL);
if (sess_mgr->pkt_ex_to_get_sess == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index");
goto error_out;
}
sess_mgr->pkt_ex_to_free_sess = packet_manager_new_packet_exdata_index(pkt_mgr, "pkt_ex_key_for_free_sess", free_session, sess_mgr);
if (sess_mgr->pkt_ex_to_free_sess == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index");
goto error_out;
}
sess_mgr->pkt_mgr = pkt_mgr;
return sess_mgr;
error_out:
@@ -446,44 +334,15 @@ int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, c
assert(sess_mgr);
assert(name);
assert(func);
return exdata_schema_new_index(sess_mgr->sche->ex_sche, name, func, arg);
return exdata_schema_new_index(sess_mgr->ex_sche, name, func, arg);
}
int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_tcp, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_udp, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_ctrl, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_stream, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_init(struct session_manager *sess_mgr, uint16_t thread_id, struct mq_runtime *mq_rte)
int session_manager_init(struct session_manager *sess_mgr, uint16_t thread_id)
{
assert(sess_mgr);
uint64_t now_ms = clock_get_real_time_ms();
sess_mgr->cfg->session_id_seed = sess_mgr->cfg->instance_id << 8 | thread_id;
sess_mgr->mq[thread_id] = mq_rte;
sess_mgr->rte[thread_id] = session_manager_rte_new(sess_mgr->cfg, now_ms);
if (sess_mgr->rte[thread_id] == NULL)
{
@@ -500,22 +359,26 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
{
assert(sess_mgr);
struct mq_runtime *mq_rte = sess_mgr->mq[thread_id];
if (sess_mgr->rte[thread_id] == NULL)
struct session_manager_rte *rte = session_manager_get_rte(sess_mgr, thread_id);
if (rte == NULL)
{
return;
}
struct session_manager_stat *stat = session_manager_rte_get_stat(sess_mgr->rte[thread_id]);
struct session *sess = NULL;
struct session_manager_stat *stat = session_manager_rte_get_stat(rte);
while (stat->tcp_sess_used || stat->udp_sess_used)
{
clean_closed_session(sess_mgr, thread_id, UINT64_MAX);
// here we need to dispatch the message to ensure that the session is cleaned up
mq_runtime_dispatch(mq_rte);
while ((sess = session_manager_rte_get_expired_session(rte, UINT64_MAX)))
{
exdata_runtime_free((struct exdata_runtime *)session_get_user_data(sess));
session_manager_rte_free_session(rte, sess);
}
}
session_manager_rte_free(sess_mgr->rte[thread_id]);
sess_mgr->rte[thread_id] = NULL;
session_manager_rte_free(rte);
}
/******************************************************************************
@@ -535,12 +398,10 @@ struct module *session_manager_on_init(struct module_manager *mod_mgr)
struct module *pkt_mgr_mod = module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME);
struct packet_manager *pkt_mgr = module_to_packet_manager(pkt_mgr_mod);
assert(pkt_mgr);
struct mq_schema *mq_sche = module_manager_get_mq_schema(mod_mgr);
assert(mq_sche);
const char *toml_file = module_manager_get_toml_path(mod_mgr);
assert(toml_file);
struct session_manager *sess_mgr = session_manager_new(pkt_mgr, mq_sche, toml_file);
struct session_manager *sess_mgr = session_manager_new(pkt_mgr, toml_file);
if (sess_mgr == NULL)
{
return NULL;
@@ -575,10 +436,9 @@ struct module *session_manager_on_thread_init(struct module_manager *mod_mgr, in
{
struct session_manager *sess_mgr = module_get_ctx(mod);
assert(sess_mgr);
struct mq_runtime *mq_rte = module_manager_get_mq_runtime(mod_mgr);
assert(mq_rte);
if (session_manager_init(sess_mgr, thread_id, mq_rte) != 0)
__thread_pkt_ex_to_get_sess = sess_mgr->pkt_ex_to_get_sess;
if (session_manager_init(sess_mgr, thread_id) != 0)
{
SESSION_MANAGER_LOG_ERROR("failed to int session_manager_init");
return NULL;
@@ -592,10 +452,11 @@ struct module *session_manager_on_thread_init(struct module_manager *mod_mgr, in
void session_manager_on_thread_exit(struct module_manager *mod_mgr, int thread_id, struct module *mod)
{
struct session_manager *sess_mgr = module_get_ctx(mod);
assert(sess_mgr);
assert(thread_id < (int)sess_mgr->cfg->thread_num);
session_manager_clean(sess_mgr, thread_id);
if (sess_mgr)
{
assert(thread_id < (int)sess_mgr->cfg->thread_num);
session_manager_clean(sess_mgr, thread_id);
}
}
struct session_manager_rte *session_manager_get_rte(struct session_manager *sess_mgr, uint16_t thread_id)
@@ -609,4 +470,9 @@ struct session_manager_cfg *session_manager_get_cfg(struct session_manager *sess
{
assert(sess_mgr);
return sess_mgr->cfg;
}
struct session *packet_exdata_to_session(struct packet *pkt)
{
return (struct session *)packet_get_exdata(pkt, __thread_pkt_ex_to_get_sess);
}

View File

@@ -34,7 +34,6 @@ struct session_manager_rte
// only used for session_set_discard() or session_manager_rte_record_duplicated_packet(), because the function is called by plugin and has no time input.
uint64_t now_ms;
uint64_t last_clean_expired_sess_ts;
struct snowflake *sf;
};
@@ -810,7 +809,6 @@ struct session_manager_rte *session_manager_rte_new(const struct session_manager
TAILQ_INIT(&sess_mgr_rte->evc_list);
session_transition_init();
sess_mgr_rte->now_ms = now_ms;
sess_mgr_rte->last_clean_expired_sess_ts = now_ms;
return sess_mgr_rte;
@@ -1064,55 +1062,6 @@ struct session *session_manager_rte_get_evicted_session(struct session_manager_r
return sess;
}
uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms, struct session *cleaned_sess_ptr[], uint64_t array_size)
{
sess_mgr_rte->now_ms = now_ms;
struct session *sess = NULL;
uint64_t cleaned_sess_num = 0;
uint64_t expired_sess_num = 0;
uint8_t expired_sess_canbe_clean = 0;
if (now_ms - sess_mgr_rte->last_clean_expired_sess_ts >= sess_mgr_rte->cfg.expire_period_ms ||
now_ms == UINT64_MAX)
{
expired_sess_canbe_clean = 1;
}
for (uint64_t i = 0; i < array_size; i++)
{
// frist clean evicted session
sess = session_manager_rte_get_evicted_session(sess_mgr_rte);
if (sess)
{
cleaned_sess_ptr[cleaned_sess_num++] = sess;
}
// then clean expired session
else
{
if (expired_sess_canbe_clean && expired_sess_num < sess_mgr_rte->cfg.expire_batch_max)
{
sess_mgr_rte->last_clean_expired_sess_ts = now_ms;
sess = session_manager_rte_get_expired_session(sess_mgr_rte, now_ms);
if (sess)
{
cleaned_sess_ptr[cleaned_sess_num++] = sess;
expired_sess_num++;
}
else
{
break;
}
}
else
{
break;
}
}
}
return cleaned_sess_num;
}
uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size)
{
uint64_t capacity = 0;

View File

@@ -43,7 +43,6 @@ int session_manager_rte_update_session(struct session_manager_rte *sess_mgr_rte,
struct session *session_manager_rte_get_expired_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms);
struct session *session_manager_rte_get_evicted_session(struct session_manager_rte *sess_mgr_rte);
uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms, struct session *cleaned_sess_ptr[], uint64_t array_size);
uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size);
void session_manager_rte_record_duplicated_packet(struct session_manager_rte *sess_mgr_rte, const struct packet *pkt);

View File

@@ -5,9 +5,6 @@
void session_init(struct session *sess)
{
memset(sess, 0, sizeof(struct session));
sess->empty_seg.data = NULL;
sess->empty_seg.len = 0;
sess->empty_seg.user_data = sess;
}
void session_set_id(struct session *sess, uint64_t id)
@@ -212,7 +209,6 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
{
sess->sess_mgr_stat->tcp_segs_consumed++;
half->inorder_seg_consumed = 1;
half->inorder_seg.user_data = sess;
return &half->inorder_seg;
}
else
@@ -226,7 +222,6 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
// TODO
sess->sess_mgr_stat->tcp_segs_consumed++;
sess->sess_mgr_stat->tcp_segs_reordered++;
seg->user_data = sess;
}
return seg;
}
@@ -239,12 +234,6 @@ void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg)
return;
}
// empty segment for end of session
if (seg == &sess->empty_seg)
{
return;
}
enum flow_type type = session_get_flow_type(sess);
struct tcp_half *half = &sess->tcp_halfs[type];

View File

@@ -149,4 +149,6 @@ gtest_discover_tests(gtest_session_transition)
gtest_discover_tests(gtest_sess_mgr_tcp_reassembly)
gtest_discover_tests(gtest_sess_mgr_scan)
gtest_discover_tests(gtest_case_tcp_fast_open)
gtest_discover_tests(gtest_case_tcp_fast_open)
file(COPY ./metrics/ DESTINATION ./metrics/)

View File

@@ -95,6 +95,8 @@ static void *worker_thread(void *arg)
}
}
CORE_LOG_FATAL("worker thread %d cleaning", thread_id);
module_manager_unregister_thread(mod_mgr, thread_id);
mq_runtime_free(mq_rt);

View File

@@ -12,7 +12,6 @@ struct tcp_segment
{
uint32_t len;
const void *data;
void *user_data;
};
struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len);

View File

@@ -17,7 +17,7 @@ extern "C"
#define RX_BURST_MAX 32
#define MAX_THREAD_NUM 256 // limit by snowflake
#define SYNC_STAT_INTERVAL_MS 1000
#define SYNC_STAT_INTERVAL_MS 1 // TODO
#define ATOMIC_INC(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED)
#define ATOMIC_DEC(x) __atomic_fetch_sub(x, 1, __ATOMIC_RELAXED)

View File

@@ -25,12 +25,13 @@ global:
packet_manager_build_tcp_packet;
packet_manager_build_udp_packet;
packet_manager_build_l3_packet;
packet_manager_dup_packet;
packet_manager_free_packet;
session_is_symmetric;
session_has_duplicate_traffic;
session_get_type;
session_get_current_state;
session_get_current_packet;
session_get_closing_reason;
session_get_direction;
session_get_flow_type;
@@ -47,10 +48,7 @@ global:
session_manager_on_thread_init;
session_manager_on_thread_exit;
session_manager_new_session_exdata_index;
session_manager_subscribe_tcp;
session_manager_subscribe_udp;
session_manager_subscribe_control_packet;
session_manager_subscribe_tcp_stream;
packet_exdata_to_session;
session_monitor_on_init;
session_monitor_on_exit;

View File

@@ -6,4 +6,5 @@ if [ $# -ne 1 ]; then
fi
f4_json_file=$1
# python3 -m pip install prettytable jinja2
/opt/MESA/bin/fieldstat_exporter.py local -j $f4_json_file -l --clear-screen

View File

@@ -1,7 +1,7 @@
#add_subdirectory(packet_inject)
add_subdirectory(packet_tool)
add_subdirectory(session_debugger)
add_subdirectory(lpi_plus)
#add_subdirectory(session_debugger)
#add_subdirectory(lpi_plus)
#add_subdirectory(decoders/http)
#add_subdirectory(decoders/socks)
#add_subdirectory(decoders/stratum)

View File

@@ -26,7 +26,8 @@ foreach(tfile ${MONITOR_TEST_FILE})
endforeach()
set(MONITOR_TEST_RUN_DIR ${CMAKE_CURRENT_BINARY_DIR})
add_test(NAME MONITOR_ENV_SETUP COMMAND sh -c "mkdir -p ${MONITOR_TEST_RUN_DIR}/conf &&
add_test(NAME MONITOR_ENV_SETUP COMMAND sh -c "mkdir -p ${MONITOR_TEST_RUN_DIR}/metrics &&
mkdir -p ${MONITOR_TEST_RUN_DIR}/conf &&
mkdir -p ${MONITOR_TEST_RUN_DIR}/plugin &&
mkdir -p ${MONITOR_TEST_RUN_DIR}/log &&
mkdir -p ${MONITOR_TEST_RUN_DIR}/pcap &&