#include #include #include "timestamp.h" #include "session_private.h" #include "session_pool.h" #include "session_table.h" #include "session_timer.h" #include "session_queue.h" #include "session_manager.h" #include "tcp_helpers.h" #include "udp_helpers.h" #include "packet_helpers.h" #include "dupkt_filter.h" #include "eviction_filter.h" #include "id_generator.h" struct session_manager { struct session_pool *sess_pool; struct session_table *tcp_sess_table; struct session_table *udp_sess_table; struct session_timer *sess_timer; struct session_queue *sess_evicted_queue; struct session_queue *sess_toclosed_queue; struct dupkt_filter *tcp_dupkt_filter; struct eviction_filter *udp_eviction_filter; struct session_manager_config config; /*************************************************************** * session manager status ***************************************************************/ // session number uint64_t tcp_sess_num; uint64_t tcp_opening_sess_num; uint64_t tcp_active_sess_num; uint64_t tcp_closing_sess_num; uint64_t udp_sess_num; uint64_t udp_opening_sess_num; uint64_t udp_active_sess_num; uint64_t udp_closing_sess_num; uint64_t tcp_overload_evict_old_sess_num; uint64_t tcp_overload_evict_new_sess_num; uint64_t udp_overload_evict_old_sess_num; uint64_t udp_overload_evict_new_sess_num; // packet filter status uint64_t npkts_miss_l4_proto; // fast forward uint64_t npkts_hit_tcp_miss_sess; // fast forward uint64_t npkts_hit_tcp_dupkt; // fast forward uint64_t npkts_hit_tcp_discard; // drop uint64_t npkts_hit_udp_evicted; // fast forward }; static inline void tcp_init_timeout_cb(struct session *sess, void *arg); static inline void tcp_handshake_timeout_cb(struct session *sess, void *arg); static inline void tcp_data_timeout_cb(struct session *sess, void *arg); static inline void tcp_half_closed_timeout_cb(struct session *sess, void *arg); static inline void tcp_time_wait_timeout_cb(struct session *sess, void *arg); static inline void udp_data_timeout_cb(struct session *sess, void *arg); static inline int session_manager_check_config(struct session_manager_config *config); static inline uint64_t session_manager_alloc_session_id(void); static inline int session_manager_update_tcp_filter(struct session_manager *mgr, struct session *sess, const struct packet *pkt, enum session_dir curr_dir); static inline enum session_dir judge_direction_by_tuple6(const struct tuple6 *key); static inline enum session_dir judge_direction_by_session(const struct session *sess, const struct tuple6 *key); static inline void session_update_tcp_state(struct session *sess, const struct layer_record *tcp_layer, enum session_dir curr_dir); static inline void session_update_udp_state(struct session *sess, const struct layer_record *udp_layer, enum session_dir curr_dir); static inline void session_manager_update_session_state(struct session_manager *mgr, struct session *sess, enum session_state state); static inline void session_manager_update_session_timer(struct session_manager *mgr, struct session *sess, session_expire_cb expire_cb, uint64_t timeout_sec); static inline void session_manager_update_session_base(struct session_manager *mgr, struct session *sess, const struct tuple6 *key, enum session_dir curr_dir); static inline void session_manager_update_session_packet(struct session_manager *mgr, struct session *sess, const struct packet *pkt, enum session_dir curr_dir); static inline void session_manager_update_udp_to_opening(struct session_manager *mgr, struct session *sess); static inline void session_manager_update_udp_to_active(struct session_manager *mgr, struct session *sess); static inline void session_manager_update_udp_to_closing(struct session_manager *mgr, struct session *sess); static inline void session_manager_update_tcp_to_opening(struct session_manager *mgr, struct session *sess, int opening_by_syn); static inline void session_manager_update_tcp_to_active(struct session_manager *mgr, struct session *sess); static inline void session_manager_update_tcp_to_closing(struct session_manager *mgr, struct session *sess, int enable_time_wait); static inline void session_manager_update_session_to_closed(struct session_manager *mgr, struct session *sess); static inline void session_manager_handle_tcp_on_opening(struct session_manager *mgr, struct session *sess, enum tcp_state tcp_old_state, enum tcp_state tcp_curr_state); static inline void session_manager_handle_tcp_on_active(struct session_manager *mgr, struct session *sess, enum tcp_state tcp_old_state, enum tcp_state tcp_curr_state); static inline void session_manager_handle_tcp_on_closing(struct session_manager *mgr, struct session *sess, enum tcp_state tcp_old_state, enum tcp_state tcp_curr_state); static inline struct session *session_manager_new_tcp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key); static inline struct session *session_manager_new_udp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key); static inline struct session *session_manager_update_tcp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key); static inline struct session *session_manager_update_udp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key); static inline void session_manager_free_session(struct session_manager *mgr, struct session *sess); static inline void session_manager_recycle_session(struct session_manager *mgr); static inline void session_manager_evicte_session(struct session_manager *mgr, struct session *sess); /****************************************************************************** * Private API ******************************************************************************/ static inline void tcp_init_timeout_cb(struct session *sess, void *arg) { SESSION_LOG_DEBUG("run tcp_init_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); if (session_get_closing_reasion(sess) == 0) { session_set_closing_reasion(sess, CLOSING_BY_TIMEOUT); } session_manager_update_tcp_to_closing(mgr, sess, 1); } static inline void tcp_handshake_timeout_cb(struct session *sess, void *arg) { SESSION_LOG_DEBUG("run tcp_handshake_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); if (session_get_closing_reasion(sess) == 0) { session_set_closing_reasion(sess, CLOSING_BY_TIMEOUT); } session_manager_update_tcp_to_closing(mgr, sess, 1); } static inline void tcp_data_timeout_cb(struct session *sess, void *arg) { SESSION_LOG_DEBUG("run tcp_data_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); if (session_get_closing_reasion(sess) == 0) { session_set_closing_reasion(sess, CLOSING_BY_TIMEOUT); } session_manager_update_tcp_to_closing(mgr, sess, 1); } static inline void tcp_half_closed_timeout_cb(struct session *sess, void *arg) { SESSION_LOG_DEBUG("run tcp_half_closed_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); if (session_get_closing_reasion(sess) == 0) { session_set_closing_reasion(sess, CLOSING_BY_TIMEOUT); } session_manager_update_tcp_to_closing(mgr, sess, 1); } static inline void tcp_time_wait_timeout_cb(struct session *sess, void *arg) { SESSION_LOG_DEBUG("run tcp_time_wait_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); session_manager_free_session(mgr, sess); } static inline void udp_data_timeout_cb(struct session *sess, void *arg) { SESSION_LOG_DEBUG("run udp_data_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); if (session_get_closing_reasion(sess) == 0) { session_set_closing_reasion(sess, CLOSING_BY_TIMEOUT); } session_manager_update_udp_to_closing(mgr, sess); } // return 0: success // return -1: invalid config static inline int session_manager_check_config(struct session_manager_config *config) { if (config == NULL) { SESSION_LOG_ERROR("invalid config"); return -1; } // max session number if (config->max_tcp_session_num < 2) { SESSION_LOG_ERROR("invalid max tcp session number"); return -1; } if (config->max_udp_session_num < 2) { SESSION_LOG_ERROR("invalid max udp session number"); return -1; } // session overload if (config->tcp_overload_evict_old_sess != 0 && config->tcp_overload_evict_old_sess != 1) { SESSION_LOG_ERROR("invalid tcp overload evict old session, support range: 0-1"); return -1; } if (config->udp_overload_evict_old_sess != 0 && config->udp_overload_evict_old_sess != 1) { SESSION_LOG_ERROR("invalid udp overload evict old session, support range: 0-1"); return -1; } // TCP timeout config if (config->tcp_timeout_init < 1 || config->tcp_timeout_init > 60) { SESSION_LOG_ERROR("invalid tcp timeout init, support range: 1-60"); return -1; } if (config->tcp_timeout_handshake < 1 || config->tcp_timeout_handshake > 60) { SESSION_LOG_ERROR("invalid tcp timeout handshake, support range: 1-60"); return -1; } if (config->tcp_timeout_data < 1 || config->tcp_timeout_data > 15999999) { SESSION_LOG_ERROR("invalid tcp timeout data, support range: 1-15,999,999"); return -1; } if (config->tcp_timeout_half_closed < 1 || config->tcp_timeout_half_closed > 604800) { SESSION_LOG_ERROR("invalid tcp timeout half closed, support range: 1-604,800"); return -1; } if (config->tcp_timeout_time_wait < 1 || config->tcp_timeout_time_wait > 600) { SESSION_LOG_ERROR("invalid tcp timeout time wait, support range: 1-600"); return -1; } if (config->tcp_timeout_discard < 1 || config->tcp_timeout_discard > 15999999) { SESSION_LOG_ERROR("invalid tcp timeout discard, support range: 1-15,999,999"); return -1; } // UDP timeout config if (config->udp_timeout_data < 1 || config->udp_timeout_data > 15999999) { SESSION_LOG_ERROR("invalid udp timeout data, support range: 1-15,999,999"); return -1; } // TCP duplicate packet filter config if (config->tcp_dupkt_filter_enable != 0 && config->tcp_dupkt_filter_enable != 1) { SESSION_LOG_ERROR("invalid tcp dupkt filter enable, support range: 0-1"); return -1; } if (config->tcp_dupkt_filter_enable) { if (config->tcp_dupkt_filter_capacity == 0) { SESSION_LOG_ERROR("invalid tcp dupkt filter capacity"); return -1; } if (config->tcp_dupkt_filter_timeout < 1 || config->tcp_dupkt_filter_timeout > 60) { SESSION_LOG_ERROR("invalid tcp dupkt filter timeout, support range: 1-60"); return -1; } if (config->tcp_dupkt_filter_error_rate < 0 || config->tcp_dupkt_filter_error_rate > 1) { SESSION_LOG_ERROR("invalid tcp dupkt filter error rate, support range: 0-1"); return -1; } } // UDP eviction filter config if (config->udp_eviction_filter_enable != 0 && config->udp_eviction_filter_enable != 1) { SESSION_LOG_ERROR("invalid udp eviction filter enable, support range: 0-1"); return -1; } if (config->udp_eviction_filter_enable) { if (config->udp_eviction_filter_capacity == 0) { SESSION_LOG_ERROR("invalid udp eviction filter capacity"); return -1; } if (config->udp_eviction_filter_timeout < 1 || config->udp_eviction_filter_timeout > 60) { SESSION_LOG_ERROR("invalid udp eviction filter timeout, support range: 1-60"); return -1; } if (config->udp_eviction_filter_error_rate < 0 || config->udp_eviction_filter_error_rate > 1) { SESSION_LOG_ERROR("invalid udp eviction filter error rate, support range: 0-1"); return -1; } } return 0; } static inline uint64_t session_manager_alloc_session_id(void) { return id_generator_get(); } // return 1: duplicate packet // return 0: not duplicate packet static inline int session_manager_update_tcp_filter(struct session_manager *mgr, struct session *sess, const struct packet *pkt, enum session_dir curr_dir) { if (curr_dir == SESSION_DIR_C2S) { if (session_get_c2s_packets(sess) < 3) { goto dupkt_fitler; } } else if (curr_dir == SESSION_DIR_S2C) { if (session_get_s2c_packets(sess) < 3) { goto dupkt_fitler; } } if (session_get_dup_traffic_flag(sess) == DUP_TRAFFIC_YES) { goto dupkt_fitler; } else { return 0; } dupkt_fitler: if (dupkt_filter_lookup(mgr->tcp_dupkt_filter, pkt)) { return 1; } else { dupkt_filter_add(mgr->tcp_dupkt_filter, pkt); return 0; } } static inline enum session_dir judge_direction_by_tuple6(const struct tuple6 *key) { uint16_t src_port = ntohs(key->src_port); uint16_t dst_port = ntohs(key->dst_port); // big port is client if (src_port > dst_port) { return SESSION_DIR_C2S; } else if (src_port < dst_port) { return SESSION_DIR_S2C; } else { // if port is equal, first packet is C2S return SESSION_DIR_C2S; } } static inline enum session_dir judge_direction_by_session(const struct session *sess, const struct tuple6 *key) { if (tuple6_cmp(session_get0_key(sess), key) == 0) { return session_get_key_dir(sess); } else { return (session_get_key_dir(sess) == SESSION_DIR_C2S ? SESSION_DIR_S2C : SESSION_DIR_C2S); } } static inline void session_update_tcp_state(struct session *sess, const struct layer_record *tcp_layer, enum session_dir curr_dir) { const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; uint64_t state = session_get_tcp_state(sess); if (tcp_hdr_has_flag_syn(hdr)) { state |= (tcp_hdr_has_flag_ack(hdr) ? TCP_SYNACK_RECVED : TCP_SYN_RECVED); } else { if (tcp_hdr_has_flag_ack(hdr)) { if (curr_dir == SESSION_DIR_C2S) { state |= TCP_C2S_ACK_RECVED; } else if (curr_dir == SESSION_DIR_S2C) { state |= TCP_S2C_ACK_RECVED; } } } if (tcp_hdr_has_flag_fin(hdr)) { if (curr_dir == SESSION_DIR_C2S) { state |= TCP_C2S_FIN_RECVED; } else if (curr_dir == SESSION_DIR_S2C) { state |= TCP_S2C_FIN_RECVED; } } if (tcp_hdr_has_flag_rst(hdr)) { if (curr_dir == SESSION_DIR_C2S) { state |= TCP_C2S_RST_RECVED; } else if (curr_dir == SESSION_DIR_S2C) { state |= TCP_S2C_RST_RECVED; } } if (tcp_layer->pld_len > 0) { if (curr_dir == SESSION_DIR_C2S) { state |= TCP_C2S_DATA_RECVED; } else if (curr_dir == SESSION_DIR_S2C) { state |= TCP_S2C_DATA_RECVED; } } session_set_tcp_state(sess, (enum tcp_state)state); } static inline void session_update_udp_state(struct session *sess, const struct layer_record *udp_layer, enum session_dir curr_dir) { uint64_t state = session_get_udp_state(sess); if (curr_dir == SESSION_DIR_C2S) { state |= UDP_C2S_RECVED; } else if (curr_dir == SESSION_DIR_S2C) { state |= UDP_S2C_RECVED; } session_set_udp_state(sess, (enum udp_state)state); } /* on opening update session [*] session_init [*] session_set_id [*] session_set_key [*] session_set_key_dir [*] session_set_type [*] session_set_create_time [*] session_set_state on packet update session [*] session_inc_c2s_metrics [*] session_inc_s2c_metrics [*] session_set_c2s_1st_pkt [*] session_set_s2c_1st_pkt [*] session_set_c2s_1st_pkt_md [*] session_set_s2c_1st_pkt_md [*] session_set0_cur_pkt [*] session_set_cur_dir [*] session_set_last_time session_set_state session_set_dup_traffic_flag on closing update session [*] session_set_state [*] session_set_closing_reasion */ static inline void session_manager_update_session_state(struct session_manager *mgr, struct session *sess, enum session_state state) { // session state not change if (session_get_state(sess) == state) { return; } enum session_type type = session_get_type(sess); if (type == SESSION_TYPE_TCP) { // handle old state switch (session_get_state(sess)) { case SESSION_STATE_OPENING: mgr->tcp_opening_sess_num--; break; case SESSION_STATE_ACTIVE: mgr->tcp_active_sess_num--; break; case SESSION_STATE_CLOSING: mgr->tcp_closing_sess_num--; break; case SESSION_STATE_CLOSED: /* void */ break; default: break; } // handle new state switch (state) { case SESSION_STATE_OPENING: mgr->tcp_opening_sess_num++; mgr->tcp_sess_num++; break; case SESSION_STATE_ACTIVE: mgr->tcp_active_sess_num++; break; case SESSION_STATE_CLOSING: mgr->tcp_closing_sess_num++; break; case SESSION_STATE_CLOSED: mgr->tcp_sess_num--; break; default: break; } } else if (type == SESSION_TYPE_UDP) { // handle old state switch (session_get_state(sess)) { case SESSION_STATE_OPENING: mgr->udp_opening_sess_num--; break; case SESSION_STATE_ACTIVE: mgr->udp_active_sess_num--; break; case SESSION_STATE_CLOSING: mgr->udp_closing_sess_num--; break; case SESSION_STATE_CLOSED: /* void */ break; default: break; } // handle new state switch (state) { case SESSION_STATE_OPENING: mgr->udp_opening_sess_num++; mgr->udp_sess_num++; break; case SESSION_STATE_ACTIVE: mgr->udp_active_sess_num++; break; case SESSION_STATE_CLOSING: mgr->udp_closing_sess_num++; break; case SESSION_STATE_CLOSED: mgr->udp_sess_num--; break; default: break; } } session_set_state(sess, state); } static inline void session_manager_update_session_timer(struct session_manager *mgr, struct session *sess, session_expire_cb expire_cb, uint64_t timeout_sec) { session_timer_del_session(mgr->sess_timer, sess); session_set_expirecb(sess, expire_cb, mgr, timestamp_get_sec() + timeout_sec); session_timer_add_session(mgr->sess_timer, sess); } static inline void session_manager_update_session_base(struct session_manager *mgr, struct session *sess, const struct tuple6 *key, enum session_dir curr_dir) { session_init(sess); session_set_id(sess, session_manager_alloc_session_id()); session_set_key(sess, key); session_set_key_dir(sess, curr_dir); if (key->ip_proto == IPPROTO_UDP) { session_set_type(sess, SESSION_TYPE_UDP); } else if (key->ip_proto == IPPROTO_TCP) { session_set_type(sess, SESSION_TYPE_TCP); } session_set_create_time(sess, timestamp_get_sec()); } static inline void session_manager_update_session_packet(struct session_manager *mgr, struct session *sess, const struct packet *pkt, enum session_dir curr_dir) { uint64_t len = packet_get_raw_len(pkt); if (curr_dir == SESSION_DIR_C2S) { session_inc_c2s_metrics(sess, 1, len); if (session_get0_c2s_1st_pkt(sess) == NULL) { session_set_c2s_1st_pkt(sess, pkt); } } else if (curr_dir == SESSION_DIR_S2C) { session_inc_s2c_metrics(sess, 1, len); if (session_get0_s2c_1st_pkt(sess) == NULL) { session_set_s2c_1st_pkt(sess, pkt); } } session_set0_cur_pkt(sess, pkt); session_set_cur_dir(sess, curr_dir); session_set_last_time(sess, timestamp_get_sec()); } static inline void session_manager_update_udp_to_opening(struct session_manager *mgr, struct session *sess) { session_manager_update_session_state(mgr, sess, SESSION_STATE_OPENING); session_manager_update_session_timer(mgr, sess, udp_data_timeout_cb, mgr->config.udp_timeout_data); } static inline void session_manager_update_udp_to_active(struct session_manager *mgr, struct session *sess) { session_manager_update_session_state(mgr, sess, SESSION_STATE_ACTIVE); session_manager_update_session_timer(mgr, sess, udp_data_timeout_cb, mgr->config.udp_timeout_data); } static inline void session_manager_update_udp_to_closing(struct session_manager *mgr, struct session *sess) { session_manager_update_session_state(mgr, sess, SESSION_STATE_CLOSING); session_timer_del_session(mgr->sess_timer, sess); session_queue_push(mgr->sess_toclosed_queue, sess); eviction_filter_add(mgr->udp_eviction_filter, session_get0_1st_pkt(sess)); } static inline void session_manager_update_tcp_to_opening(struct session_manager *mgr, struct session *sess, int opening_by_syn) { session_manager_update_session_state(mgr, sess, SESSION_STATE_OPENING); if (opening_by_syn) { session_manager_update_session_timer(mgr, sess, tcp_init_timeout_cb, mgr->config.tcp_timeout_init); } else { session_manager_update_session_timer(mgr, sess, tcp_handshake_timeout_cb, mgr->config.tcp_timeout_handshake); } } static inline void session_manager_update_tcp_to_active(struct session_manager *mgr, struct session *sess) { session_manager_update_session_state(mgr, sess, SESSION_STATE_ACTIVE); session_manager_update_session_timer(mgr, sess, tcp_data_timeout_cb, mgr->config.tcp_timeout_data); } static inline void session_manager_update_tcp_to_closing(struct session_manager *mgr, struct session *sess, int enable_time_wait) { session_manager_update_session_state(mgr, sess, SESSION_STATE_CLOSING); if (enable_time_wait) { session_manager_update_session_timer(mgr, sess, tcp_time_wait_timeout_cb, mgr->config.tcp_timeout_time_wait); } else { session_timer_del_session(mgr->sess_timer, sess); } } static inline void session_manager_update_session_to_closed(struct session_manager *mgr, struct session *sess) { session_manager_update_session_state(mgr, sess, SESSION_STATE_CLOSED); session_timer_del_session(mgr->sess_timer, sess); } // opening -> opening // opening -> active // opening -> closing static inline void session_manager_handle_tcp_on_opening(struct session_manager *mgr, struct session *sess, enum tcp_state tcp_old_state, enum tcp_state tcp_curr_state) { uint64_t tcp_mod_state = tcp_curr_state & (~tcp_old_state); // opening -> closing if ((tcp_curr_state & TCP_C2S_FIN_RECVED) && (tcp_curr_state & TCP_S2C_FIN_RECVED)) { SESSION_LOG_DEBUG("TCP FIN-FIN received, session %lu opening -> closing", session_get_id(sess)); session_manager_update_tcp_to_closing(mgr, sess, 1); return; } if (tcp_mod_state & (TCP_C2S_RST_RECVED | TCP_S2C_RST_RECVED)) { SESSION_LOG_DEBUG("TCP %s RST received, session %lu opening -> closing", (tcp_curr_state & TCP_C2S_RST_RECVED ? "C2S" : "S2C"), session_get_id(sess)); session_set_closing_reasion(sess, (tcp_mod_state & TCP_C2S_RST_RECVED) ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST); session_manager_update_tcp_to_closing(mgr, sess, 1); return; } // opening -> active if (tcp_mod_state & (TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED)) { SESSION_LOG_DEBUG("TCP %s DATA received, session %lu opening -> active", (tcp_curr_state & TCP_C2S_DATA_RECVED ? "C2S" : "S2C"), session_get_id(sess)); session_manager_update_tcp_to_active(mgr, sess); return; } // opening -> opening if (tcp_mod_state & (TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED)) { SESSION_LOG_DEBUG("TCP %s FIN received, session %lu opening -> opening", (tcp_curr_state & TCP_C2S_FIN_RECVED ? "C2S" : "S2C"), session_get_id(sess)); // still opening, only update timeout session_set_closing_reasion(sess, (tcp_mod_state & TCP_C2S_FIN_RECVED) ? CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN); session_manager_update_session_timer(mgr, sess, tcp_half_closed_timeout_cb, mgr->config.tcp_timeout_half_closed); return; } if (tcp_mod_state & (TCP_C2S_ACK_RECVED | TCP_S2C_ACK_RECVED)) { SESSION_LOG_DEBUG("TCP %s ACK received, session %lu opening -> opening", (tcp_curr_state & TCP_C2S_ACK_RECVED ? "C2S" : "S2C"), session_get_id(sess)); // still opening, only update timeout session_manager_update_session_timer(mgr, sess, tcp_data_timeout_cb, mgr->config.tcp_timeout_data); return; } if (tcp_mod_state & TCP_SYNACK_RECVED) { SESSION_LOG_DEBUG("TCP SYNACK received, session %lu opening -> opening", session_get_id(sess)); session_manager_update_tcp_to_opening(mgr, sess, 0); } if (tcp_mod_state == 0) { if (tcp_curr_state & TCP_SYN_RECVED) { SESSION_LOG_DEBUG("TCP SYN retransmission received, session %lu opening -> opening", session_get_id(sess)); session_manager_update_tcp_to_opening(mgr, sess, 1); } if (tcp_curr_state & TCP_SYNACK_RECVED) { SESSION_LOG_DEBUG("TCP SYNACK retransmission received, session %lu opening -> opening", session_get_id(sess)); session_manager_update_tcp_to_opening(mgr, sess, 0); } } } // active -> active // active -> closing static inline void session_manager_handle_tcp_on_active(struct session_manager *mgr, struct session *sess, enum tcp_state tcp_old_state, enum tcp_state tcp_curr_state) { // active -> closing if ((tcp_curr_state & TCP_C2S_FIN_RECVED) && (tcp_curr_state & TCP_S2C_FIN_RECVED)) { SESSION_LOG_DEBUG("TCP FIN-FIN received, session %lu active -> closing", session_get_id(sess)); session_manager_update_tcp_to_closing(mgr, sess, 1); return; } if (tcp_curr_state & (TCP_C2S_RST_RECVED | TCP_S2C_RST_RECVED)) { SESSION_LOG_DEBUG("TCP %s RST received, session %lu active -> closing", (tcp_curr_state & TCP_C2S_RST_RECVED) ? "C2S" : "S2C", session_get_id(sess)); session_set_closing_reasion(sess, (tcp_curr_state & TCP_C2S_RST_RECVED) ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST); session_manager_update_tcp_to_closing(mgr, sess, 1); return; } // active -> active if (tcp_curr_state & (TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED)) { SESSION_LOG_DEBUG("TCP %s FIN received, session %lu active -> active", (tcp_curr_state & TCP_C2S_FIN_RECVED) ? "C2S" : "S2C", session_get_id(sess)); // still active session_set_closing_reasion(sess, (tcp_curr_state & TCP_C2S_FIN_RECVED) ? CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN); session_manager_update_session_timer(mgr, sess, tcp_half_closed_timeout_cb, mgr->config.tcp_timeout_half_closed); return; } // still active session_manager_update_tcp_to_active(mgr, sess); } // closing -> closing // closing -> closed static inline void session_manager_handle_tcp_on_closing(struct session_manager *mgr, struct session *sess, enum tcp_state tcp_old_state, enum tcp_state tcp_curr_state) { // still closing session_manager_update_tcp_to_closing(mgr, sess, 1); } static inline struct session *session_manager_new_tcp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key) { const struct layer_record *tcp_layer = packet_get_innermost_layer(pkt, LAYER_TYPE_TCP); if (tcp_layer == NULL) { mgr->npkts_miss_l4_proto++; return NULL; } const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; if (!tcp_hdr_has_flag_syn(hdr)) { mgr->npkts_hit_tcp_miss_sess++; return NULL; } if (mgr->tcp_sess_num >= mgr->config.max_tcp_session_num - 1) { if (mgr->config.tcp_overload_evict_old_sess) { struct session *evicted_sess = session_table_find_least_recently_unused_session(mgr->tcp_sess_table); assert(evicted_sess); session_manager_evicte_session(mgr, evicted_sess); mgr->tcp_overload_evict_old_sess_num++; } else { if (mgr->tcp_sess_num >= mgr->config.max_tcp_session_num) { mgr->tcp_overload_evict_new_sess_num++; return NULL; } else { // continue; } } } struct session *sess = session_pool_alloc(mgr->sess_pool); assert(sess); enum session_dir curr_dir = tcp_hdr_has_flag_ack(hdr) ? SESSION_DIR_S2C : SESSION_DIR_C2S; session_manager_update_session_base(mgr, sess, key, curr_dir); session_manager_update_session_packet(mgr, sess, pkt, curr_dir); session_update_tcp_state(sess, tcp_layer, curr_dir); char buffer[128] = {0}; tuple6_tostring(session_get0_key(sess), buffer, sizeof(buffer)); SESSION_LOG_DEBUG("session new: %lu %s", session_get_id(sess), buffer); SESSION_LOG_DEBUG("TCP %s received, session %lu init -> opening", (curr_dir == SESSION_DIR_C2S ? "SYN" : "SYNACK"), session_get_id(sess)); session_manager_update_tcp_to_opening(mgr, sess, curr_dir == SESSION_DIR_C2S); session_table_add_session(mgr->tcp_sess_table, key, sess); dupkt_filter_add(mgr->tcp_dupkt_filter, pkt); return sess; } static inline struct session *session_manager_new_udp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key) { if (eviction_filter_lookup(mgr->udp_eviction_filter, pkt)) { mgr->npkts_hit_udp_evicted++; return NULL; } if (mgr->udp_sess_num >= mgr->config.max_udp_session_num - 1) { if (mgr->config.udp_overload_evict_old_sess) { struct session *evicted_sess = session_table_find_least_recently_unused_session(mgr->udp_sess_table); assert(evicted_sess); session_manager_evicte_session(mgr, evicted_sess); mgr->udp_overload_evict_old_sess_num++; } else { if (mgr->udp_sess_num >= mgr->config.max_udp_session_num) { mgr->udp_overload_evict_new_sess_num++; return NULL; } else { // continue; } } } struct session *sess = session_pool_alloc(mgr->sess_pool); assert(sess); enum session_dir curr_dir = judge_direction_by_tuple6(key); session_manager_update_session_base(mgr, sess, key, curr_dir); session_manager_update_session_packet(mgr, sess, pkt, curr_dir); session_update_udp_state(sess, NULL, curr_dir); session_manager_update_udp_to_opening(mgr, sess); session_table_add_session(mgr->udp_sess_table, key, sess); char buffer[128] = {0}; tuple6_tostring(session_get0_key(sess), buffer, sizeof(buffer)); SESSION_LOG_DEBUG("session new: %lu %s", session_get_id(sess), buffer); SESSION_LOG_DEBUG("UDP %s first packet received, session %lu init -> opening", (curr_dir == SESSION_DIR_C2S ? "C2S" : "S2C"), session_get_id(sess)); return sess; } static inline struct session *session_manager_update_tcp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key) { const struct layer_record *tcp_layer = packet_get_innermost_layer(pkt, LAYER_TYPE_TCP); if (tcp_layer == NULL) { mgr->npkts_miss_l4_proto++; return NULL; } enum session_dir curr_dir = judge_direction_by_session(sess, key); if (session_manager_update_tcp_filter(mgr, sess, pkt, curr_dir)) { mgr->npkts_hit_tcp_dupkt++; session_set_dup_traffic_flag(sess, DUP_TRAFFIC_YES); return NULL; } enum session_state sess_state = session_get_state(sess); enum tcp_state tcp_old_state = session_get_tcp_state(sess); session_manager_update_session_packet(mgr, sess, pkt, curr_dir); session_update_tcp_state(sess, tcp_layer, curr_dir); enum tcp_state tcp_curr_state = session_get_tcp_state(sess); switch (sess_state) { case SESSION_STATE_OPENING: session_manager_handle_tcp_on_opening(mgr, sess, tcp_old_state, tcp_curr_state); break; case SESSION_STATE_ACTIVE: session_manager_handle_tcp_on_active(mgr, sess, tcp_old_state, tcp_curr_state); break; case SESSION_STATE_CLOSING: session_manager_handle_tcp_on_closing(mgr, sess, tcp_old_state, tcp_curr_state); break; case SESSION_STATE_CLOSED: assert(0); break; default: break; } return sess; } static inline struct session *session_manager_update_udp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key) { enum session_dir curr_dir = judge_direction_by_session(sess, key); session_manager_update_session_packet(mgr, sess, pkt, curr_dir); session_update_udp_state(sess, NULL, curr_dir); enum session_state sess_state = session_get_state(sess); switch (sess_state) { case SESSION_STATE_OPENING: session_manager_update_udp_to_active(mgr, sess); break; case SESSION_STATE_ACTIVE: session_manager_update_udp_to_active(mgr, sess); return sess; case SESSION_STATE_CLOSING: assert(0); break; case SESSION_STATE_CLOSED: assert(0); break; } return sess; } static inline void session_manager_free_session(struct session_manager *mgr, struct session *sess) { if (sess) { SESSION_LOG_DEBUG("%s, session %lu closing -> closed", session_closing_reasion_tostring(session_get_closing_reasion(sess)), session_get_id(sess)); session_manager_update_session_to_closed(mgr, sess); if (session_get_type(sess) == SESSION_TYPE_TCP) { session_table_del_session(mgr->tcp_sess_table, session_get0_key(sess)); } if (session_get_type(sess) == SESSION_TYPE_UDP) { session_table_del_session(mgr->udp_sess_table, session_get0_key(sess)); } session_set0_cur_pkt(sess, NULL); session_set_cur_dir(sess, SESSION_DIR_NONE); session_free(sess); session_pool_free(mgr->sess_pool, sess); sess = NULL; } } static inline void session_manager_recycle_session(struct session_manager *mgr) { while (1) { struct session *sess = session_queue_pop(mgr->sess_toclosed_queue); if (sess == NULL) { break; } session_manager_free_session(mgr, sess); } } static inline void session_manager_evicte_session(struct session_manager *mgr, struct session *sess) { session_set_closing_reasion(sess, CLOSING_BY_EVICTED); session_manager_update_session_state(mgr, sess, SESSION_STATE_CLOSING); session_queue_push(mgr->sess_evicted_queue, sess); session_timer_del_session(mgr->sess_timer, sess); if (session_get_type(sess) == SESSION_TYPE_UDP) { eviction_filter_add(mgr->udp_eviction_filter, session_get0_1st_pkt(sess)); } } /****************************************************************************** * Public API ******************************************************************************/ struct session_manager *session_manager_create(struct session_manager_config *config) { if (session_manager_check_config(config)) { return NULL; } struct session_manager *mgr = (struct session_manager *)calloc(1, sizeof(struct session_manager)); if (mgr == NULL) { return NULL; } memcpy(&mgr->config, config, sizeof(struct session_manager_config)); mgr->sess_pool = session_pool_create(mgr->config.max_tcp_session_num + mgr->config.max_udp_session_num); if (mgr->sess_pool == NULL) { goto error; } mgr->tcp_sess_table = session_table_create(); if (mgr->tcp_sess_table == NULL) { goto error; } mgr->udp_sess_table = session_table_create(); if (mgr->udp_sess_table == NULL) { goto error; } mgr->sess_timer = session_timer_create(); if (mgr->sess_timer == NULL) { goto error; } mgr->sess_evicted_queue = session_queue_create(); if (mgr->sess_evicted_queue == NULL) { goto error; } mgr->sess_toclosed_queue = session_queue_create(); if (mgr->sess_toclosed_queue == NULL) { goto error; } mgr->tcp_dupkt_filter = dupkt_filter_create(mgr->config.tcp_dupkt_filter_enable, mgr->config.tcp_dupkt_filter_capacity, mgr->config.tcp_dupkt_filter_error_rate, mgr->config.tcp_dupkt_filter_timeout); if (mgr->tcp_dupkt_filter == NULL) { goto error; } mgr->udp_eviction_filter = eviction_filter_create(mgr->config.udp_eviction_filter_enable, mgr->config.udp_eviction_filter_capacity, mgr->config.udp_eviction_filter_error_rate, mgr->config.udp_eviction_filter_timeout); if (mgr->udp_eviction_filter == NULL) { goto error; } return mgr; error: session_manager_destroy(mgr); return NULL; } void session_manager_destroy(struct session_manager *mgr) { struct session *sess; if (mgr) { // move all evicted session to closed queue while (mgr->sess_evicted_queue && session_manager_get_evicted_session(mgr)) { } // free all closed queue if (mgr->sess_toclosed_queue) { session_manager_recycle_session(mgr); } // free all udp session which is not in closed state while (mgr->udp_sess_table && (sess = session_table_find_least_recently_unused_session(mgr->udp_sess_table))) { session_manager_free_session(mgr, sess); } // free all tcp session which is not in closed state while (mgr->tcp_sess_table && (sess = session_table_find_least_recently_unused_session(mgr->tcp_sess_table))) { session_manager_free_session(mgr, sess); } eviction_filter_destroy(mgr->udp_eviction_filter); dupkt_filter_destroy(mgr->tcp_dupkt_filter); session_queue_destroy(mgr->sess_toclosed_queue); session_queue_destroy(mgr->sess_evicted_queue); session_timer_destroy(mgr->sess_timer); session_table_destroy(mgr->udp_sess_table); session_table_destroy(mgr->tcp_sess_table); session_pool_destroy(mgr->sess_pool); free(mgr); mgr = NULL; } } // only use the packet six-tuple to find the session, not update it struct session *session_manager_lookup_sesssion(struct session_manager *mgr, const struct packet *pkt) { struct tuple6 key; memset(&key, 0, sizeof(struct tuple6)); if (packet_get_innermost_tuple6(pkt, &key)) { return NULL; } if (key.ip_proto == IPPROTO_UDP) { return session_table_find_session(mgr->udp_sess_table, &key); } else if (key.ip_proto == IPPROTO_TCP) { return session_table_find_session(mgr->tcp_sess_table, &key); } else { return NULL; } } /* * Return NULL in the following cases: * 1.not a TCP or UDP packet * 2.TCP packet miss session but no syn packet seen * 3.TCP duplicate packet * 4.TCP discards packets * 5.UDP evict packet * pakcet will not update the session and needs to be fast forwarded */ struct session *session_manager_update_session(struct session_manager *mgr, const struct packet *pkt) { assert(session_manager_get_evicted_session(mgr) == NULL); session_manager_recycle_session(mgr); struct tuple6 key; memset(&key, 0, sizeof(struct tuple6)); if (packet_get_innermost_tuple6(pkt, &key)) { mgr->npkts_miss_l4_proto++; return NULL; } struct session *sess = NULL; if (key.ip_proto == IPPROTO_UDP) { sess = session_table_find_session(mgr->udp_sess_table, &key); if (sess) { return session_manager_update_udp_session(mgr, sess, pkt, &key); } else { return session_manager_new_udp_session(mgr, pkt, &key); } } else if (key.ip_proto == IPPROTO_TCP) { sess = session_table_find_session(mgr->tcp_sess_table, &key); if (sess) { return session_manager_update_tcp_session(mgr, sess, pkt, &key); } else { return session_manager_new_tcp_session(mgr, pkt, &key); } } else { return NULL; } } struct session *session_manager_get_expired_session(struct session_manager *mgr) { session_manager_recycle_session(mgr); struct session *sess = session_timer_expire_session(mgr->sess_timer, timestamp_get_sec()); if (sess) { session_run_expirecb(sess); if (session_get_state(sess) == SESSION_STATE_CLOSED) { return NULL; } if (session_get_type(sess) == SESSION_TYPE_UDP) { session_queue_push(mgr->sess_toclosed_queue, sess); } } return sess; } struct session *session_manager_get_evicted_session(struct session_manager *mgr) { session_manager_recycle_session(mgr); struct session *sess = session_queue_pop(mgr->sess_evicted_queue); if (sess) { session_queue_push(mgr->sess_toclosed_queue, sess); } return sess; } uint64_t session_manager_get_expire_interval(struct session_manager *mgr) { return session_timer_next_expire_interval(mgr->sess_timer); } void session_manager_get_session_counter(struct session_manager *mgr, struct session_counter *out) { out->tcp_sess_num = mgr->tcp_sess_num; out->tcp_opening_sess_num = mgr->tcp_opening_sess_num; out->tcp_active_sess_num = mgr->tcp_active_sess_num; out->tcp_closing_sess_num = mgr->tcp_closing_sess_num; out->udp_sess_num = mgr->udp_sess_num; out->udp_opening_sess_num = mgr->udp_opening_sess_num; out->udp_active_sess_num = mgr->udp_active_sess_num; out->udp_closing_sess_num = mgr->udp_closing_sess_num; out->tcp_overload_evict_old_sess_num = mgr->tcp_overload_evict_old_sess_num; out->tcp_overload_evict_new_sess_num = mgr->tcp_overload_evict_new_sess_num; out->udp_overload_evict_old_sess_num = mgr->udp_overload_evict_old_sess_num; out->udp_overload_evict_new_sess_num = mgr->udp_overload_evict_new_sess_num; } void session_manager_print_status(struct session_manager *mgr) { printf("session manager status:\n"); printf("tcp session number : %lu\n", mgr->tcp_sess_num); printf(" opening number : %lu\n", mgr->tcp_opening_sess_num); printf(" active number : %lu\n", mgr->tcp_active_sess_num); printf(" closing number : %lu\n", mgr->tcp_closing_sess_num); printf("udp session number : %lu\n", mgr->udp_sess_num); printf(" opening number : %lu\n", mgr->udp_opening_sess_num); printf(" active number : %lu\n", mgr->udp_active_sess_num); printf(" closing number : %lu\n", mgr->udp_closing_sess_num); printf("tcp overload evict : \n"); printf(" old session number: %lu\n", mgr->tcp_overload_evict_old_sess_num); printf(" new session number: %lu\n", mgr->tcp_overload_evict_new_sess_num); printf("udp overload evict : \n"); printf(" old session number: %lu\n", mgr->udp_overload_evict_old_sess_num); printf(" new session number: %lu\n", mgr->udp_overload_evict_new_sess_num); printf("packet status:\n"); printf(" miss l4 proto : %lu (bypass)\n", mgr->npkts_miss_l4_proto); printf(" hit tcp miss sess : %lu (bypass)\n", mgr->npkts_hit_tcp_miss_sess); printf(" hit tcp dupkt : %lu (bypass)\n", mgr->npkts_hit_tcp_dupkt); printf(" hit tcp discard : %lu (drop)\n", mgr->npkts_hit_tcp_discard); printf(" hit udp evicted : %lu (bypass)\n", mgr->npkts_hit_udp_evicted); }