feature: session mananger publish ctrl message; Enhance session debugger module

This commit is contained in:
luwenpeng
2024-10-24 10:24:20 +08:00
parent 5c5a50929b
commit 1e71122521
9 changed files with 204 additions and 44 deletions

View File

@@ -300,7 +300,7 @@ static void lpi_plus_on_session(struct session *sess, struct packet *pkt, void *
if(exdata->ctx.detected_pkt_cnt>=env->max_pkts)return;
uint16_t payload_len=packet_get_payload_len(pkt);
const char *payload=packet_get_payload(pkt);
const char *payload=packet_get_payload_data(pkt);
if (payload!=NULL && payload_len>0)//detect packet with payload only
{
lpi_plus_context_update(sess, &exdata->ctx, payload, payload_len);

View File

@@ -182,7 +182,7 @@ const struct timeval *packet_get_timeval(const struct packet *pkt);
const char *packet_get_raw_data(const struct packet *pkt);
uint16_t packet_get_raw_len(const struct packet *pkt);
const char *packet_get_payload(const struct packet *pkt);
const char *packet_get_payload_data(const struct packet *pkt);
uint16_t packet_get_payload_len(const struct packet *pkt);
void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr);

View File

@@ -71,8 +71,8 @@ enum session_stat
STAT_INJECTED_BYTES_SUCCESS,
// control packet
STAT_CONTROL_PACKETS_RECEIVED, // TODO
STAT_CONTROL_BYTES_RECEIVED, // TODO
STAT_CONTROL_PACKETS_RECEIVED,
STAT_CONTROL_BYTES_RECEIVED,
STAT_CONTROL_PACKETS_TRANSMITTED,
STAT_CONTROL_BYTES_TRANSMITTED,

View File

@@ -850,7 +850,7 @@ uint16_t packet_get_raw_len(const struct packet *pkt)
return pkt->data_len;
}
const char *packet_get_payload(const struct packet *pkt)
const char *packet_get_payload_data(const struct packet *pkt)
{
if (pkt == NULL || pkt->layers_used == 0)
{

View File

@@ -54,7 +54,7 @@ struct session
UT_hash_handle hh1;
UT_hash_handle hh2;
UT_hash_handle hh3;
struct tuple6 tuple;
struct tuple6 tuple; // FLOW_TYPE_C2S tuple
char tuple_str[TUPLE6_STR_SIZE];
struct sids sids[MAX_FLOW_TYPE];
struct route_ctx route_ctx[MAX_FLOW_TYPE];

View File

@@ -106,10 +106,16 @@ static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void
*/
uint64_t now_ms = clock_get_real_time_ms();
struct tuple6 key;
struct tcp_segment *seg = NULL;
struct session *sess = session_manager_runtime_lookup_session_by_packet(sess_mgr_rt, pkt);
if (sess == NULL)
{
if (packet_is_ctrl(pkt))
{
goto fast_path;
}
sess = session_manager_runtime_new_session(sess_mgr_rt, pkt, now_ms);
if (sess == NULL)
{
@@ -123,6 +129,11 @@ static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void
}
else
{
if (packet_is_ctrl(pkt))
{
goto ctrl_path;
}
if (session_manager_runtime_update_session(sess_mgr_rt, sess, pkt, now_ms) == -1)
{
goto fast_path;
@@ -133,6 +144,21 @@ static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void
}
}
ctrl_path:
session_set_current_packet(sess, pkt);
packet_get_innermost_tuple6(pkt, &key);
if (tuple6_cmp(session_get_tuple6(sess), &key) == 0)
{
session_set_flow_type(sess, FLOW_TYPE_C2S);
}
else
{
session_set_flow_type(sess, FLOW_TYPE_S2C);
}
packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, sess);
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess);
return;
slow_path:
if (session_get_type(sess) == SESSION_TYPE_TCP)
{
@@ -146,11 +172,12 @@ slow_path:
{
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_udp, sess);
}
packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, sess);
return;
fast_path:
packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, NULL);
return;
}
static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *args)
@@ -163,18 +190,29 @@ static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *
struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx);
if (sess)
{
enum flow_type type = session_get_flow_type(sess);
struct tuple6 key;
enum flow_type flow = FLOW_TYPE_NONE;
packet_get_innermost_tuple6(pkt, &key);
if (tuple6_cmp(session_get_tuple6(sess), &key) == 0)
{
flow = FLOW_TYPE_C2S;
}
else
{
flow = FLOW_TYPE_S2C;
}
int is_ctrl = packet_is_ctrl(pkt);
uint16_t len = packet_get_raw_len(pkt);
switch (packet_get_action(pkt))
{
case PACKET_ACTION_DROP:
session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1);
session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len);
session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1);
session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len);
break;
case PACKET_ACTION_FORWARD:
session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1);
session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len);
session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1);
session_inc_stat(sess, flow, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len);
break;
default:
assert(0);

View File

@@ -1102,6 +1102,7 @@ void session_manager_runtime_free_session(struct session_manager_runtime *sess_m
session_set_current_state(sess, SESSION_STATE_INIT);
session_set_current_packet(sess, NULL);
session_set_flow_type(sess, FLOW_TYPE_NONE);
session_init(sess);
session_pool_push(sess_mgr_rt->sess_pool, sess);
sess = NULL;
}

View File

@@ -12,7 +12,7 @@ global:
packet_get_action;
packet_get_raw_data;
packet_get_raw_len;
packet_get_payload;
packet_get_payload_data;
packet_get_payload_len;
packet_build_tcp;
packet_build_udp;

View File

@@ -30,20 +30,41 @@ struct session_debugger_exdata
struct session_debugger *dbg;
struct session *sess;
uint64_t c2s_rx_pkts;
uint64_t s2c_rx_pkts;
// data packet
uint64_t c2s_rx_data_pkts;
uint64_t s2c_rx_data_pkts;
uint64_t c2s_rx_bytes;
uint64_t s2c_rx_bytes;
uint64_t c2s_rx_data_bytes;
uint64_t s2c_rx_data_bytes;
// control packet
uint64_t c2s_rx_ctrl_pkts;
uint64_t s2c_rx_ctrl_pkts;
uint64_t c2s_rx_ctrl_bytes;
uint64_t s2c_rx_ctrl_bytes;
// TCP segment
uint64_t c2s_rx_tcp_seg;
uint64_t s2c_rx_tcp_seg;
uint64_t c2s_rx_tcp_bytes;
uint64_t s2c_rx_tcp_bytes;
// UDP payload
uint64_t c2s_rx_udp_payload;
uint64_t s2c_rx_udp_payload;
uint64_t c2s_rx_udp_bytes;
uint64_t s2c_rx_udp_bytes;
// hexdump TCP segment
int c2s_tcp_seg_hexdump_fd;
int s2c_tcp_seg_hexdump_fd;
// hexdump UDP payload
int c2s_udp_payload_hexdump_fd;
int s2c_udp_payload_hexdump_fd;
};
static void session_debugger_log(int fd, const char *fmt, ...)
@@ -88,14 +109,25 @@ static struct session_debugger_exdata *session_debugger_exdata_new(struct sessio
if (session_get_type(sess) == SESSION_TYPE_TCP)
{
memset(buff, 0, sizeof(buff));
sprintf(buff, "./log/session_debugger.%s_c2s.hexdump", session_get0_readable_addr(sess));
sprintf(buff, "./log/session_debugger.TCP_%s_C2S.hexdump", session_get0_readable_addr(sess));
exdata->c2s_tcp_seg_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644);
memset(buff, 0, sizeof(buff));
sprintf(buff, "./log/session_debugger.%s_s2c.hexdump", session_get0_readable_addr(sess));
sprintf(buff, "./log/session_debugger.TCP_%s_S2C.hexdump", session_get0_readable_addr(sess));
exdata->s2c_tcp_seg_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644);
}
if (session_get_type(sess) == SESSION_TYPE_UDP)
{
memset(buff, 0, sizeof(buff));
sprintf(buff, "./log/session_debugger.UDP_%s_C2S.hexdump", session_get0_readable_addr(sess));
exdata->c2s_udp_payload_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644);
memset(buff, 0, sizeof(buff));
sprintf(buff, "./log/session_debugger.UDP_%s_S2C.hexdump", session_get0_readable_addr(sess));
exdata->s2c_udp_payload_hexdump_fd = open(buff, O_WRONLY | O_APPEND | O_CREAT, 0644);
}
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
session_debugger_log(dbg->fd, "sess new: %s", buff);
@@ -115,6 +147,14 @@ static void session_debugger_exdata_free(struct session_debugger_exdata *exdata)
{
close(exdata->s2c_tcp_seg_hexdump_fd);
}
if (exdata->c2s_udp_payload_hexdump_fd > 0)
{
close(exdata->c2s_udp_payload_hexdump_fd);
}
if (exdata->s2c_udp_payload_hexdump_fd > 0)
{
close(exdata->s2c_udp_payload_hexdump_fd);
}
free(exdata);
}
@@ -128,7 +168,7 @@ static void session_debugger_exdata_free_callback(int idx, void *ex_ptr, void *a
session_debugger_exdata_free((struct session_debugger_exdata *)ex_ptr);
}
static void on_sess_free(struct session *sess, void *arg)
static void on_session_free(struct session *sess, void *arg)
{
struct session_debugger *dbg = (struct session_debugger *)arg;
struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx);
@@ -136,23 +176,39 @@ static void on_sess_free(struct session *sess, void *arg)
char buff[PATH_MAX] = {0};
session_to_str(exdata->sess, 0, buff, sizeof(buff) - 1);
session_debugger_log(exdata->dbg->fd, "sess free: %s", buff);
session_debugger_log(exdata->dbg->fd, "session %lu %s stat:\n"
"C2S rx packets: %6lu, C2S rx bytes: %6lu\n"
"S2C rx packets: %6lu, S2C rx bytes: %6lu\n"
"C2S rx TCP segments: %6lu, C2S rx TCP bytes: %6lu\n"
"S2C rx TCP segments: %6lu, S2C rx TCP bytes: %6lu\n",
session_get_id(exdata->sess), session_get0_readable_addr(exdata->sess),
exdata->c2s_rx_pkts, exdata->c2s_rx_bytes,
exdata->s2c_rx_pkts, exdata->s2c_rx_bytes,
snprintf(buff, sizeof(buff),
"==========================================================\n"
"C2S Data Packets : %6lu | C2S Data Bytes : %6lu\n"
"S2C Data Packets : %6lu | S2C Data Bytes : %6lu\n"
"----------------------------------------------------------\n"
"C2S Control Packets : %6lu | C2S Control Bytes : %6lu\n"
"S2C Control Packets : %6lu | S2C Control Bytes : %6lu\n"
"----------------------------------------------------------\n"
"C2S TCP Segments : %6lu | C2S TCP Bytes : %6lu\n"
"S2C TCP Segments : %6lu | S2C TCP Bytes : %6lu\n"
"----------------------------------------------------------\n"
"C2S UDP Payload : %6lu | C2S UDP Bytes : %6lu\n"
"S2C UDP Payload : %6lu | S2C UDP Bytes : %6lu\n",
exdata->c2s_rx_data_pkts, exdata->c2s_rx_data_bytes,
exdata->s2c_rx_data_pkts, exdata->s2c_rx_data_bytes,
exdata->c2s_rx_ctrl_pkts, exdata->c2s_rx_ctrl_bytes,
exdata->s2c_rx_ctrl_pkts, exdata->s2c_rx_ctrl_bytes,
exdata->c2s_rx_tcp_seg, exdata->c2s_rx_tcp_bytes,
exdata->s2c_rx_tcp_seg, exdata->s2c_rx_tcp_bytes);
exdata->s2c_rx_tcp_seg, exdata->s2c_rx_tcp_bytes,
exdata->c2s_rx_udp_payload, exdata->c2s_rx_udp_bytes,
exdata->s2c_rx_udp_payload, exdata->s2c_rx_udp_bytes);
session_debugger_log(exdata->dbg->fd, "session %lu %s statistics:\n%s", session_get_id(exdata->sess), session_get0_readable_addr(exdata->sess), buff);
}
static void on_sess_packet(struct session *sess, struct packet *pkt, void *arg)
static void on_session_packet(struct session *sess, struct packet *pkt, void *arg)
{
struct session_debugger *dbg = (struct session_debugger *)arg;
int is_ctrl = packet_is_ctrl(pkt);
char buff[PATH_MAX];
enum flow_type flow = session_get_flow_type(sess);
assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C);
struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx);
if (exdata == NULL)
{
@@ -160,24 +216,40 @@ static void on_sess_packet(struct session *sess, struct packet *pkt, void *arg)
session_set_exdata(sess, dbg->sess_exdata_idx, exdata);
}
if (session_get_flow_type(sess) == FLOW_TYPE_C2S)
if (flow == FLOW_TYPE_C2S)
{
exdata->c2s_rx_pkts++;
exdata->c2s_rx_bytes += packet_get_raw_len(pkt);
if (is_ctrl)
{
exdata->c2s_rx_ctrl_pkts++;
exdata->c2s_rx_ctrl_bytes += packet_get_raw_len(pkt);
}
else
{
exdata->s2c_rx_pkts++;
exdata->s2c_rx_bytes += packet_get_raw_len(pkt);
exdata->c2s_rx_data_pkts++;
exdata->c2s_rx_data_bytes += packet_get_raw_len(pkt);
}
}
else
{
if (is_ctrl)
{
exdata->s2c_rx_ctrl_pkts++;
exdata->s2c_rx_ctrl_bytes += packet_get_raw_len(pkt);
}
else
{
exdata->s2c_rx_data_pkts++;
exdata->s2c_rx_data_bytes += packet_get_raw_len(pkt);
}
}
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
session_debugger_log(dbg->fd, "on %s msg: %s", session_type_to_str(session_get_type(sess)), buff);
session_debugger_log(dbg->fd, "on %s %s packet: %s", session_type_to_str(session_get_type(sess)), (is_ctrl ? "ctrl" : "data"), buff);
memset(buff, 0, sizeof(buff));
packet_dump_str(pkt, buff, sizeof(buff) - 1);
session_debugger_log(dbg->fd, "rx %s packet\n%s", session_type_to_str(session_get_type(sess)), buff);
session_debugger_log(dbg->fd, "rx %s %s packet\n%s", session_type_to_str(session_get_type(sess)), (is_ctrl ? "ctrl" : "data"), buff);
pthread_spin_lock(&dbg->lock);
packet_dump_hex(pkt, dbg->fd);
@@ -189,14 +261,17 @@ static void on_tcp_stream(struct session *sess, const char *tcp_payload, uint32_
struct session_debugger *dbg = (struct session_debugger *)arg;
char buff[PATH_MAX];
enum flow_type flow = session_get_flow_type(sess);
assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C);
struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx);
assert(exdata);
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
session_debugger_log(dbg->fd, "on TCP stream msg: %s", buff);
session_debugger_log(dbg->fd, "on TCP stream: %s", buff);
pthread_spin_lock(&dbg->lock);
if (session_get_flow_type(sess) == FLOW_TYPE_C2S)
if (flow == FLOW_TYPE_C2S)
{
session_debugger_log(dbg->fd, "rx C2S TCP segment: len: %d, data: %p", tcp_payload_len, tcp_payload);
hexdump_to_fd(dbg->fd, exdata->c2s_rx_tcp_bytes, tcp_payload, tcp_payload_len);
@@ -217,6 +292,47 @@ static void on_tcp_stream(struct session *sess, const char *tcp_payload, uint32_
pthread_spin_unlock(&dbg->lock);
}
static void on_udp_payload(struct session *sess, struct packet *pkt, void *arg)
{
struct session_debugger *dbg = (struct session_debugger *)arg;
char buff[PATH_MAX];
enum flow_type flow = session_get_flow_type(sess);
assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C);
struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx);
assert(exdata);
const char *udp_payload = packet_get_payload_data(pkt);
uint32_t udp_payload_len = packet_get_payload_len(pkt);
if (udp_payload_len == 0)
{
return;
}
memset(buff, 0, sizeof(buff));
session_to_str(sess, 1, buff, sizeof(buff) - 1);
session_debugger_log(dbg->fd, "on UDP payload: %s", buff);
pthread_spin_lock(&dbg->lock);
if (flow == FLOW_TYPE_C2S)
{
session_debugger_log(dbg->fd, "rx C2S UDP payload: len: %d, data: %p", udp_payload_len, udp_payload);
hexdump_to_fd(dbg->fd, exdata->c2s_rx_udp_bytes, udp_payload, udp_payload_len);
hexdump_to_fd(exdata->c2s_udp_payload_hexdump_fd, exdata->c2s_rx_udp_bytes, udp_payload, udp_payload_len);
exdata->c2s_rx_udp_payload++;
exdata->c2s_rx_udp_bytes += udp_payload_len;
}
else
{
session_debugger_log(dbg->fd, "rx S2C UDP payload: len: %d, data: %p", udp_payload_len, udp_payload);
hexdump_to_fd(dbg->fd, exdata->s2c_rx_udp_bytes, udp_payload, udp_payload_len);
hexdump_to_fd(exdata->s2c_udp_payload_hexdump_fd, exdata->s2c_rx_udp_bytes, udp_payload, udp_payload_len);
exdata->s2c_rx_udp_payload++;
exdata->s2c_rx_udp_bytes += udp_payload_len;
}
pthread_spin_unlock(&dbg->lock);
}
static void session_debugger_free(struct session_debugger *dbg)
{
if (dbg)
@@ -257,22 +373,22 @@ static struct session_debugger *session_debugger_new(struct session_manager *ses
goto error_out;
}
if (session_manager_subscribe_free(sess_mgr, on_sess_free, dbg) == -1)
if (session_manager_subscribe_free(sess_mgr, on_session_free, dbg) == -1)
{
session_debugger_log(STDERR_FILENO, "subscribe free failed\n");
goto error_out;
}
if (session_manager_subscribe_tcp(sess_mgr, on_sess_packet, dbg) == -1)
if (session_manager_subscribe_tcp(sess_mgr, on_session_packet, dbg) == -1)
{
session_debugger_log(STDERR_FILENO, "subscribe tcp failed\n");
goto error_out;
}
if (session_manager_subscribe_udp(sess_mgr, on_sess_packet, dbg) == -1)
if (session_manager_subscribe_udp(sess_mgr, on_session_packet, dbg) == -1)
{
session_debugger_log(STDERR_FILENO, "subscribe udp failed\n");
goto error_out;
}
if (session_manager_subscribe_control_packet(sess_mgr, on_sess_packet, dbg) == -1)
if (session_manager_subscribe_control_packet(sess_mgr, on_session_packet, dbg) == -1)
{
session_debugger_log(STDERR_FILENO, "subscribe control packet failed\n");
goto error_out;
@@ -282,6 +398,11 @@ static struct session_debugger *session_debugger_new(struct session_manager *ses
session_debugger_log(STDERR_FILENO, "subscribe tcp stream failed\n");
goto error_out;
}
if (session_manager_subscribe_udp(sess_mgr, on_udp_payload, dbg) == -1)
{
session_debugger_log(STDERR_FILENO, "subscribe udp failed\n");
goto error_out;
}
return dbg;