session manager: support session timeouts & tcp dupkt filter & udp eviction filter

This commit is contained in:
luwenpeng
2024-01-17 11:47:55 +08:00
parent 1d4736ac88
commit 4fbafab4e3
20 changed files with 972 additions and 607 deletions

View File

@@ -3,10 +3,7 @@
###############################################################################
add_library(dupkt_filter dupkt_filter.cpp)
target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/dupkt_filter)
target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp)
target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/deps/dablooms)
target_include_directories(dupkt_filter PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(dupkt_filter packet timestamp dablooms)
add_subdirectory(test)

View File

@@ -3,10 +3,7 @@
###############################################################################
add_library(eviction_filter eviction_filter.cpp)
target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/eviction_filter)
target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp)
target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/deps/dablooms)
target_include_directories(eviction_filter PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(eviction_filter packet timestamp dablooms)
add_subdirectory(test)

View File

@@ -3,8 +3,7 @@
###############################################################################
add_library(packet packet.cpp)
target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/src/tuple)
target_include_directories(packet PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash)
target_link_libraries(packet tuple)

View File

@@ -6,20 +6,16 @@ add_executable(gtest_packet gtest_packet.cpp)
target_link_libraries(gtest_packet packet gtest)
add_executable(gtest_udp_helpers gtest_udp_helpers.cpp)
target_include_directories(gtest_udp_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_link_libraries(gtest_udp_helpers gtest)
target_link_libraries(gtest_udp_helpers packet gtest)
add_executable(gtest_tcp_helpers gtest_tcp_helpers.cpp)
target_include_directories(gtest_tcp_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_link_libraries(gtest_tcp_helpers gtest)
target_link_libraries(gtest_tcp_helpers packet gtest)
add_executable(gtest_ipv4_helpers gtest_ipv4_helpers.cpp)
target_include_directories(gtest_ipv4_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_link_libraries(gtest_ipv4_helpers gtest)
target_link_libraries(gtest_ipv4_helpers packet gtest)
add_executable(gtest_ipv6_helpers gtest_ipv6_helpers.cpp)
target_include_directories(gtest_ipv6_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_link_libraries(gtest_ipv6_helpers gtest)
target_link_libraries(gtest_ipv6_helpers packet gtest)
add_executable(gtest_packet_helpers gtest_packet_helpers.cpp)
target_link_libraries(gtest_packet_helpers packet gtest)

View File

@@ -10,12 +10,7 @@ add_library(session_manager
session_queue.cpp
session_manager.cpp
)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/timeout)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/packet)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/session)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/tuple)
target_link_libraries(session_manager timeout timestamp tuple packet)
target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(session_manager timeout dupkt_filter eviction_filter)
add_subdirectory(test)

View File

@@ -46,7 +46,7 @@ uint64_t session_get_id(const struct session *sess)
}
// session tuple6
void session_set_tuple6(struct session *sess, struct tuple6 *tuple)
void session_set_tuple6(struct session *sess, const struct tuple6 *tuple)
{
memcpy(&sess->tuple, tuple, sizeof(struct tuple6));
}
@@ -88,6 +88,17 @@ enum session_type session_get_type(const struct session *sess)
return sess->type;
}
// session dup traffic flag
void session_set_dup_traffic_flag(struct session *sess, enum dup_traffic_flag flag)
{
sess->dup_flag = flag;
}
enum dup_traffic_flag session_get_dup_traffic_flag(const struct session *sess)
{
return sess->dup_flag;
}
// closing reasion
void session_set_closing_reasion(struct session *sess, enum closing_reasion reasion)
{
@@ -349,14 +360,14 @@ static void tcp_ex_data_tostring(uint64_t ex_data, char *buffer, size_t buffer_l
nused += snprintf(buffer + nused, buffer_len - nused, "TCP_SYNACK_RECVED ");
}
if (ex_data & TCP_C2S_PAYLOAD_RECVED)
if (ex_data & TCP_C2S_DATA_RECVED)
{
nused += snprintf(buffer + nused, buffer_len - nused, "TCP_C2S_PAYLOAD_RECVED ");
nused += snprintf(buffer + nused, buffer_len - nused, "TCP_C2S_DATA_RECVED ");
}
if (ex_data & TCP_S2C_PAYLOAD_RECVED)
if (ex_data & TCP_S2C_DATA_RECVED)
{
nused += snprintf(buffer + nused, buffer_len - nused, "TCP_S2C_PAYLOAD_RECVED ");
nused += snprintf(buffer + nused, buffer_len - nused, "TCP_S2C_DATA_RECVED ");
}
if (ex_data & TCP_C2S_FIN_RECVED)
@@ -404,17 +415,17 @@ const char *session_closing_reasion_tostring(enum closing_reasion reasion)
switch (reasion)
{
case CLOSING_BY_TIMEOUT:
return "CLOSING_BY_TIMEOUT";
return "closing_by_timeout";
case CLOSING_BY_EVICTED:
return "CLOSING_BY_EVICTED";
return "closing_by_evicted";
case CLOSING_BY_CLIENT_FIN:
return "CLOSING_BY_CLIENT_FIN";
return "closing_by_client_fin";
case CLOSING_BY_CLIENT_RST:
return "CLOSING_BY_CLIENT_RST";
return "closing_by_client_rst";
case CLOSING_BY_SERVER_FIN:
return "CLOSING_BY_SERVER_FIN";
return "closing_by_server_fin";
case CLOSING_BY_SERVER_RST:
return "CLOSING_BY_SERVER_RST";
return "closing_by_server_rst";
default:
return "unknown";
}
@@ -424,8 +435,6 @@ const char *session_state_tostring(enum session_state state)
{
switch (state)
{
case SESSION_STATE_INIT:
return "init";
case SESSION_STATE_OPENING:
return "opening";
case SESSION_STATE_ACTIVE:
@@ -456,8 +465,6 @@ const char *session_dir_tostring(enum session_dir dir)
{
switch (dir)
{
case SESSION_DIR_NONE:
return "none";
case SESSION_DIR_C2S:
return "c2s";
case SESSION_DIR_S2C:

View File

@@ -13,11 +13,10 @@ extern "C"
enum session_state
{
SESSION_STATE_INIT = 0,
SESSION_STATE_OPENING,
SESSION_STATE_ACTIVE,
SESSION_STATE_CLOSING,
SESSION_STATE_CLOSED,
SESSION_STATE_OPENING = 0x1,
SESSION_STATE_ACTIVE = 0x2,
SESSION_STATE_CLOSING = 0x3,
SESSION_STATE_CLOSED = 0x4,
};
enum session_type
@@ -28,19 +27,25 @@ enum session_type
enum session_dir
{
SESSION_DIR_NONE = 0,
SESSION_DIR_C2S = 1,
SESSION_DIR_S2C = 2,
SESSION_DIR_NONE = 0x0,
SESSION_DIR_C2S = 0x1,
SESSION_DIR_S2C = 0x2,
};
enum dup_traffic_flag
{
DUP_TRAFFIC_YES = 0x1,
DUP_TRAFFIC_NO = 0x2,
};
enum closing_reasion
{
CLOSING_BY_TIMEOUT = 1,
CLOSING_BY_EVICTED = 2,
CLOSING_BY_CLIENT_FIN = 3,
CLOSING_BY_CLIENT_RST = 4,
CLOSING_BY_SERVER_FIN = 5,
CLOSING_BY_SERVER_RST = 6,
CLOSING_BY_TIMEOUT = 0x1,
CLOSING_BY_EVICTED = 0x2,
CLOSING_BY_CLIENT_FIN = 0x3,
CLOSING_BY_CLIENT_RST = 0x4,
CLOSING_BY_SERVER_FIN = 0x5,
CLOSING_BY_SERVER_RST = 0x6,
};
struct session;
@@ -56,7 +61,7 @@ void session_set_id(struct session *sess, uint64_t id);
uint64_t session_get_id(const struct session *sess);
// session key
void session_set_tuple6(struct session *sess, struct tuple6 *tuple);
void session_set_tuple6(struct session *sess, const struct tuple6 *tuple);
const struct tuple6 *session_get0_tuple6(const struct session *sess);
void session_set_tuple6_dir(struct session *sess, enum session_dir dir);
enum session_dir session_get_tuple6_dir(const struct session *sess);
@@ -69,6 +74,10 @@ enum session_state session_get_state(const struct session *sess);
void session_set_type(struct session *sess, enum session_type type);
enum session_type session_get_type(const struct session *sess);
// session dup traffic flag
void session_set_dup_traffic_flag(struct session *sess, enum dup_traffic_flag flag);
enum dup_traffic_flag session_get_dup_traffic_flag(const struct session *sess);
// closing reasion
void session_set_closing_reasion(struct session *sess, enum closing_reasion reasion);
enum closing_reasion session_get_closing_reasion(const struct session *sess);

File diff suppressed because it is too large Load Diff

View File

@@ -8,32 +8,65 @@ extern "C"
#include "session.h"
// #define SESSION_LOG_ERROR(format, ...) void(0)
#ifndef SESSION_LOG_ERROR
#define SESSION_LOG_ERROR(format, ...) \
fprintf(stderr, "ERROR (session), " format "\n", ##__VA_ARGS__);
#endif
// #define SESSION_LOG_DEBUG(format, ...) void(0)
#ifndef SESSION_LOG_DEBUG
#define SESSION_LOG_DEBUG(format, ...) \
fprintf(stderr, "DEBUG (session), " format "\n", ##__VA_ARGS__);
#endif
struct session_manager_config
{
// max session number
uint64_t max_tcp_session_num;
uint64_t max_udp_session_num;
// TCP timeout
uint64_t tcp_timeout_init; // seconds, Range: 1-60
uint64_t tcp_timeout_handshake; // seconds, Range: 1-60
uint64_t tcp_timeout_data; // seconds, Range: 1-15,999,999
uint64_t tcp_timeout_half_closed; // seconds, Range: 1-604,800
uint64_t tcp_timeout_discard; // seconds, Range: 1-15,999,999
// UDP timeout
uint64_t udp_timeout_data; // seconds, Range: 1-15,999,999
// TCP duplicate packet filter
uint8_t tcp_dupkt_filter_enable;
uint64_t tcp_dupkt_filter_capacity;
uint64_t tcp_dupkt_filter_timeout; // seconds, Range: 1-60
double tcp_dupkt_filter_error_rate;
// UDP eviction filter
uint8_t udp_eviction_filter_enable;
uint64_t udp_eviction_filter_capacity;
uint64_t udp_eviction_filter_timeout; // seconds, Range: 1-60
double udp_eviction_filter_error_rate;
};
struct session_manager;
struct session_manager *session_manager_create(uint64_t max_session_num);
struct session_manager *session_manager_create(struct session_manager_config *config);
void session_manager_destroy(struct session_manager *mgr);
void session_manager_set_timeout_toclosing(struct session_manager *mgr, uint64_t timeout_ms);
void session_manager_set_timeout_toclosed(struct session_manager *mgr, uint64_t timeout_ms);
struct session *session_manager_lookup(struct session_manager *mgr, const struct packet *pkt);
// return null: invalid tuple6 or tcp first packet is not syn
struct session *session_manager_update(struct session_manager *mgr, const struct packet *pkt);
struct session *session_manager_expire(struct session_manager *mgr);
struct session *session_manager_evicte(struct session_manager *mgr);
// for debug
uint64_t session_manager_get_sessions(struct session_manager *mgr, enum session_type type, enum session_state state);
// only use the packet six-tuple to find the session, not update it
struct session *session_manager_lookup_sesssion(struct session_manager *mgr, const struct packet *pkt);
/*
* Return NULL in the following cases:
* 1.not a TCP or UDP packet
* 2.TCP packet miss session but no syn packet seen
* 3.TCP duplicate packet
* 4.TCP discards packets
* 5.UDP evict packet
* pakcet will not update the session and needs to be fast forwarded
*/
struct session *session_manager_update_session(struct session_manager *mgr, const struct packet *pkt);
struct session *session_manager_get_expired_session(struct session_manager *mgr);
struct session *session_manager_get_evicted_session(struct session_manager *mgr);
// return interval (seconds) to next required update, return 0 if no session
uint64_t session_manager_get_expire_interval(struct session_manager *mgr);
uint64_t session_manager_get_session_number(struct session_manager *mgr, enum session_type type, enum session_state state);
#ifdef __cpluscplus
}

View File

@@ -22,8 +22,8 @@ enum tcp_ex_data
TCP_SYN_RECVED = 1 << 0,
TCP_SYNACK_RECVED = 1 << 1,
// ESTABLISHED
TCP_C2S_PAYLOAD_RECVED = 1 << 2,
TCP_S2C_PAYLOAD_RECVED = 1 << 3,
TCP_C2S_DATA_RECVED = 1 << 2,
TCP_S2C_DATA_RECVED = 1 << 3,
// FIN
TCP_C2S_FIN_RECVED = 1 << 4,
TCP_S2C_FIN_RECVED = 1 << 5,
@@ -49,6 +49,9 @@ struct session
// session type
enum session_type type;
// dup traffic flag
enum dup_traffic_flag dup_flag;
// closing reasion
enum closing_reasion closing_reasion;

View File

@@ -172,7 +172,7 @@ int session_table_add_session(struct session_table *table, const struct tuple6 *
return 0;
}
void session_table_delete_session(struct session_table *table, const struct tuple6 *tuple)
void session_table_del_session(struct session_table *table, const struct tuple6 *tuple)
{
if (table == NULL)
{

View File

@@ -18,7 +18,7 @@ void session_table_set_freecb(struct session_table *table, session_free_cb free_
// return 0: success
// return -1: failed
int session_table_add_session(struct session_table *table, const struct tuple6 *tuple, struct session *sess);
void session_table_delete_session(struct session_table *table, const struct tuple6 *tuple);
void session_table_del_session(struct session_table *table, const struct tuple6 *tuple);
struct session *session_table_find_session(struct session_table *table, const struct tuple6 *tuple);
struct session *session_table_find_least_recently_unused_session(struct session_table *table);
struct session *session_table_find_least_recently_used_session(struct session_table *table);

View File

@@ -67,3 +67,9 @@ struct session *session_timer_expire_session(struct session_timer *timer, uint64
struct session *sess = (struct session *)timeout->callback.arg;
return sess;
}
// return interval to next required update, return 0 if no timer
uint64_t session_timer_next_expire_interval(struct session_timer *timer)
{
return timeouts_timeout(timer->timeouts);
}

View File

@@ -18,6 +18,8 @@ void session_timer_del_session(struct session_timer *timer, struct session *sess
* if return session, the session will be removed from timer.
*/
struct session *session_timer_expire_session(struct session_timer *timer, uint64_t abs_current_ts);
// return interval to next required update, return 0 if no timer
uint64_t session_timer_next_expire_interval(struct session_timer *timer);
#ifdef __cpluscplus
}

View File

@@ -454,7 +454,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_C2S_PAYLOAD)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_C2S_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_C2S_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -582,7 +582,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_S2C_PAYLOAD)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_S2C_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_S2C_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -708,7 +708,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_C2S_PAYLOAD)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -835,7 +835,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_S2C_PAYLOAD)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_S2C_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_S2C_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1043,7 +1043,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1346,7 +1346,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1384,7 +1384,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1422,7 +1422,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1460,7 +1460,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1498,7 +1498,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1536,7 +1536,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED | TCP_C2S_FIN_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED | TCP_C2S_FIN_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1574,7 +1574,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);
@@ -1612,7 +1612,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess));
EXPECT_TRUE(session_get0_cur_pkt(sess) == NULL);
EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_NONE);
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED));
EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED));
// check session manager info
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0);

View File

@@ -106,17 +106,17 @@ TEST(SESSION_TABLE, OP_SESSION)
EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_3) == sess3);
// Delete
session_table_delete_session(sess_table, &tuple_1);
session_table_del_session(sess_table, &tuple_1);
EXPECT_TRUE(session_table_get_count(sess_table) == 2);
EXPECT_TRUE(session_table_find_session(sess_table, &tuple_1) == NULL);
EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_1) == NULL);
session_table_delete_session(sess_table, &reversed_tuple_2);
session_table_del_session(sess_table, &reversed_tuple_2);
EXPECT_TRUE(session_table_get_count(sess_table) == 1);
EXPECT_TRUE(session_table_find_session(sess_table, &tuple_2) == NULL);
EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_2) == NULL);
session_table_delete_session(sess_table, &tuple_3);
session_table_del_session(sess_table, &tuple_3);
EXPECT_TRUE(session_table_get_count(sess_table) == 0);
EXPECT_TRUE(session_table_find_session(sess_table, &tuple_3) == NULL);
EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_3) == NULL);
@@ -176,15 +176,15 @@ TEST(SESSION_TABLE, FIND_OLDEST_NEWEST)
// Delete Session
session_table_delete_session(sess_table, &tuple_1);
session_table_del_session(sess_table, &tuple_1);
EXPECT_TRUE(session_table_find_least_recently_unused_session(sess_table) == sess2);
EXPECT_TRUE(session_table_find_least_recently_used_session(sess_table) == sess3);
session_table_delete_session(sess_table, &tuple_2);
session_table_del_session(sess_table, &tuple_2);
EXPECT_TRUE(session_table_find_least_recently_unused_session(sess_table) == sess3);
EXPECT_TRUE(session_table_find_least_recently_used_session(sess_table) == sess3);
session_table_delete_session(sess_table, &tuple_3);
session_table_del_session(sess_table, &tuple_3);
EXPECT_TRUE(session_table_find_least_recently_unused_session(sess_table) == NULL);
EXPECT_TRUE(session_table_find_least_recently_used_session(sess_table) == NULL);

View File

@@ -1,7 +1,4 @@
add_executable(stellar stellar.cpp)
target_link_libraries(stellar session_manager)
target_link_libraries(stellar dupkt_filter)
target_link_libraries(stellar eviction_filter)
target_link_libraries(stellar pthread)
target_link_libraries(stellar session_manager pthread)
install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program)

View File

@@ -10,8 +10,6 @@
#include "packet.h"
#include "timestamp.h"
#include "session_manager.h"
#include "dupkt_filter.h"
#include "eviction_filter.h"
#ifndef STELLAR_LOG_ERROR
#define STELLAR_LOG_ERROR(format, ...) \
@@ -32,8 +30,6 @@ struct thread_ctx
uint64_t need_exit;
uint64_t is_runing;
struct session_manager *sess_mgr;
struct dupkt_filter *dupkt_filter;
struct eviction_filter *eviction_filter;
};
struct stellar_ctx
@@ -41,48 +37,16 @@ struct stellar_ctx
uint64_t need_exit;
uint16_t max_worker_num;
// session manager
uint64_t sess_mgr_max_session_num;
uint64_t sess_mgr_timeout_ms_toclsoing;
uint64_t sess_mgr_timeout_ms_toclosed;
struct session_manager_config sess_mgr_cfg;
// duplicated packet filter
uint8_t dupkt_filter_enable;
unsigned int dupkt_filter_capacity;
double dupkt_filter_error_rate;
int dupkt_filter_timeout_s;
// eviction filter
uint8_t eviction_filter_enable;
unsigned int eviction_filter_capacity;
double eviction_filter_error_rate;
int eviction_filter_timeout_s;
// thread
struct thread_ctx thread_ctx[128];
} g_stellar_ctx = {
.need_exit = 0,
.max_worker_num = 1,
// session manager
.sess_mgr_max_session_num = 1000000,
.sess_mgr_timeout_ms_toclsoing = 1000,
.sess_mgr_timeout_ms_toclosed = 10000,
// duplicated packet filter
.dupkt_filter_enable = 1,
.dupkt_filter_capacity = 1000000,
.dupkt_filter_error_rate = 0.0001,
.dupkt_filter_timeout_s = 10,
// eviction filter
.eviction_filter_enable = 1,
.eviction_filter_capacity = 1000000,
.eviction_filter_error_rate = 0.0001,
.eviction_filter_timeout_s = 10,
// TODO session manager config
};
// TODO
static int recv_packet(const char **data)
{
static unsigned char packet_data[] = {
@@ -96,6 +60,10 @@ static int recv_packet(const char **data)
return sizeof(packet_data);
}
static void send_packet(const char *data, uint16_t len)
{
}
static void signal_handler(int signo)
{
if (signo == SIGINT)
@@ -119,11 +87,6 @@ static void signal_handler(int signo)
static void __session_dispatch(struct session *sess)
{
if (sess == NULL)
{
return;
}
printf("\n");
printf("=> session dispatch: %p\n", sess);
session_dump(sess);
@@ -141,17 +104,9 @@ static void thread_ctx_init(struct stellar_ctx *ctx)
thd_ctx->index = i;
thd_ctx->need_exit = 0;
thd_ctx->is_runing = 0;
// session manager
thd_ctx->sess_mgr = session_manager_create(ctx->sess_mgr_max_session_num);
thd_ctx->sess_mgr = session_manager_create(&ctx->sess_mgr_cfg);
assert(thd_ctx->sess_mgr != NULL);
session_manager_set_timeout_toclosing(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclsoing);
session_manager_set_timeout_toclosed(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclosed);
// duplicated packet filter
thd_ctx->dupkt_filter = dupkt_filter_create(ctx->dupkt_filter_enable, ctx->dupkt_filter_capacity, ctx->dupkt_filter_error_rate, ctx->dupkt_filter_timeout_s);
assert(thd_ctx->dupkt_filter != NULL);
// eviction filter
thd_ctx->eviction_filter = eviction_filter_create(ctx->eviction_filter_enable, ctx->eviction_filter_capacity, ctx->eviction_filter_error_rate, ctx->eviction_filter_timeout_s);
assert(thd_ctx->eviction_filter != NULL);
}
}
@@ -175,7 +130,6 @@ static void *thread_cycle(void *arg)
struct session *sess = NULL;
struct thread_ctx *thd_ctx = (struct thread_ctx *)arg;
struct session_manager *sess_mgr = thd_ctx->sess_mgr;
struct dupkt_filter *dupkt_filter = thd_ctx->dupkt_filter;
char thread_name[16];
ATOMIC_SET(&thd_ctx->is_runing, 1);
@@ -185,60 +139,38 @@ static void *thread_cycle(void *arg)
while (ATOMIC_READ(&thd_ctx->need_exit) == 0)
{
// TODO recv packet
len = recv_packet(&data);
if (data == NULL)
{
goto poll_wait;
}
// parse packet
packet_parse(&pkt, data, len);
// duplicated packet filter
if (dupkt_filter_lookup(dupkt_filter, &pkt) == 0)
{
STELLAR_LOG_DEBUG("duplicated packet, forward it");
goto fast_forward;
}
// eviction filter
if (eviction_filter_lookup(thd_ctx->eviction_filter, &pkt) == 0)
{
STELLAR_LOG_DEBUG("eviction packet, forward it");
goto fast_forward;
}
// update session
sess = session_manager_update(sess_mgr, &pkt);
sess = session_manager_update_session(sess_mgr, &pkt);
if (sess == NULL)
{
goto fast_forward;
}
// TODO session synchronization
__session_dispatch(sess);
dupkt_filter_add(dupkt_filter, &pkt);
sess = session_manager_get_evicted_session(sess_mgr);
if (sess)
{
__session_dispatch(sess);
}
fast_forward:
// TODO send packet
send_packet(data, len);
// dispatch expire session
sess = session_manager_expire(sess_mgr);
if (sess && session_get_type(sess) == SESSION_TYPE_UDP && session_get_state(sess) == SESSION_STATE_CLOSING)
sess = session_manager_get_expired_session(sess_mgr);
if (sess)
{
const struct packet *sess_1st_pkt = session_get0_1st_pkt(sess);
eviction_filter_add(thd_ctx->eviction_filter, sess_1st_pkt);
__session_dispatch(sess);
}
__session_dispatch(sess);
continue;
sess = session_manager_evicte(sess_mgr);
if (sess && session_get_type(sess) == SESSION_TYPE_UDP)
{
const struct packet *sess_1st_pkt = session_get0_1st_pkt(sess);
eviction_filter_add(thd_ctx->eviction_filter, sess_1st_pkt);
}
__session_dispatch(sess);
// TODO get next timeout
sleep(1);
poll_wait:
sleep(session_manager_get_expire_interval(sess_mgr));
}
ATOMIC_SET(&thd_ctx->is_runing, 0);
@@ -255,18 +187,14 @@ int main(int argc, char **argv)
// TODO init plugin
// register signal handler
signal(SIGINT, signal_handler);
signal(SIGQUIT, signal_handler);
signal(SIGTERM, signal_handler);
// update timestamp
timestamp_update();
// init thread context
thread_ctx_init(&g_stellar_ctx);
// create worker thread
for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++)
{
struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i];
@@ -277,14 +205,12 @@ int main(int argc, char **argv)
}
}
// master thread update timestamp
while (!g_stellar_ctx.need_exit)
{
timestamp_update();
sleep(1);
}
// wait worker thread exit
for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++)
{
struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i];

View File

@@ -3,7 +3,7 @@
###############################################################################
add_library(timestamp timestamp.cpp)
target_include_directories(timestamp PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp)
target_include_directories(timestamp PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(timestamp)
add_subdirectory(test)

View File

@@ -3,7 +3,7 @@
###############################################################################
add_library(tuple tuple.cpp)
target_include_directories(tuple PUBLIC ${CMAKE_SOURCE_DIR}/src/tuple)
target_include_directories(tuple PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(tuple PUBLIC ${CMAKE_SOURCE_DIR}/src/crc32)
target_link_libraries(tuple)