#include #include #include "stellar.h" #include "tcp_utils.h" #include "udp_utils.h" #include "id_generator.h" #include "session_pool.h" #include "session_table.h" #include "session_timer.h" #include "session_manager.h" #include "session_transition.h" #include "evicted_session_filter.h" #include "duplicated_packet_filter.h" struct session_manager { // max session number uint64_t max_tcp_session_num; uint64_t max_udp_session_num; // session overload uint8_t tcp_overload_evict_old_sess; // 1: evict old session, 0: bypass new session uint8_t udp_overload_evict_old_sess; // 1: evict old session, 0: bypass new session // TCP timeout uint64_t tcp_init_timeout; // range: [1, 60000] uint64_t tcp_handshake_timeout; // range: [1, 60000] uint64_t tcp_data_timeout; // range: [1, 15999999000] uint64_t tcp_half_closed_timeout; // range: [1, 604800000] uint64_t tcp_time_wait_timeout; // range: [1, 600000] uint64_t tcp_discard_timeout; // range: [1, 15999999000] uint64_t tcp_unverified_rst_timeout; // range: [1, 600000] // UDP timeout uint64_t udp_data_timeout; // range: [1, 15999999000] // TCP reassembly uint32_t tcp_reassembly_max_timeout; // range: [1, 60000] (ms) uint32_t tcp_reassembly_max_segments; // range: [2, 32] struct session_pool *sess_pool; struct session_table *tcp_sess_table; struct session_table *udp_sess_table; struct session_timer *sess_timer; struct list_head evicte_queue; struct duplicated_packet_filter *dup_pkt_filter; struct evicted_session_filter *evicte_sess_filter; struct session_manager_stat stat; }; #define EVICTE_SESSION_BURST (RX_BURST_MAX) // TODO int check_options(const struct session_manager_options *opts) { if (opts == NULL) { SESSION_LOG_ERROR("invalid options"); return -1; } if (opts->tcp_init_timeout < 1 || opts->tcp_init_timeout > 60000) { SESSION_LOG_ERROR("invalid tcp_init_timeout: %lu, supported range: [1, 60000]", opts->tcp_init_timeout); return -1; } if (opts->tcp_handshake_timeout < 1 || opts->tcp_handshake_timeout > 60000) { SESSION_LOG_ERROR("invalid tcp_handshake_timeout: %lu, supported range: [1, 60000]", opts->tcp_handshake_timeout); return -1; } if (opts->tcp_data_timeout < 1 || opts->tcp_data_timeout > 15999999000) { SESSION_LOG_ERROR("invalid tcp_data_timeout: %lu, supported range: [1, 15999999000]", opts->tcp_data_timeout); return -1; } if (opts->tcp_half_closed_timeout < 1 || opts->tcp_half_closed_timeout > 604800000) { SESSION_LOG_ERROR("invalid tcp_half_closed_timeout: %lu, supported range: [1, 604800000]", opts->tcp_half_closed_timeout); return -1; } if (opts->tcp_time_wait_timeout < 1 || opts->tcp_time_wait_timeout > 600000) { SESSION_LOG_ERROR("invalid tcp_time_wait_timeout: %lu, supported range: [1, 600000]", opts->tcp_time_wait_timeout); return -1; } if (opts->tcp_discard_timeout < 1 || opts->tcp_discard_timeout > 15999999000) { SESSION_LOG_ERROR("invalid tcp_discard_timeout: %lu, supported range: [1, 15999999000]", opts->tcp_discard_timeout); return -1; } if (opts->tcp_unverified_rst_timeout < 1 || opts->tcp_unverified_rst_timeout > 600000) { SESSION_LOG_ERROR("invalid tcp_unverified_rst_timeout: %lu, supported range: [1, 600000]", opts->tcp_unverified_rst_timeout); return -1; } if (opts->udp_data_timeout < 1 || opts->udp_data_timeout > 15999999000) { SESSION_LOG_ERROR("invalid udp_data_timeout: %lu, supported range: [1, 15999999000]", opts->udp_data_timeout); return -1; } return 0; } /* * The next routines deal with comparing 32 bit unsigned ints * and worry about wraparound (automatic with unsigned arithmetic). */ static inline bool before(uint32_t seq1, uint32_t seq2) { return (int32_t)(seq1 - seq2) < 0; } static void tcp_pcb_clean(struct tcp_pcb *pcb) { if (pcb) { tcp_reassembly_free(pcb->c2s_assembler); tcp_reassembly_free(pcb->s2c_assembler); } } static int tcp_pcb_init(struct tcp_pcb *pcb, uint64_t max_timeout, uint64_t max_seg_num) { pcb->c2s_assembler = tcp_reassembly_new(max_timeout, max_seg_num); pcb->s2c_assembler = tcp_reassembly_new(max_timeout, max_seg_num); if (pcb->c2s_assembler == NULL || pcb->s2c_assembler == NULL) { tcp_pcb_clean(pcb); return -1; } return 0; } static void tcp_pcb_update(struct tcp_pcb *pcb, enum session_dir dir, const struct pkt_layer *tcp_layer, uint64_t now) { struct tcp_segment *seg; struct tcp_reassembly *assembler; struct tcphdr *hdr = (struct tcphdr *)tcp_layer->hdr_ptr; uint32_t seq = tcp_hdr_get_seq(hdr); uint32_t ack = tcp_hdr_get_ack(hdr); uint8_t flags = tcp_hdr_get_flags(hdr); uint32_t rcv_nxt; /* * https://www.rfc-editor.org/rfc/rfc5961#section-3.2 * * If the RST bit is set and the sequence number exactly matches the * next expected sequence number (RCV.NXT), then TCP MUST reset the * connection. * * if fin is received, the expected sequence number should be increased by 1 */ uint16_t expect = 0; if (dir == SESSION_DIR_C2S) { pcb->c2s_seq = seq; pcb->c2s_ack = ack; assembler = pcb->c2s_assembler; expect = pcb->s2c_ack; expect += pcb->sub_state & TCP_S2C_FIN_RCVD ? 1 : 0; pcb->sub_state |= (flags & TH_SYN) ? TCP_SYN_RCVD : 0; pcb->sub_state |= (flags & TH_FIN) ? TCP_C2S_FIN_RCVD : 0; pcb->sub_state |= ((flags & TH_RST) && (seq == expect)) ? TCP_C2S_RST_RCVD : 0; pcb->sub_state |= ((flags & TH_RST) && (seq != expect)) ? TCP_C2S_UNVERIFIED_RST_RCVD : 0; } else { pcb->s2c_seq = seq; pcb->s2c_ack = ack; assembler = pcb->s2c_assembler; expect = pcb->c2s_ack; expect += pcb->sub_state & TCP_C2S_FIN_RCVD ? 1 : 0; pcb->sub_state |= (flags & TH_SYN) ? TCP_SYN_ACK_RCVD : 0; pcb->sub_state |= (flags & TH_FIN) ? TCP_S2C_FIN_RCVD : 0; pcb->sub_state |= ((flags & TH_RST) && (seq == expect)) ? TCP_S2C_RST_RCVD : 0; pcb->sub_state |= ((flags & TH_RST) && (seq != expect)) ? TCP_S2C_UNVERIFIED_RST_RCVD : 0; } if (flags & TH_SYN) { tcp_reassembly_set_recv_next(assembler, seq + 1); } seg = tcp_reassembly_expire(assembler, now); if (seg) { // TODO add metric (expire) tcp_segment_free(seg); } if (tcp_layer->pld_len) { rcv_nxt = tcp_reassembly_get_recv_next(assembler); if (seq == rcv_nxt) { pcb->order_seg.data = tcp_layer->pld_ptr; pcb->order_seg.len = tcp_layer->pld_len; tcp_reassembly_inc_recv_next(assembler, tcp_layer->pld_len); } else if (before(seq, rcv_nxt)) { // TODO add metric (overlap) } else if ((seg = tcp_segment_new(seq, tcp_layer->pld_ptr, tcp_layer->pld_len))) { switch (tcp_reassembly_push(assembler, seg, now)) { case -1: // TODO add metric (assembler full) tcp_segment_free(seg); break; case 0: // TODO add metric (assembler push success) break; case 1: // TODO add metric (assembler push success, overlap) break; default: assert(0); break; } } } } /****************************************************************************** * Stat ******************************************************************************/ static void session_stat_inc(struct session_stat *stat, enum session_state state) { switch (state) { case SESSION_STATE_INIT: stat->nr_sess_init++; break; case SESSION_STATE_OPENING: stat->nr_sess_opening++; break; case SESSION_STATE_ACTIVE: stat->nr_sess_active++; break; case SESSION_STATE_CLOSING: stat->nr_sess_closing++; break; case SESSION_STATE_DISCARD: stat->nr_sess_discard++; break; case SESSION_STATE_CLOSED: stat->nr_sess_closed++; break; default: break; } } static void session_stat_dec(struct session_stat *stat, enum session_state state) { switch (state) { case SESSION_STATE_INIT: stat->nr_sess_init--; break; case SESSION_STATE_OPENING: stat->nr_sess_opening--; break; case SESSION_STATE_ACTIVE: stat->nr_sess_active--; break; case SESSION_STATE_CLOSING: stat->nr_sess_closing--; break; case SESSION_STATE_DISCARD: stat->nr_sess_discard--; break; case SESSION_STATE_CLOSED: stat->nr_sess_closed--; break; default: break; } } static void session_stat_update(struct session_manager *mgr, struct session *sess, enum session_state curr_state, enum session_state next_state) { switch (session_get_type(sess)) { case SESSION_TYPE_TCP: session_stat_dec(&mgr->stat.tcp_sess, curr_state); session_stat_inc(&mgr->stat.tcp_sess, next_state); break; case SESSION_TYPE_UDP: session_stat_dec(&mgr->stat.udp_sess, curr_state); session_stat_inc(&mgr->stat.udp_sess, next_state); break; default: break; } } /****************************************************************************** * Session Direction ******************************************************************************/ static enum session_dir identify_direction_by_port(uint16_t src_port, uint16_t dst_port) { // big port is client if (src_port > dst_port) { return SESSION_DIR_C2S; } else if (src_port < dst_port) { return SESSION_DIR_S2C; } else { // if port is equal, first packet is C2S return SESSION_DIR_C2S; } } static enum session_dir identify_direction_by_history(const struct session *sess, const struct tuple6 *key) { if (tuple6_cmp(session_get_tuple(sess), key) == 0) { return session_get_tuple_dir(sess); } else { return (session_get_tuple_dir(sess) == SESSION_DIR_C2S ? SESSION_DIR_S2C : SESSION_DIR_C2S); } } /****************************************************************************** * Session Filter ******************************************************************************/ #define MAX_FILTER_NUM_PER_STAGE 4 enum filter_stage { FILTER_STAGE_PRE_NEW_SESS, FILTER_STAGE_PRE_UPDATE_SESS, MAX_FILTER_STAGE, }; // return 1: bypass // return 0: not bypass typedef int filter(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now); // on pre new session static int session_manager_self_protection(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { struct session_manager_stat *stat = &mgr->stat; switch (key->ip_proto) { case IPPROTO_TCP: if (stat->tcp_sess.nr_sess_used >= mgr->max_tcp_session_num) { stat->evc_pkt.nr_pkts++; stat->evc_pkt.nr_bytes += packet_get_len(pkt); stat->tcp_sess.nr_new_sess_evicted++; return 1; } break; case IPPROTO_UDP: if (stat->udp_sess.nr_sess_used >= mgr->max_udp_session_num) { stat->evc_pkt.nr_pkts++; stat->evc_pkt.nr_bytes += packet_get_len(pkt); stat->udp_sess.nr_new_sess_evicted++; return 1; } break; default: break; } return 0; } // on pre new session static int session_manager_filter_evicted_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { if (evicted_session_filter_lookup(mgr->evicte_sess_filter, key, now)) { mgr->stat.evc_pkt.nr_pkts++; mgr->stat.evc_pkt.nr_bytes += packet_get_len(pkt); return 1; } return 0; } // on pre update session static int session_manager_filter_duplicated_packet(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { enum session_dir dir = identify_direction_by_history(sess, key); if ((dir == SESSION_DIR_C2S && session_get_metric(sess, SESSION_METRIC_C2S_PACKETS) < 3) || (dir == SESSION_DIR_S2C && session_get_metric(sess, SESSION_METRIC_S2C_PACKETS) < 3) || (session_has_dup_traffic(sess) == 1)) { if (duplicated_packet_filter_lookup(mgr->dup_pkt_filter, pkt, now)) { mgr->stat.dup_pkt.nr_pkts++; mgr->stat.dup_pkt.nr_bytes += packet_get_len(pkt); session_set_dup_traffic(sess); return 1; } else { duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, now); return 0; } } return 0; } filter *smf[MAX_FILTER_STAGE][MAX_FILTER_NUM_PER_STAGE]; static void session_filter_init() { for (int i = 0; i < MAX_FILTER_STAGE; i++) { for (int j = 0; j < MAX_FILTER_NUM_PER_STAGE; j++) { smf[i][j] = NULL; } } smf[FILTER_STAGE_PRE_NEW_SESS][0] = session_manager_self_protection; smf[FILTER_STAGE_PRE_NEW_SESS][1] = session_manager_filter_evicted_session; smf[FILTER_STAGE_PRE_NEW_SESS][2] = NULL; smf[FILTER_STAGE_PRE_UPDATE_SESS][0] = session_manager_filter_duplicated_packet; smf[FILTER_STAGE_PRE_UPDATE_SESS][1] = NULL; } // return 1: bypass packet // return 0: not bypass packet static int session_filter_run(struct session_manager *mgr, enum filter_stage stage, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { filter **list = smf[stage]; for (int i = 0; i < MAX_FILTER_NUM_PER_STAGE; i++) { if (list[i]) { if (list[i](mgr, sess, pkt, key, now)) { return 1; } } else { break; } } return 0; } /****************************************************************************** * Session Manager ******************************************************************************/ static void session_update(struct session *sess, enum session_state next_state, const struct packet *pkt, const struct tuple6 *key, enum session_dir dir, uint64_t now) { if (session_get_state(sess) == SESSION_STATE_INIT) { session_set_tuple(sess, key); session_set_tuple_dir(sess, dir); session_set_timestamp(sess, SESSION_TIMESTAMP_NEW, now); switch (key->ip_proto) { case IPPROTO_TCP: session_set_type(sess, SESSION_TYPE_TCP); break; case IPPROTO_UDP: session_set_type(sess, SESSION_TYPE_UDP); break; default: assert(0); break; } } if (dir == SESSION_DIR_C2S) { session_inc_metric(sess, SESSION_METRIC_C2S_PACKETS, 1); session_inc_metric(sess, SESSION_METRIC_C2S_BYTES, packet_get_len(pkt)); session_set_packet(sess, SESSION_PACKET_C2S_1ST, pkt); } else { session_inc_metric(sess, SESSION_METRIC_S2C_PACKETS, 1); session_inc_metric(sess, SESSION_METRIC_S2C_BYTES, packet_get_len(pkt)); session_set_packet(sess, SESSION_PACKET_S2C_1ST, pkt); } session_set_packet(sess, SESSION_PACKET_CURRENT, pkt); session_set_cur_dir(sess, dir); session_set_timestamp(sess, SESSION_TIMESTAMP_LAST, now); session_set_state(sess, next_state); } static void session_manager_evicte_session(struct session_manager *mgr, struct session *sess, uint64_t now) { if (sess == NULL) { return; } // when session add to evicted queue, session lifetime is over enum session_state curr_state = session_get_state(sess); enum session_state next_state = session_transition_run(curr_state, LRU_EVICT); session_transition_log(sess, curr_state, next_state, LRU_EVICT); session_set_state(sess, next_state); session_stat_update(mgr, sess, curr_state, next_state); session_timer_del(mgr->sess_timer, sess); session_set_closing_reason(sess, CLOSING_BY_EVICTED); list_add_tail(&sess->evicte, &mgr->evicte_queue); switch (session_get_type(sess)) { case SESSION_TYPE_TCP: SESSION_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess)); mgr->stat.tcp_sess.nr_old_sess_evicted++; session_table_del(mgr->tcp_sess_table, session_get_tuple(sess)); break; case SESSION_TYPE_UDP: SESSION_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess)); mgr->stat.udp_sess.nr_old_sess_evicted++; session_table_del(mgr->udp_sess_table, session_get_tuple(sess)); evicted_session_filter_add(mgr->evicte_sess_filter, session_get_tuple(sess), now); break; default: assert(0); break; } } static struct session *session_manager_new_tcp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { const struct pkt_layer *tcp_layer = packet_get_innermost_layer(pkt, LAYER_TYPE_TCP); const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; uint8_t flags = tcp_hdr_get_flags(hdr); if (!(flags & TH_SYN)) { return NULL; } // tcp table full evict old session if (mgr->tcp_overload_evict_old_sess && mgr->stat.tcp_sess.nr_sess_used >= mgr->max_tcp_session_num - EVICTE_SESSION_BURST) { struct session *evic_sess = session_table_find_lru(mgr->tcp_sess_table); session_manager_evicte_session(mgr, evic_sess, now); } enum session_dir dir = (flags & TH_ACK) ? SESSION_DIR_S2C : SESSION_DIR_C2S; struct session *sess = session_pool_pop(mgr->sess_pool); if (sess == NULL) { assert(0); return NULL; } session_init(sess); session_set_id(sess, id_generator_alloc()); if (tcp_pcb_init(&sess->tcp_pcb, mgr->tcp_reassembly_max_timeout, mgr->tcp_reassembly_max_segments) == -1) { assert(0); session_pool_push(mgr->sess_pool, sess); return NULL; } tcp_pcb_update(&sess->tcp_pcb, dir, tcp_layer, now); enum session_state next_state = session_transition_run(SESSION_STATE_INIT, TCP_SYN); session_update(sess, next_state, pkt, key, dir, now); session_transition_log(sess, SESSION_STATE_INIT, next_state, TCP_SYN); session_stat_inc(&mgr->stat.tcp_sess, next_state); uint64_t timeout = (flags & TH_ACK) ? mgr->tcp_handshake_timeout : mgr->tcp_init_timeout; session_timer_update(mgr->sess_timer, sess, now + timeout); session_table_add(mgr->tcp_sess_table, key, sess); duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, now); mgr->stat.tcp_sess.nr_sess_used++; return sess; } static struct session *session_manager_new_udp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { // udp table full evict old session if (mgr->udp_overload_evict_old_sess && mgr->stat.udp_sess.nr_sess_used >= mgr->max_udp_session_num - EVICTE_SESSION_BURST) { struct session *evic_sess = session_table_find_lru(mgr->udp_sess_table); session_manager_evicte_session(mgr, evic_sess, now); } struct session *sess = session_pool_pop(mgr->sess_pool); if (sess == NULL) { assert(sess); return NULL; } mgr->stat.udp_sess.nr_sess_used++; session_init(sess); session_set_id(sess, id_generator_alloc()); enum session_dir dir = identify_direction_by_port(ntohs(key->src_port), ntohs(key->dst_port)); enum session_state next_state = session_transition_run(SESSION_STATE_INIT, UDP_DATA); session_update(sess, next_state, pkt, key, dir, now); session_transition_log(sess, SESSION_STATE_INIT, next_state, UDP_DATA); session_stat_inc(&mgr->stat.udp_sess, next_state); session_timer_update(mgr->sess_timer, sess, now + mgr->udp_data_timeout); session_table_add(mgr->udp_sess_table, key, sess); return sess; } static int session_manager_update_tcp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { const struct pkt_layer *tcp_layer = packet_get_innermost_layer(pkt, LAYER_TYPE_TCP); const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; enum session_dir dir = identify_direction_by_history(sess, key); uint8_t flags = tcp_hdr_get_flags(hdr); int inputs = (flags & TH_SYN) ? TCP_SYN : NONE; inputs |= (flags & TH_FIN) ? TCP_FIN : NONE; inputs |= (flags & TH_RST) ? TCP_RST : NONE; inputs |= tcp_layer->pld_len ? TCP_DATA : NONE; enum session_state curr_state = session_get_state(sess); enum session_state next_state = session_transition_run(curr_state, inputs); session_update(sess, next_state, pkt, key, dir, now); session_transition_log(sess, curr_state, next_state, inputs); session_stat_update(mgr, sess, curr_state, next_state); tcp_pcb_update(&sess->tcp_pcb, dir, tcp_layer, now); // set closing reason if (next_state == SESSION_STATE_CLOSING && !session_get_closing_reason(sess)) { if (tcp_hdr_get_fin_flag(hdr)) { session_set_closing_reason(sess, (dir == SESSION_DIR_C2S ? CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN)); } if (tcp_hdr_get_rst_flag(hdr)) { session_set_closing_reason(sess, (dir == SESSION_DIR_C2S ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST)); } } uint16_t sub_state = sess->tcp_pcb.sub_state; uint64_t timeout = 0; switch (next_state) { case SESSION_STATE_OPENING: if (flags & TH_SYN) { timeout = (flags & TH_ACK) ? mgr->tcp_handshake_timeout : mgr->tcp_init_timeout; } else { timeout = mgr->tcp_data_timeout; } break; case SESSION_STATE_ACTIVE: timeout = mgr->tcp_data_timeout; break; case SESSION_STATE_CLOSING: if (flags & TH_FIN) { timeout = (sub_state & TCP_C2S_FIN_RCVD && sub_state & TCP_S2C_FIN_RCVD) ? mgr->tcp_time_wait_timeout : mgr->tcp_half_closed_timeout; } else if (flags & TH_RST) { timeout = (sub_state & TCP_C2S_RST_RCVD || sub_state & TCP_S2C_RST_RCVD) ? mgr->tcp_time_wait_timeout : mgr->tcp_unverified_rst_timeout; } else { timeout = mgr->tcp_data_timeout; } break; case SESSION_STATE_DISCARD: timeout = mgr->tcp_discard_timeout; break; default: assert(0); break; } session_timer_update(mgr->sess_timer, sess, now + timeout); return 0; } static int session_manager_update_udp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now) { enum session_dir dir = identify_direction_by_history(sess, key); enum session_state curr_state = session_get_state(sess); enum session_state next_state = session_transition_run(curr_state, UDP_DATA); session_update(sess, next_state, pkt, key, dir, now); session_transition_log(sess, curr_state, next_state, UDP_DATA); session_stat_update(mgr, sess, curr_state, next_state); session_timer_update(mgr->sess_timer, sess, now + mgr->udp_data_timeout); return 0; } /****************************************************************************** * Public API ******************************************************************************/ struct session_manager *session_manager_new(struct session_manager_options *opts, uint64_t now) { if (check_options(opts) == -1) { return NULL; } struct session_manager *mgr = (struct session_manager *)calloc(1, sizeof(struct session_manager)); if (mgr == NULL) { return NULL; } // max session number mgr->max_tcp_session_num = (opts->max_tcp_session_num < EVICTE_SESSION_BURST * 2) ? EVICTE_SESSION_BURST * 2 : opts->max_tcp_session_num; mgr->max_udp_session_num = (opts->max_udp_session_num < EVICTE_SESSION_BURST * 2) ? EVICTE_SESSION_BURST * 2 : opts->max_udp_session_num; // session overload mgr->stat.tcp_sess.nr_sess_init = 0; mgr->tcp_overload_evict_old_sess = opts->tcp_overload_evict_old_sess; mgr->udp_overload_evict_old_sess = opts->udp_overload_evict_old_sess; // session timeout mgr->tcp_init_timeout = opts->tcp_init_timeout; mgr->tcp_handshake_timeout = opts->tcp_handshake_timeout; mgr->tcp_data_timeout = opts->tcp_data_timeout; mgr->tcp_half_closed_timeout = opts->tcp_half_closed_timeout; mgr->tcp_time_wait_timeout = opts->tcp_time_wait_timeout; mgr->tcp_discard_timeout = opts->tcp_discard_timeout; mgr->tcp_unverified_rst_timeout = opts->tcp_unverified_rst_timeout; mgr->udp_data_timeout = opts->udp_data_timeout; // tcp reassembly mgr->tcp_reassembly_max_timeout = opts->tcp_reassembly_max_timeout; mgr->tcp_reassembly_max_segments = opts->tcp_reassembly_max_segments; // duplicated packet filter struct duplicated_packet_filter_options duplicated_packet_filter_opts = { .enable = opts->duplicated_packet_filter_enable, .capacity = opts->duplicated_packet_filter_capacity, .timeout = opts->duplicated_packet_filter_timeout, .error_rate = opts->duplicated_packet_filter_error_rate, }; // evicted session filter struct evicted_session_filter_options evicted_session_filter_opts = { .enable = opts->evicted_session_filter_enable, .capacity = opts->evicted_session_filter_capacity, .timeout = opts->evicted_session_filter_timeout, .error_rate = opts->evicted_session_filter_error_rate, }; mgr->sess_pool = session_pool_new(mgr->max_tcp_session_num + mgr->max_udp_session_num); mgr->tcp_sess_table = session_table_new(); mgr->udp_sess_table = session_table_new(); mgr->sess_timer = session_timer_new(now); mgr->dup_pkt_filter = duplicated_packet_filter_new(&duplicated_packet_filter_opts, now); mgr->evicte_sess_filter = evicted_session_filter_new(&evicted_session_filter_opts, now); if (mgr->sess_pool == NULL || mgr->tcp_sess_table == NULL || mgr->udp_sess_table == NULL || mgr->sess_timer == NULL || mgr->dup_pkt_filter == NULL || mgr->evicte_sess_filter == NULL) { goto error; } INIT_LIST_HEAD(&mgr->evicte_queue); session_filter_init(); session_transition_init(); return mgr; error: session_manager_free(mgr); return NULL; } void session_manager_free(struct session_manager *mgr) { struct session *sess; if (mgr) { // free all evicted session while (!list_empty(&mgr->evicte_queue)) { sess = list_first_entry(&mgr->evicte_queue, struct session, evicte); list_del(&sess->evicte); session_manager_free_session(mgr, sess); } // free all udp session while (mgr->udp_sess_table && (sess = session_table_find_lru(mgr->udp_sess_table))) { session_manager_free_session(mgr, sess); } // free all tcp session while (mgr->tcp_sess_table && (sess = session_table_find_lru(mgr->tcp_sess_table))) { session_manager_free_session(mgr, sess); } evicted_session_filter_free(mgr->evicte_sess_filter); duplicated_packet_filter_free(mgr->dup_pkt_filter); session_timer_free(mgr->sess_timer); session_table_free(mgr->udp_sess_table); session_table_free(mgr->tcp_sess_table); session_pool_free(mgr->sess_pool); free(mgr); mgr = NULL; } } struct session *session_manager_new_session(struct session_manager *mgr, const struct packet *pkt, uint64_t now) { struct tuple6 key; if (packet_get_innermost_tuple6(pkt, &key)) { return NULL; } if (session_filter_run(mgr, FILTER_STAGE_PRE_NEW_SESS, NULL, pkt, &key, now)) { return NULL; } switch (key.ip_proto) { case IPPROTO_TCP: return session_manager_new_tcp_session(mgr, pkt, &key, now); case IPPROTO_UDP: return session_manager_new_udp_session(mgr, pkt, &key, now); default: return NULL; } } void session_manager_free_session(struct session_manager *mgr, struct session *sess) { if (sess) { SESSION_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess))); session_timer_del(mgr->sess_timer, sess); switch (session_get_type(sess)) { case SESSION_TYPE_TCP: tcp_pcb_clean(&sess->tcp_pcb); session_table_del(mgr->tcp_sess_table, session_get_tuple(sess)); session_stat_dec(&mgr->stat.tcp_sess, session_get_state(sess)); mgr->stat.tcp_sess.nr_sess_used--; break; case SESSION_TYPE_UDP: session_table_del(mgr->udp_sess_table, session_get_tuple(sess)); session_stat_dec(&mgr->stat.udp_sess, session_get_state(sess)); mgr->stat.udp_sess.nr_sess_used--; break; default: assert(0); break; } session_clean_packet(sess, SESSION_PACKET_C2S_1ST); session_clean_packet(sess, SESSION_PACKET_S2C_1ST); session_clean_packet(sess, SESSION_PACKET_CURRENT); session_set_cur_dir(sess, SESSION_DIR_NONE); session_free_all_ex_data(sess); session_pool_push(mgr->sess_pool, sess); sess = NULL; } } struct session *session_manager_lookup_session(struct session_manager *mgr, const struct packet *pkt) { struct tuple6 key; if (packet_get_innermost_tuple6(pkt, &key)) { return NULL; } switch (key.ip_proto) { case IPPROTO_UDP: return session_table_find_tuple(mgr->udp_sess_table, &key); case IPPROTO_TCP: return session_table_find_tuple(mgr->tcp_sess_table, &key); default: return NULL; } } int session_manager_update_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, uint64_t now) { struct tuple6 key; if (packet_get_innermost_tuple6(pkt, &key)) { return -1; } if (session_filter_run(mgr, FILTER_STAGE_PRE_UPDATE_SESS, sess, pkt, &key, now)) { return -1; } switch (session_get_type(sess)) { case SESSION_TYPE_TCP: return session_manager_update_tcp_session(mgr, sess, pkt, &key, now); case SESSION_TYPE_UDP: return session_manager_update_udp_session(mgr, sess, pkt, &key, now); default: return -1; } } struct session *session_manager_get_expired_session(struct session_manager *mgr, uint64_t now) { struct session *sess = session_timer_expire(mgr->sess_timer, now); if (sess) { enum session_state curr_state = session_get_state(sess); enum session_state next_state = session_transition_run(curr_state, TIMEOUT); session_transition_log(sess, curr_state, next_state, TIMEOUT); session_set_state(sess, next_state); session_stat_update(mgr, sess, curr_state, next_state); if (next_state == SESSION_STATE_CLOSED) { // need free session if (!session_get_closing_reason(sess)) { session_set_closing_reason(sess, CLOSING_BY_TIMEOUT); } return sess; } else { // in closing state, only update timeout uint64_t timeout = 0; switch (session_get_type(sess)) { case SESSION_TYPE_TCP: timeout = mgr->tcp_data_timeout; break; case SESSION_TYPE_UDP: timeout = mgr->udp_data_timeout; break; default: assert(0); break; } session_timer_update(mgr->sess_timer, sess, now + timeout); return NULL; } } return NULL; } struct session *session_manager_get_evicted_session(struct session_manager *mgr) { if (list_empty(&mgr->evicte_queue)) { return NULL; } else { struct session *sess = list_first_entry(&mgr->evicte_queue, struct session, evicte); list_del(&sess->evicte); return sess; } } uint64_t session_manager_get_expire_interval(struct session_manager *mgr) { return session_timer_next_expire_interval(mgr->sess_timer); } void session_manager_print_stat(struct session_manager *mgr) { if (mgr) { SESSION_LOG_DEBUG("session_manager_stat->tcp_sess_num: used: %u, init: %u, opening: %u, active: %u, closing: %u, closed: %u, evic_new: %u, evic_old: %u", mgr->stat.tcp_sess.nr_sess_used, mgr->stat.tcp_sess.nr_sess_init, mgr->stat.tcp_sess.nr_sess_opening, mgr->stat.tcp_sess.nr_sess_active, mgr->stat.tcp_sess.nr_sess_closing, mgr->stat.tcp_sess.nr_sess_closed, mgr->stat.tcp_sess.nr_new_sess_evicted, mgr->stat.tcp_sess.nr_old_sess_evicted); SESSION_LOG_DEBUG("session_manager_stat->udp_sess_num: used: %u, init: %u, opening: %u, active: %u, closing: %u, closed: %u, evic_new: %u, evic_old: %u", mgr->stat.udp_sess.nr_sess_used, mgr->stat.udp_sess.nr_sess_init, mgr->stat.udp_sess.nr_sess_opening, mgr->stat.udp_sess.nr_sess_active, mgr->stat.udp_sess.nr_sess_closing, mgr->stat.udp_sess.nr_sess_closed, mgr->stat.udp_sess.nr_new_sess_evicted, mgr->stat.udp_sess.nr_old_sess_evicted); SESSION_LOG_DEBUG("session_manager_stat: dup_pkts: %u, dup_bytes: %u, evic_pkts: %u, evic_bytes: %u", mgr->stat.dup_pkt.nr_pkts, mgr->stat.dup_pkt.nr_bytes, mgr->stat.evc_pkt.nr_pkts, mgr->stat.evc_pkt.nr_bytes); } } struct session_manager_stat *session_manager_get_stat(struct session_manager *mgr) { return &mgr->stat; }