optimizate: pass the current timeout to the ID generator as a parameter, instead of getting the time from the ID generator itself

This commit is contained in:
luwenpeng
2024-07-16 17:13:32 +08:00
parent 07ce636f64
commit b50f0c2c70
21 changed files with 197 additions and 201 deletions

View File

@@ -4,7 +4,6 @@
#include "log.h"
#include "macro.h"
#include "times.h"
#include "tcp_utils.h"
#include "udp_utils.h"
#include "packet_layer.h"
@@ -36,6 +35,12 @@ struct session_manager
struct session_manager_stat stat;
struct session_manager_options opts;
/*
* only used for session_set_discard() or session_manager record_duplicated_packet(),
* because the function is called by pluin and has no time input.
*/
uint64_t now_ms;
};
#define EVICTE_SESSION_BURST (RX_BURST_MAX)
@@ -286,7 +291,7 @@ static int tcp_init(struct session_manager *mgr, struct session *sess)
return 0;
}
static void tcp_update(struct session_manager *mgr, struct session *sess, enum flow_direction dir, const struct raw_layer *tcp_layer, uint64_t now)
static void tcp_update(struct session_manager *mgr, struct session *sess, enum flow_direction dir, const struct raw_layer *tcp_layer)
{
struct tcp_segment *seg;
struct tcphdr *hdr = (struct tcphdr *)tcp_layer->hdr_ptr;
@@ -329,7 +334,7 @@ static void tcp_update(struct session_manager *mgr, struct session *sess, enum f
tcp_reassembly_set_recv_next(half->assembler, len ? half->seq : half->seq + 1);
}
seg = tcp_reassembly_expire(half->assembler, now);
seg = tcp_reassembly_expire(half->assembler, mgr->now_ms);
if (seg)
{
session_inc_stat(sess, dir, STAT_TCP_SEGMENTS_EXPIRED, 1);
@@ -371,7 +376,7 @@ static void tcp_update(struct session_manager *mgr, struct session *sess, enum f
}
else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len)))
{
switch (tcp_reassembly_push(half->assembler, seg, now))
switch (tcp_reassembly_push(half->assembler, seg, mgr->now_ms))
{
case -2:
session_inc_stat(sess, dir, STAT_TCP_SEGMENTS_RETRANSMIT, 1);
@@ -452,7 +457,7 @@ static enum flow_direction identify_direction_by_history(const struct session *s
******************************************************************************/
// on new session
static int tcp_overload_bypass(struct session_manager *mgr, const struct tuple6 *key, uint64_t now)
static int tcp_overload_bypass(struct session_manager *mgr, const struct tuple6 *key)
{
if (key->ip_proto == IPPROTO_TCP && mgr->stat.curr_nr_tcp_sess_used >= mgr->opts.max_tcp_session_num)
{
@@ -461,7 +466,8 @@ static int tcp_overload_bypass(struct session_manager *mgr, const struct tuple6
}
return 0;
}
static int udp_overload_bypass(struct session_manager *mgr, const struct tuple6 *key, uint64_t now)
static int udp_overload_bypass(struct session_manager *mgr, const struct tuple6 *key)
{
if (key->ip_proto == IPPROTO_UDP && mgr->stat.curr_nr_udp_sess_used >= mgr->opts.max_udp_session_num)
{
@@ -470,9 +476,10 @@ static int udp_overload_bypass(struct session_manager *mgr, const struct tuple6
}
return 0;
}
static int evicted_session_bypass(struct session_manager *mgr, const struct tuple6 *key, uint64_t now)
static int evicted_session_bypass(struct session_manager *mgr, const struct tuple6 *key)
{
if (mgr->opts.evicted_session_filter_enable && evicted_session_filter_lookup(mgr->evicte_sess_filter, key, now))
if (mgr->opts.evicted_session_filter_enable && evicted_session_filter_lookup(mgr->evicte_sess_filter, key, mgr->now_ms))
{
mgr->stat.nr_udp_pkts_evctd_bypass++;
return 1;
@@ -480,8 +487,9 @@ static int evicted_session_bypass(struct session_manager *mgr, const struct tupl
return 0;
}
// on update session
static int duplicated_packet_bypass(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now)
static int duplicated_packet_bypass(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
{
if (mgr->opts.duplicated_packet_filter_enable == 0)
{
@@ -491,7 +499,7 @@ static int duplicated_packet_bypass(struct session_manager *mgr, struct session
enum flow_direction dir = identify_direction_by_history(sess, key);
if (session_get_stat(sess, dir, STAT_RAW_PACKETS_RECEIVED) < 3 || session_has_duplicate_traffic(sess))
{
if (duplicated_packet_filter_lookup(mgr->dup_pkt_filter, pkt, now))
if (duplicated_packet_filter_lookup(mgr->dup_pkt_filter, pkt, mgr->now_ms))
{
session_inc_stat(sess, dir, STAT_DUPLICATE_PACKETS_BYPASS, 1);
session_inc_stat(sess, dir, STAT_DUPLICATE_BYTES_BYPASS, packet_get_raw_len(pkt));
@@ -515,7 +523,7 @@ static int duplicated_packet_bypass(struct session_manager *mgr, struct session
}
else
{
duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, now);
duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, mgr->now_ms);
return 0;
}
}
@@ -527,12 +535,11 @@ static int duplicated_packet_bypass(struct session_manager *mgr, struct session
* Session Manager
******************************************************************************/
static void session_update(struct session *sess, enum session_state next_state, const struct packet *pkt, const struct tuple6 *key, enum flow_direction dir)
static void session_update(struct session_manager *mgr, struct session *sess, enum session_state next_state, const struct packet *pkt, const struct tuple6 *key, enum flow_direction dir)
{
uint64_t real_sec = stellar_get_real_time_sec();
if (session_get_current_state(sess) == SESSION_STATE_INIT)
{
session_set_id(sess, id_generator_alloc());
session_set_id(sess, id_generator_alloc(mgr->now_ms / 1000));
session_set_tuple6(sess, key);
session_set_tuple_direction(sess, dir);
@@ -562,7 +569,7 @@ static void session_update(struct session *sess, enum session_state next_state,
}
tuple6_to_str(key, sess->tuple_str, sizeof(sess->tuple_str));
session_set_timestamp(sess, SESSION_TIMESTAMP_START, real_sec);
session_set_timestamp(sess, SESSION_TIMESTAMP_START, mgr->now_ms);
switch (key->ip_proto)
{
case IPPROTO_TCP:
@@ -589,11 +596,11 @@ static void session_update(struct session *sess, enum session_state next_state,
session_set_current_packet(sess, pkt);
session_set_current_flow_direction(sess, dir);
session_set_timestamp(sess, SESSION_TIMESTAMP_LAST, real_sec);
session_set_timestamp(sess, SESSION_TIMESTAMP_LAST, mgr->now_ms);
session_set_current_state(sess, next_state);
}
static void session_manager_evicte_session(struct session_manager *mgr, struct session *sess, uint64_t now, int reason)
static void session_manager_evicte_session(struct session_manager *mgr, struct session *sess, int reason)
{
if (sess == NULL)
{
@@ -632,7 +639,7 @@ static void session_manager_evicte_session(struct session_manager *mgr, struct s
session_table_del(mgr->udp_sess_table, sess);
if (mgr->opts.evicted_session_filter_enable)
{
evicted_session_filter_add(mgr->evicte_sess_filter, session_get_tuple6(sess), now);
evicted_session_filter_add(mgr->evicte_sess_filter, session_get_tuple6(sess), mgr->now_ms);
}
SESS_MGR_STAT_UPDATE(&mgr->stat, curr_state, next_state, udp);
mgr->stat.nr_udp_sess_evicted++;
@@ -643,7 +650,7 @@ static void session_manager_evicte_session(struct session_manager *mgr, struct s
}
}
static struct session *session_manager_lookup_tcp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key, uint64_t now)
static struct session *session_manager_lookup_tcp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key)
{
struct session *sess = session_table_find_tuple6(mgr->tcp_sess_table, key);
if (sess == NULL)
@@ -665,7 +672,7 @@ static struct session *session_manager_lookup_tcp_session(struct session_manager
((half->history & TH_FIN) || (half->history & TH_RST))) // recv SYN after FIN or RST
{
// TCP port reuse, evict old session
session_manager_evicte_session(mgr, sess, now, PORT_REUSE_EVICT);
session_manager_evicte_session(mgr, sess, PORT_REUSE_EVICT);
return NULL;
}
else
@@ -675,7 +682,7 @@ static struct session *session_manager_lookup_tcp_session(struct session_manager
}
}
static struct session *session_manager_new_tcp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key, uint64_t now)
static struct session *session_manager_new_tcp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key)
{
const struct raw_layer *tcp_layer = packet_get_innermost_raw_layer(pkt, LAYER_PROTO_TCP);
const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
@@ -690,7 +697,7 @@ static struct session *session_manager_new_tcp_session(struct session_manager *m
if (mgr->opts.tcp_overload_evict_old_sess && mgr->stat.curr_nr_tcp_sess_used >= mgr->opts.max_tcp_session_num - EVICTE_SESSION_BURST)
{
struct session *evic_sess = session_table_find_lru(mgr->tcp_sess_table);
session_manager_evicte_session(mgr, evic_sess, now, LRU_EVICT);
session_manager_evicte_session(mgr, evic_sess, LRU_EVICT);
}
enum flow_direction dir = (flags & TH_ACK) ? FLOW_DIRECTION_S2C : FLOW_DIRECTION_C2S;
@@ -705,7 +712,7 @@ static struct session *session_manager_new_tcp_session(struct session_manager *m
sess->mgr_stat = &mgr->stat;
enum session_state next_state = session_transition_run(SESSION_STATE_INIT, TCP_SYN);
session_update(sess, next_state, pkt, key, dir);
session_update(mgr, sess, next_state, pkt, key, dir);
session_transition_log(sess, SESSION_STATE_INIT, next_state, TCP_SYN);
if (tcp_init(mgr, sess) == -1)
@@ -714,15 +721,15 @@ static struct session *session_manager_new_tcp_session(struct session_manager *m
session_pool_push(mgr->sess_pool, sess);
return NULL;
}
tcp_update(mgr, sess, dir, tcp_layer, now);
tcp_update(mgr, sess, dir, tcp_layer);
uint64_t timeout = (flags & TH_ACK) ? mgr->opts.tcp_handshake_timeout : mgr->opts.tcp_init_timeout;
session_timer_update(mgr->sess_timer, sess, now + timeout);
session_timer_update(mgr->sess_timer, sess, mgr->now_ms + timeout);
session_table_add(mgr->tcp_sess_table, sess);
if (mgr->opts.duplicated_packet_filter_enable)
{
duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, now);
duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, mgr->now_ms);
}
SESS_MGR_STAT_INC(&mgr->stat, next_state, tcp);
@@ -732,13 +739,13 @@ static struct session *session_manager_new_tcp_session(struct session_manager *m
return sess;
}
static struct session *session_manager_new_udp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key, uint64_t now)
static struct session *session_manager_new_udp_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key)
{
// udp table full evict old session
if (mgr->opts.udp_overload_evict_old_sess && mgr->stat.curr_nr_udp_sess_used >= mgr->opts.max_udp_session_num - EVICTE_SESSION_BURST)
{
struct session *evic_sess = session_table_find_lru(mgr->udp_sess_table);
session_manager_evicte_session(mgr, evic_sess, now, LRU_EVICT);
session_manager_evicte_session(mgr, evic_sess, LRU_EVICT);
}
struct session *sess = session_pool_pop(mgr->sess_pool);
@@ -753,10 +760,10 @@ static struct session *session_manager_new_udp_session(struct session_manager *m
enum flow_direction dir = identify_direction_by_port(ntohs(key->src_port), ntohs(key->dst_port));
enum session_state next_state = session_transition_run(SESSION_STATE_INIT, UDP_DATA);
session_update(sess, next_state, pkt, key, dir);
session_update(mgr, sess, next_state, pkt, key, dir);
session_transition_log(sess, SESSION_STATE_INIT, next_state, UDP_DATA);
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.udp_data_timeout);
session_timer_update(mgr->sess_timer, sess, mgr->now_ms + mgr->opts.udp_data_timeout);
session_table_add(mgr->udp_sess_table, sess);
SESS_MGR_STAT_INC(&mgr->stat, next_state, udp);
@@ -766,7 +773,7 @@ static struct session *session_manager_new_udp_session(struct session_manager *m
return sess;
}
static int session_manager_update_tcp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now)
static int session_manager_update_tcp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
{
const struct raw_layer *tcp_layer = packet_get_innermost_raw_layer(pkt, LAYER_PROTO_TCP);
const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
@@ -783,11 +790,11 @@ static int session_manager_update_tcp_session(struct session_manager *mgr, struc
enum session_state next_state = session_transition_run(curr_state, inputs);
// update session
session_update(sess, next_state, pkt, key, dir);
session_update(mgr, sess, next_state, pkt, key, dir);
session_transition_log(sess, curr_state, next_state, inputs);
// update tcp
tcp_update(mgr, sess, dir, tcp_layer, now);
tcp_update(mgr, sess, dir, tcp_layer);
// set closing reason
if (next_state == SESSION_STATE_CLOSING && !session_get_closing_reason(sess))
@@ -844,28 +851,28 @@ static int session_manager_update_tcp_session(struct session_manager *mgr, struc
assert(0);
break;
}
session_timer_update(mgr->sess_timer, sess, now + timeout);
session_timer_update(mgr->sess_timer, sess, mgr->now_ms + timeout);
SESS_MGR_STAT_UPDATE(&mgr->stat, curr_state, next_state, tcp);
return 0;
}
static int session_manager_update_udp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key, uint64_t now)
static int session_manager_update_udp_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
{
enum flow_direction dir = identify_direction_by_history(sess, key);
enum session_state curr_state = session_get_current_state(sess);
enum session_state next_state = session_transition_run(curr_state, UDP_DATA);
session_update(sess, next_state, pkt, key, dir);
session_update(mgr, sess, next_state, pkt, key, dir);
session_transition_log(sess, curr_state, next_state, UDP_DATA);
if (session_get_current_state(sess) == SESSION_STATE_DISCARD)
{
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.udp_discard_timeout);
session_timer_update(mgr->sess_timer, sess, mgr->now_ms + mgr->opts.udp_discard_timeout);
}
else
{
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.udp_data_timeout);
session_timer_update(mgr->sess_timer, sess, mgr->now_ms + mgr->opts.udp_data_timeout);
}
SESS_MGR_STAT_UPDATE(&mgr->stat, curr_state, next_state, udp);
@@ -877,7 +884,7 @@ static int session_manager_update_udp_session(struct session_manager *mgr, struc
* Public API
******************************************************************************/
struct session_manager *session_manager_new(struct session_manager_options *opts, uint64_t now)
struct session_manager *session_manager_new(struct session_manager_options *opts, uint64_t now_ms)
{
if (check_options(opts) == -1)
{
@@ -894,7 +901,7 @@ struct session_manager *session_manager_new(struct session_manager_options *opts
mgr->sess_pool = session_pool_new(mgr->opts.max_tcp_session_num + mgr->opts.max_udp_session_num);
mgr->tcp_sess_table = session_table_new();
mgr->udp_sess_table = session_table_new();
mgr->sess_timer = session_timer_new(now);
mgr->sess_timer = session_timer_new(now_ms);
if (mgr->sess_pool == NULL || mgr->tcp_sess_table == NULL || mgr->udp_sess_table == NULL || mgr->sess_timer == NULL)
{
goto error;
@@ -903,7 +910,7 @@ struct session_manager *session_manager_new(struct session_manager_options *opts
{
mgr->evicte_sess_filter = evicted_session_filter_new(mgr->opts.evicted_session_filter_capacity,
mgr->opts.evicted_session_filter_timeout,
mgr->opts.evicted_session_filter_error_rate, now);
mgr->opts.evicted_session_filter_error_rate, now_ms);
if (mgr->evicte_sess_filter == NULL)
{
goto error;
@@ -913,7 +920,7 @@ struct session_manager *session_manager_new(struct session_manager_options *opts
{
mgr->dup_pkt_filter = duplicated_packet_filter_new(mgr->opts.duplicated_packet_filter_capacity,
mgr->opts.duplicated_packet_filter_timeout,
mgr->opts.duplicated_packet_filter_error_rate, now);
mgr->opts.duplicated_packet_filter_error_rate, now_ms);
if (mgr->dup_pkt_filter == NULL)
{
goto error;
@@ -922,6 +929,7 @@ struct session_manager *session_manager_new(struct session_manager_options *opts
INIT_LIST_HEAD(&mgr->evicte_queue);
session_transition_init();
mgr->now_ms = now_ms;
return mgr;
@@ -969,16 +977,18 @@ void session_manager_free(struct session_manager *mgr)
}
}
void session_manager_record_duplicated_packet(struct session_manager *mgr, const struct packet *pkt, uint64_t now)
void session_manager_record_duplicated_packet(struct session_manager *mgr, const struct packet *pkt)
{
if (mgr->opts.duplicated_packet_filter_enable)
{
duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, now);
duplicated_packet_filter_add(mgr->dup_pkt_filter, pkt, mgr->now_ms);
}
}
struct session *session_manager_new_session(struct session_manager *mgr, const struct packet *pkt, uint64_t now)
struct session *session_manager_new_session(struct session_manager *mgr, const struct packet *pkt, uint64_t now_ms)
{
mgr->now_ms = now_ms;
struct tuple6 key;
if (packet_get_innermost_tuple6(pkt, &key))
{
@@ -987,21 +997,21 @@ struct session *session_manager_new_session(struct session_manager *mgr, const s
switch (key.ip_proto)
{
case IPPROTO_TCP:
if (tcp_overload_bypass(mgr, &key, now))
if (tcp_overload_bypass(mgr, &key))
{
return NULL;
}
return session_manager_new_tcp_session(mgr, pkt, &key, now);
return session_manager_new_tcp_session(mgr, pkt, &key);
case IPPROTO_UDP:
if (udp_overload_bypass(mgr, &key, now))
if (udp_overload_bypass(mgr, &key))
{
return NULL;
}
if (evicted_session_bypass(mgr, &key, now))
if (evicted_session_bypass(mgr, &key))
{
return NULL;
}
return session_manager_new_udp_session(mgr, pkt, &key, now);
return session_manager_new_udp_session(mgr, pkt, &key);
default:
return NULL;
}
@@ -1054,8 +1064,10 @@ void session_manager_free_session(struct session_manager *mgr, struct session *s
}
}
struct session *session_manager_lookup_session(struct session_manager *mgr, const struct packet *pkt, uint64_t now)
struct session *session_manager_lookup_session(struct session_manager *mgr, const struct packet *pkt, uint64_t now_ms)
{
mgr->now_ms = now_ms;
struct tuple6 key;
if (packet_get_innermost_tuple6(pkt, &key))
{
@@ -1066,37 +1078,41 @@ struct session *session_manager_lookup_session(struct session_manager *mgr, cons
case IPPROTO_UDP:
return session_table_find_tuple6(mgr->udp_sess_table, &key);
case IPPROTO_TCP:
return session_manager_lookup_tcp_session(mgr, pkt, &key, now);
return session_manager_lookup_tcp_session(mgr, pkt, &key);
default:
return NULL;
}
}
int session_manager_update_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, uint64_t now)
int session_manager_update_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, uint64_t now_ms)
{
mgr->now_ms = now_ms;
struct tuple6 key;
if (packet_get_innermost_tuple6(pkt, &key))
{
return -1;
}
if (duplicated_packet_bypass(mgr, sess, pkt, &key, now))
if (duplicated_packet_bypass(mgr, sess, pkt, &key))
{
return -1;
}
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
return session_manager_update_tcp_session(mgr, sess, pkt, &key, now);
return session_manager_update_tcp_session(mgr, sess, pkt, &key);
case SESSION_TYPE_UDP:
return session_manager_update_udp_session(mgr, sess, pkt, &key, now);
return session_manager_update_udp_session(mgr, sess, pkt, &key);
default:
return -1;
}
}
struct session *session_manager_get_expired_session(struct session_manager *mgr, uint64_t now)
struct session *session_manager_get_expired_session(struct session_manager *mgr, uint64_t now_ms)
{
struct session *sess = session_timer_expire(mgr->sess_timer, now);
mgr->now_ms = now_ms;
struct session *sess = session_timer_expire(mgr->sess_timer, now_ms);
if (sess)
{
enum session_state curr_state = session_get_current_state(sess);
@@ -1132,10 +1148,10 @@ struct session *session_manager_get_expired_session(struct session_manager *mgr,
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.tcp_data_timeout);
session_timer_update(mgr->sess_timer, sess, now_ms + mgr->opts.tcp_data_timeout);
break;
case SESSION_TYPE_UDP:
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.udp_data_timeout);
session_timer_update(mgr->sess_timer, sess, now_ms + mgr->opts.udp_data_timeout);
break;
default:
assert(0);
@@ -1174,7 +1190,6 @@ struct session_manager_stat *session_manager_stat(struct session_manager *mgr)
void session_set_discard(struct session *sess)
{
uint64_t now = stellar_get_monotonic_time_msec();
struct session_manager *mgr = sess->mgr;
enum session_type type = session_get_type(sess);
enum session_state curr_state = session_get_current_state(sess);
@@ -1185,11 +1200,11 @@ void session_set_discard(struct session *sess)
switch (type)
{
case SESSION_TYPE_TCP:
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.tcp_discard_timeout);
session_timer_update(mgr->sess_timer, sess, mgr->now_ms + mgr->opts.tcp_discard_timeout);
SESS_MGR_STAT_UPDATE(&mgr->stat, curr_state, next_state, tcp);
break;
case SESSION_TYPE_UDP:
session_timer_update(mgr->sess_timer, sess, now + mgr->opts.udp_discard_timeout);
session_timer_update(mgr->sess_timer, sess, mgr->now_ms + mgr->opts.udp_discard_timeout);
SESS_MGR_STAT_UPDATE(&mgr->stat, curr_state, next_state, udp);
break;
default: