feature: packet IO support IP reassembly
This commit is contained in:
@@ -3,7 +3,7 @@
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include "utils.h"
|
||||
#include "utils_internal.h"
|
||||
#include "packet_helper.h"
|
||||
#include "packet_filter.h"
|
||||
#include "session_internal.h"
|
||||
@@ -28,7 +28,7 @@ struct snowflake
|
||||
|
||||
struct session_manager_runtime
|
||||
{
|
||||
struct session_list evicte_list;
|
||||
struct session_queue evicte_list;
|
||||
struct session_pool *sess_pool;
|
||||
struct session_timer *sess_timer;
|
||||
struct session_table *tcp_sess_table;
|
||||
@@ -167,30 +167,30 @@ static uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec)
|
||||
|
||||
static void tcp_clean(struct session_manager_runtime *sess_mgr_rt, struct session *sess)
|
||||
{
|
||||
struct tcp_reassembly *c2s_ssembler = sess->tcp_halfs[FLOW_TYPE_C2S].assembler;
|
||||
struct tcp_reassembly *s2c_ssembler = sess->tcp_halfs[FLOW_TYPE_S2C].assembler;
|
||||
struct tcp_reassembly *c2s_tcp_reass = sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass;
|
||||
struct tcp_reassembly *s2c_tcp_reass = sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass;
|
||||
struct tcp_segment *seg;
|
||||
if (c2s_ssembler)
|
||||
if (c2s_tcp_reass)
|
||||
{
|
||||
while ((seg = tcp_reassembly_expire(c2s_ssembler, UINT64_MAX)))
|
||||
while ((seg = tcp_reassembly_expire(c2s_tcp_reass, UINT64_MAX)))
|
||||
{
|
||||
session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_SEGMENTS_RELEASED, 1);
|
||||
session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_PAYLOADS_RELEASED, seg->len);
|
||||
sess_mgr_rt->stat.tcp_segs_freed++;
|
||||
tcp_segment_free(seg);
|
||||
}
|
||||
tcp_reassembly_free(c2s_ssembler);
|
||||
tcp_reassembly_free(c2s_tcp_reass);
|
||||
}
|
||||
if (s2c_ssembler)
|
||||
if (s2c_tcp_reass)
|
||||
{
|
||||
while ((seg = tcp_reassembly_expire(s2c_ssembler, UINT64_MAX)))
|
||||
while ((seg = tcp_reassembly_expire(s2c_tcp_reass, UINT64_MAX)))
|
||||
{
|
||||
session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_SEGMENTS_RELEASED, 1);
|
||||
session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_PAYLOADS_RELEASED, seg->len);
|
||||
sess_mgr_rt->stat.tcp_segs_freed++;
|
||||
tcp_segment_free(seg);
|
||||
}
|
||||
tcp_reassembly_free(s2c_ssembler);
|
||||
tcp_reassembly_free(s2c_tcp_reass);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,18 +201,18 @@ static int tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session
|
||||
return 0;
|
||||
}
|
||||
|
||||
sess->tcp_halfs[FLOW_TYPE_C2S].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
|
||||
sess->tcp_halfs[FLOW_TYPE_S2C].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
|
||||
if (sess->tcp_halfs[FLOW_TYPE_C2S].assembler == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].assembler == NULL)
|
||||
sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
|
||||
sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
|
||||
if (sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass == NULL)
|
||||
{
|
||||
tcp_clean(sess_mgr_rt, sess);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SESSION_MANAGER_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p",
|
||||
SESSION_MANAGER_LOG_DEBUG("session %lu %s new c2s tcp tcp_reass %p, s2c tcp tcp_reass %p",
|
||||
session_get_id(sess), session_get0_readable_addr(sess),
|
||||
sess->tcp_halfs[FLOW_TYPE_C2S].assembler,
|
||||
sess->tcp_halfs[FLOW_TYPE_S2C].assembler);
|
||||
sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass,
|
||||
sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -257,10 +257,10 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi
|
||||
if (unlikely(flags & TH_SYN))
|
||||
{
|
||||
// len > 0 is SYN with data (TCP Fast Open)
|
||||
tcp_reassembly_set_recv_next(half->assembler, len ? half->seq : half->seq + 1);
|
||||
tcp_reassembly_set_recv_next(half->tcp_reass, len ? half->seq : half->seq + 1);
|
||||
}
|
||||
|
||||
seg = tcp_reassembly_expire(half->assembler, sess_mgr_rt->now_ms);
|
||||
seg = tcp_reassembly_expire(half->tcp_reass, sess_mgr_rt->now_ms);
|
||||
if (seg)
|
||||
{
|
||||
session_inc_stat(sess, type, STAT_TCP_SEGMENTS_EXPIRED, 1);
|
||||
@@ -280,7 +280,7 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi
|
||||
session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len);
|
||||
sess_mgr_rt->stat.tcp_segs_input++;
|
||||
|
||||
uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler);
|
||||
uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->tcp_reass);
|
||||
// in order
|
||||
if (half->seq == rcv_nxt)
|
||||
{
|
||||
@@ -291,7 +291,7 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi
|
||||
half->in_order.data = tcp_layer->pld_ptr;
|
||||
half->in_order.len = len;
|
||||
half->in_order_ref = 0;
|
||||
tcp_reassembly_inc_recv_next(half->assembler, len);
|
||||
tcp_reassembly_inc_recv_next(half->tcp_reass, len);
|
||||
}
|
||||
// retransmission
|
||||
else if (uint32_before(uint32_add(half->seq, len), rcv_nxt))
|
||||
@@ -302,7 +302,7 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi
|
||||
}
|
||||
else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len)))
|
||||
{
|
||||
switch (tcp_reassembly_push(half->assembler, seg, sess_mgr_rt->now_ms))
|
||||
switch (tcp_reassembly_push(half->tcp_reass, seg, sess_mgr_rt->now_ms))
|
||||
{
|
||||
case -2:
|
||||
session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1);
|
||||
@@ -481,39 +481,39 @@ struct session_manager_config *session_manager_config_new(const char *toml_file)
|
||||
}
|
||||
|
||||
int ret = 0;
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
|
||||
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1);
|
||||
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024);
|
||||
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000);
|
||||
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000);
|
||||
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 1, 4294967295);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000);
|
||||
ret += load_and_validate_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 1, 4294967295);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000);
|
||||
ret += load_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0);
|
||||
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000);
|
||||
ret += load_and_validate_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000);
|
||||
ret += load_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0);
|
||||
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000);
|
||||
ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000);
|
||||
ret += load_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512);
|
||||
|
||||
if (ret != 0)
|
||||
{
|
||||
@@ -1066,9 +1066,6 @@ void session_manager_runtime_free_session(struct session_manager_runtime *sess_m
|
||||
{
|
||||
if (sess)
|
||||
{
|
||||
struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
|
||||
exdata_runtime_free(exdata_rt);
|
||||
|
||||
SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess)));
|
||||
SESSION_MANAGER_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess)));
|
||||
|
||||
@@ -1353,7 +1350,7 @@ uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess
|
||||
{
|
||||
return mached_sess_num;
|
||||
}
|
||||
capacity = session_pool_capacity_size(sess_mgr_rt->sess_pool);
|
||||
capacity = sess_mgr_rt->cfg.tcp_session_max + sess_mgr_rt->cfg.udp_session_max;
|
||||
if (opts->cursor >= capacity)
|
||||
{
|
||||
return mached_sess_num;
|
||||
|
||||
Reference in New Issue
Block a user