Add TCP reassembly metrics on session

This commit is contained in:
luwenpeng
2024-04-03 18:59:46 +08:00
parent e8e60cee6d
commit 151b6f8f1d
3 changed files with 279 additions and 269 deletions

View File

@@ -15,25 +15,7 @@
struct session_manager
{
// max session number
uint64_t max_tcp_session_num;
uint64_t max_udp_session_num;
// session overload
uint8_t tcp_overload_evict_old_sess; // 1: evict old session, 0: bypass new session
uint8_t udp_overload_evict_old_sess; // 1: evict old session, 0: bypass new session
// TCP timeout
uint64_t tcp_init_timeout; // range: [1, 60000]
uint64_t tcp_handshake_timeout; // range: [1, 60000]
uint64_t tcp_data_timeout; // range: [1, 15999999000]
uint64_t tcp_half_closed_timeout; // range: [1, 604800000]
uint64_t tcp_time_wait_timeout; // range: [1, 600000]
uint64_t tcp_discard_timeout; // range: [1, 15999999000]
uint64_t tcp_unverified_rst_timeout; // range: [1, 600000]
// UDP timeout
uint64_t udp_data_timeout; // range: [1, 15999999000]
// TCP reassembly
uint32_t tcp_reassembly_max_timeout; // range: [1, 60000] (ms)
uint32_t tcp_reassembly_max_segments; // range: [2, 32]
struct session_manager_options opts;
struct session_pool *sess_pool;
struct session_table *tcp_sess_table;
@@ -58,6 +40,16 @@ int check_options(const struct session_manager_options *opts)
return -1;
}
if (opts->max_tcp_session_num < EVICTE_SESSION_BURST * 2)
{
SESSION_LOG_ERROR("invalid max_tcp_session_num: %lu, supported range: [%u, %lu]", opts->max_tcp_session_num, EVICTE_SESSION_BURST * 2, UINT64_MAX);
return -1;
}
if (opts->max_udp_session_num < EVICTE_SESSION_BURST * 2)
{
SESSION_LOG_ERROR("invalid max_udp_session_num: %lu, supported range: [%u, %lu]", opts->max_udp_session_num, EVICTE_SESSION_BURST * 2, UINT64_MAX);
return -1;
}
if (opts->tcp_init_timeout < 1 || opts->tcp_init_timeout > 60000)
{
SESSION_LOG_ERROR("invalid tcp_init_timeout: %lu, supported range: [1, 60000]", opts->tcp_init_timeout);
@@ -116,16 +108,16 @@ static void tcp_pcb_clean(struct tcp_pcb *pcb)
{
if (pcb)
{
tcp_reassembly_free(pcb->c2s_assembler);
tcp_reassembly_free(pcb->s2c_assembler);
tcp_reassembly_free(pcb->c2s.assembler);
tcp_reassembly_free(pcb->s2c.assembler);
}
}
static int tcp_pcb_init(struct tcp_pcb *pcb, uint64_t max_timeout, uint64_t max_seg_num)
{
pcb->c2s_assembler = tcp_reassembly_new(max_timeout, max_seg_num);
pcb->s2c_assembler = tcp_reassembly_new(max_timeout, max_seg_num);
if (pcb->c2s_assembler == NULL || pcb->s2c_assembler == NULL)
pcb->c2s.assembler = tcp_reassembly_new(max_timeout, max_seg_num);
pcb->s2c.assembler = tcp_reassembly_new(max_timeout, max_seg_num);
if (pcb->c2s.assembler == NULL || pcb->s2c.assembler == NULL)
{
tcp_pcb_clean(pcb);
return -1;
@@ -134,100 +126,69 @@ static int tcp_pcb_init(struct tcp_pcb *pcb, uint64_t max_timeout, uint64_t max_
return 0;
}
/*
 * NOTE(review): this span is a rendered diff hunk whose +/- markers were
 * stripped, so two versions of the function appear interleaved: one named
 * tcp_pcb_update (takes the whole tcp_pcb plus a direction) and one named
 * tcp_half_update (takes a single tcp_half). Statements that appear twice,
 * once with `assembler`/`seq`/`pcb->...` and once with `half->...`, are
 * presumably the before/after pair of the same line; confirm against the
 * repository before treating any single line as live code.
 *
 * tcp_half_update, as visible here:
 *   - ORs the header flags into half->flags and stores the header's seq and
 *     ack in half->seq / half->ack;
 *   - on SYN, sets the assembler's next expected sequence to seq + 1;
 *   - calls tcp_reassembly_expire() once and frees the segment it returns,
 *     if any;
 *   - if the layer carries payload, classifies it as in-order, overlapping,
 *     or out-of-order (pushed to the assembler) and bumps the matching
 *     nr_seg_* counter on `half`.
 *
 * Parameters:
 *   half      - per-direction TCP state; its assembler, seq/ack, flags,
 *               order and nr_seg_* fields are read and/or written.
 *   tcp_layer - TCP layer of the current packet; hdr_ptr, pld_ptr and
 *               pld_len are read.
 *   now       - timestamp forwarded to tcp_reassembly_expire() and
 *               tcp_reassembly_push(); units are not visible here.
 */
static void tcp_pcb_update(struct tcp_pcb *pcb, enum session_dir dir, const struct pkt_layer *tcp_layer, uint64_t now)
static void tcp_half_update(struct tcp_half *half, const struct pkt_layer *tcp_layer, uint64_t now)
{
struct tcp_segment *seg;
struct tcp_reassembly *assembler;
struct tcphdr *hdr = (struct tcphdr *)tcp_layer->hdr_ptr;
uint32_t seq = tcp_hdr_get_seq(hdr);
uint32_t ack = tcp_hdr_get_ack(hdr);
uint8_t flags = tcp_hdr_get_flags(hdr);
uint32_t rcv_nxt;
/*
* https://www.rfc-editor.org/rfc/rfc5961#section-3.2
*
* If the RST bit is set and the sequence number exactly matches the
* next expected sequence number (RCV.NXT), then TCP MUST reset the
* connection.
*
* if fin is received, the expected sequence number should be increased by 1
*/
// NOTE(review): `expect` is 16-bit but is loaded from the peer's ack field
// (which is assigned from a uint32_t below) and compared with the 32-bit
// `seq`; if the ack fields are 32-bit this comparison uses a truncated
// value. Confirm the field types.
uint16_t expect = 0;
if (dir == SESSION_DIR_C2S)
{
pcb->c2s_seq = seq;
pcb->c2s_ack = ack;
assembler = pcb->c2s_assembler;
expect = pcb->s2c_ack;
expect += pcb->sub_state & TCP_S2C_FIN_RCVD ? 1 : 0;
pcb->sub_state |= (flags & TH_SYN) ? TCP_SYN_RCVD : 0;
pcb->sub_state |= (flags & TH_FIN) ? TCP_C2S_FIN_RCVD : 0;
pcb->sub_state |= ((flags & TH_RST) && (seq == expect)) ? TCP_C2S_RST_RCVD : 0;
pcb->sub_state |= ((flags & TH_RST) && (seq != expect)) ? TCP_C2S_UNVERIFIED_RST_RCVD : 0;
}
else
{
pcb->s2c_seq = seq;
pcb->s2c_ack = ack;
assembler = pcb->s2c_assembler;
expect = pcb->c2s_ack;
expect += pcb->sub_state & TCP_C2S_FIN_RCVD ? 1 : 0;
pcb->sub_state |= (flags & TH_SYN) ? TCP_SYN_ACK_RCVD : 0;
pcb->sub_state |= (flags & TH_FIN) ? TCP_S2C_FIN_RCVD : 0;
pcb->sub_state |= ((flags & TH_RST) && (seq == expect)) ? TCP_S2C_RST_RCVD : 0;
pcb->sub_state |= ((flags & TH_RST) && (seq != expect)) ? TCP_S2C_UNVERIFIED_RST_RCVD : 0;
}
// Accumulate every flag seen on this half and remember this packet's seq/ack.
half->flags |= flags;
half->seq = tcp_hdr_get_seq(hdr);
half->ack = tcp_hdr_get_ack(hdr);
// On SYN, the assembler's next expected sequence number becomes seq + 1.
if (flags & TH_SYN)
{
tcp_reassembly_set_recv_next(assembler, seq + 1);
tcp_reassembly_set_recv_next(half->assembler, half->seq + 1);
}
// Expiry is polled once per update (a single `if`, not a loop); a returned
// segment is counted as both expired and released, then freed here.
seg = tcp_reassembly_expire(assembler, now);
seg = tcp_reassembly_expire(half->assembler, now);
if (seg)
{
// TODO add metric (expire)
half->nr_seg_expired++;
half->nr_seg_released++;
tcp_segment_free(seg);
}
// Only payload-carrying packets are counted and classified below.
if (tcp_layer->pld_len)
{
rcv_nxt = tcp_reassembly_get_recv_next(assembler);
if (seq == rcv_nxt)
half->nr_seg_received++;
uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler);
if (half->seq == rcv_nxt)
{
// In-order data: the payload pointer and length are stored as-is (no copy
// is made in this function) and the expected sequence advances by pld_len.
// NOTE(review): validity of the stored pointer depends on the packet
// buffer's lifetime, which is not visible here.
pcb->order_seg.data = tcp_layer->pld_ptr;
pcb->order_seg.len = tcp_layer->pld_len;
tcp_reassembly_inc_recv_next(assembler, tcp_layer->pld_len);
half->nr_seg_inorder++;
half->order.data = tcp_layer->pld_ptr;
half->order.len = tcp_layer->pld_len;
tcp_reassembly_inc_recv_next(half->assembler, tcp_layer->pld_len);
}
// before() is defined elsewhere; presumably a wrap-aware "seq precedes
// rcv_nxt" comparison (confirm). Such segments are only counted.
else if (before(seq, rcv_nxt))
else if (before(half->seq, rcv_nxt))
{
// TODO add metric (overlap)
half->nr_seg_overlap++;
}
// Otherwise create a segment from the payload and push it to the assembler.
else if ((seg = tcp_segment_new(seq, tcp_layer->pld_ptr, tcp_layer->pld_len)))
else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, tcp_layer->pld_len)))
{
switch (tcp_reassembly_push(assembler, seg, now))
switch (tcp_reassembly_push(half->assembler, seg, now))
{
case -1:
// TODO add metric (assembler full)
half->nr_seg_no_space++;
// The rejected segment is freed by this caller.
tcp_segment_free(seg);
break;
case 0:
// TODO add metric (assembler push success)
half->nr_seg_buffered++;
break;
case 1:
// TODO add metric (assembler push success, overlap)
half->nr_seg_buffered++;
half->nr_seg_overlap++;
break;
default:
assert(0);
break;
}
}
else
{
// tcp_segment_new() returned a null/zero result; this is counted under the
// same no-space metric as a rejected push.
half->nr_seg_no_space++;
}
}
}
@@ -363,7 +324,7 @@ static int session_manager_self_protection(struct session_manager *mgr, struct s
switch (key->ip_proto)
{
case IPPROTO_TCP:
if (stat->tcp_sess.nr_sess_used >= mgr->max_tcp_session_num)
if (stat->tcp_sess.nr_sess_used >= mgr->opts.max_tcp_session_num)
{
stat->evc_pkt.nr_pkts++;
stat->evc_pkt.nr_bytes += packet_get_len(pkt);
@@ -372,7 +333,7 @@ static int session_manager_self_protection(struct session_manager *mgr, struct s
}
break;
case IPPROTO_UDP:
if (stat->udp_sess.nr_sess_used >= mgr->max_udp_session_num)
if (stat->udp_sess.nr_sess_used >= mgr->opts.max_udp_session_num)
{
stat->evc_pkt.nr_pkts++;
stat->evc_pkt.nr_bytes += packet_get_len(pkt);
@@ -559,7 +520,7 @@ static struct session *session_manager_new_tcp_session(struct session_manager *m
}
// tcp table full evict old session
if (mgr->tcp_overload_evict_old_sess && mgr->stat.tcp_sess.nr_sess_used >= mgr->max_tcp_session_num - EVICTE_SESSION_BURST)
if (mgr->opts.tcp_overload_evict_old_sess && mgr->stat.tcp_sess.nr_sess_used >= mgr->opts.max_tcp_session_num - EVICTE_SESSION_BURST)
{
struct session *evic_sess = session_table_find_lru(mgr->tcp_sess_table);
session_manager_evicte_session(mgr, evic_sess, now);
@@ -575,20 +536,21 @@ static struct session *session_manager_new_tcp_session(struct session_manager *m
session_init(sess);
session_set_id(sess, id_generator_alloc());
if (tcp_pcb_init(&sess->tcp_pcb, mgr->tcp_reassembly_max_timeout, mgr->tcp_reassembly_max_segments) == -1)
if (tcp_pcb_init(&sess->tcp_pcb, mgr->opts.tcp_reassembly_max_timeout, mgr->opts.tcp_reassembly_max_segments) == -1)
{
assert(0);
session_pool_push(mgr->sess_pool, sess);
return NULL;
}
tcp_pcb_update(&sess->tcp_pcb, dir, tcp_layer, now);
struct tcp_half *curr = (dir == SESSION_DIR_C2S) ? &sess->tcp_pcb.c2s : &sess->tcp_pcb.s2c;
tcp_half_update(curr, tcp_layer, now);
enum session_state next_state = session_transition_run(SESSION_STATE_INIT, TCP_SYN);
session_update(sess, next_state, pkt, key, dir, now);
session_transition_log(sess, SESSION_STATE_INIT, next_state, TCP_SYN);
session_stat_inc(&mgr->stat.tcp_sess, next_state);
uint64_t timeout = (flags & TH_ACK) ? mgr->tcp_handshake_timeout : mgr->tcp_init_timeout;
uint64_t timeout = (flags & TH_ACK) ? mgr->opts.tcp_handshake_timeout : mgr->opts.tcp_init_timeout;
session_timer_update(mgr->sess_timer, sess, now + timeout);
session_table_add(mgr->tcp_sess_table, key, sess);
@@ -601,7 +563,7 @@ static struct session *session_manager_new_tcp_session(struct session_manager *m
static struct session *session_manager_new_udp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key, uint64_t now)
{
// udp table full evict old session
if (mgr->udp_overload_evict_old_sess && mgr->stat.udp_sess.nr_sess_used >= mgr->max_udp_session_num - EVICTE_SESSION_BURST)
if (mgr->opts.udp_overload_evict_old_sess && mgr->stat.udp_sess.nr_sess_used >= mgr->opts.max_udp_session_num - EVICTE_SESSION_BURST)
{
struct session *evic_sess = session_table_find_lru(mgr->udp_sess_table);
session_manager_evicte_session(mgr, evic_sess, now);
@@ -623,7 +585,7 @@ static struct session *session_manager_new_udp_session(struct session_manager *m
session_transition_log(sess, SESSION_STATE_INIT, next_state, UDP_DATA);
session_stat_inc(&mgr->stat.udp_sess, next_state);
session_timer_update(mgr->sess_timer, sess, now + mgr->udp_data_timeout);
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.udp_data_timeout);
session_table_add(mgr->udp_sess_table, key, sess);
return sess;
@@ -635,65 +597,74 @@ static int session_manager_update_tcp_session(struct session_manager *mgr, struc
const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
enum session_dir dir = identify_direction_by_history(sess, key);
uint8_t flags = tcp_hdr_get_flags(hdr);
int inputs = (flags & TH_SYN) ? TCP_SYN : NONE;
int inputs = 0;
inputs |= (flags & TH_SYN) ? TCP_SYN : NONE;
inputs |= (flags & TH_FIN) ? TCP_FIN : NONE;
inputs |= (flags & TH_RST) ? TCP_RST : NONE;
inputs |= tcp_layer->pld_len ? TCP_DATA : NONE;
// update state
enum session_state curr_state = session_get_state(sess);
enum session_state next_state = session_transition_run(curr_state, inputs);
session_update(sess, next_state, pkt, key, dir, now);
session_transition_log(sess, curr_state, next_state, inputs);
session_stat_update(mgr, sess, curr_state, next_state);
tcp_pcb_update(&sess->tcp_pcb, dir, tcp_layer, now);
// update session
session_update(sess, next_state, pkt, key, dir, now);
session_stat_update(mgr, sess, curr_state, next_state);
session_transition_log(sess, curr_state, next_state, inputs);
// update tcp pcb
struct tcp_half *curr = (dir == SESSION_DIR_C2S) ? &sess->tcp_pcb.c2s : &sess->tcp_pcb.s2c;
struct tcp_half *peer = (dir == SESSION_DIR_C2S) ? &sess->tcp_pcb.s2c : &sess->tcp_pcb.c2s;
tcp_half_update(curr, tcp_layer, now);
// set closing reason
if (next_state == SESSION_STATE_CLOSING && !session_get_closing_reason(sess))
{
if (tcp_hdr_get_fin_flag(hdr))
if (flags & TH_FIN)
{
session_set_closing_reason(sess, (dir == SESSION_DIR_C2S ? CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN));
}
if (tcp_hdr_get_rst_flag(hdr))
if (flags & TH_RST)
{
session_set_closing_reason(sess, (dir == SESSION_DIR_C2S ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST));
}
}
uint16_t sub_state = sess->tcp_pcb.sub_state;
// update timeout
uint64_t timeout = 0;
switch (next_state)
{
case SESSION_STATE_OPENING:
if (flags & TH_SYN)
{
timeout = (flags & TH_ACK) ? mgr->tcp_handshake_timeout : mgr->tcp_init_timeout;
timeout = (flags & TH_ACK) ? mgr->opts.tcp_handshake_timeout : mgr->opts.tcp_init_timeout;
}
else
{
timeout = mgr->tcp_data_timeout;
timeout = mgr->opts.tcp_data_timeout;
}
break;
case SESSION_STATE_ACTIVE:
timeout = mgr->tcp_data_timeout;
timeout = mgr->opts.tcp_data_timeout;
break;
case SESSION_STATE_CLOSING:
if (flags & TH_FIN)
{
timeout = (sub_state & TCP_C2S_FIN_RCVD && sub_state & TCP_S2C_FIN_RCVD) ? mgr->tcp_time_wait_timeout : mgr->tcp_half_closed_timeout;
timeout = (peer->flags & TH_FIN) ? mgr->opts.tcp_time_wait_timeout : mgr->opts.tcp_half_closed_timeout;
}
else if (flags & TH_RST)
{
timeout = (sub_state & TCP_C2S_RST_RCVD || sub_state & TCP_S2C_RST_RCVD) ? mgr->tcp_time_wait_timeout : mgr->tcp_unverified_rst_timeout;
// if fin is received, the expected sequence number should be increased by 1
uint32_t expected = (peer->flags & TH_FIN) ? peer->ack + 1 : peer->ack;
timeout = (expected == curr->seq) ? mgr->opts.tcp_time_wait_timeout : mgr->opts.tcp_unverified_rst_timeout;
}
else
{
timeout = mgr->tcp_data_timeout;
timeout = mgr->opts.tcp_data_timeout;
}
break;
case SESSION_STATE_DISCARD:
timeout = mgr->tcp_discard_timeout;
timeout = mgr->opts.tcp_discard_timeout;
break;
default:
assert(0);
@@ -712,7 +683,7 @@ static int session_manager_update_udp_session(struct session_manager *mgr, struc
session_update(sess, next_state, pkt, key, dir, now);
session_transition_log(sess, curr_state, next_state, UDP_DATA);
session_stat_update(mgr, sess, curr_state, next_state);
session_timer_update(mgr->sess_timer, sess, now + mgr->udp_data_timeout);
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.udp_data_timeout);
return 0;
}
@@ -733,26 +704,9 @@ struct session_manager *session_manager_new(struct session_manager_options *opts
{
return NULL;
}
// max session number
mgr->max_tcp_session_num = (opts->max_tcp_session_num < EVICTE_SESSION_BURST * 2) ? EVICTE_SESSION_BURST * 2 : opts->max_tcp_session_num;
mgr->max_udp_session_num = (opts->max_udp_session_num < EVICTE_SESSION_BURST * 2) ? EVICTE_SESSION_BURST * 2 : opts->max_udp_session_num;
// session overload
mgr->stat.tcp_sess.nr_sess_init = 0;
mgr->tcp_overload_evict_old_sess = opts->tcp_overload_evict_old_sess;
mgr->udp_overload_evict_old_sess = opts->udp_overload_evict_old_sess;
// session timeout
mgr->tcp_init_timeout = opts->tcp_init_timeout;
mgr->tcp_handshake_timeout = opts->tcp_handshake_timeout;
mgr->tcp_data_timeout = opts->tcp_data_timeout;
mgr->tcp_half_closed_timeout = opts->tcp_half_closed_timeout;
mgr->tcp_time_wait_timeout = opts->tcp_time_wait_timeout;
mgr->tcp_discard_timeout = opts->tcp_discard_timeout;
mgr->tcp_unverified_rst_timeout = opts->tcp_unverified_rst_timeout;
mgr->udp_data_timeout = opts->udp_data_timeout;
// tcp reassembly
mgr->tcp_reassembly_max_timeout = opts->tcp_reassembly_max_timeout;
mgr->tcp_reassembly_max_segments = opts->tcp_reassembly_max_segments;
memcpy(&mgr->opts, opts, sizeof(struct session_manager_options));
// duplicated packet filter
struct duplicated_packet_filter_options duplicated_packet_filter_opts = {
.enable = opts->duplicated_packet_filter_enable,
@@ -768,7 +722,7 @@ struct session_manager *session_manager_new(struct session_manager_options *opts
.error_rate = opts->evicted_session_filter_error_rate,
};
mgr->sess_pool = session_pool_new(mgr->max_tcp_session_num + mgr->max_udp_session_num);
mgr->sess_pool = session_pool_new(mgr->opts.max_tcp_session_num + mgr->opts.max_udp_session_num);
mgr->tcp_sess_table = session_table_new();
mgr->udp_sess_table = session_table_new();
mgr->sess_timer = session_timer_new(now);
@@ -946,10 +900,10 @@ struct session *session_manager_get_expired_session(struct session_manager *mgr,
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
timeout = mgr->tcp_data_timeout;
timeout = mgr->opts.tcp_data_timeout;
break;
case SESSION_TYPE_UDP:
timeout = mgr->udp_data_timeout;
timeout = mgr->opts.udp_data_timeout;
break;
default:
assert(0);