built snowflake into the session manager

This commit is contained in:
luwenpeng
2024-09-02 15:04:55 +08:00
parent e2f2823b91
commit 9069dceae7
36 changed files with 137 additions and 509 deletions

View File

@@ -1,6 +1,5 @@
[snowflake]
snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127]
[instance]
id = 1 # range: [0, 4095] (12 bit)
[packet_io]
mode = "pcapfile" # pcapfile, pcaplist, marsio

View File

@@ -1,4 +1,4 @@
set(INFRA tuple packet_parser packet_io snowflake ip_reassembly tcp_reassembly session_manager plugin_manager core)
set(INFRA tuple packet_parser packet_io ip_reassembly tcp_reassembly session_manager plugin_manager core)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml)
set(DECODERS http lpi)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})

View File

@@ -1,3 +1,3 @@
add_library(core stellar_config.c stellar_stat.c stellar_core.c)
target_link_libraries(core PUBLIC packet_io snowflake ip_reassembly plugin_manager)
target_link_libraries(core PUBLIC packet_io ip_reassembly plugin_manager)

View File

@@ -9,39 +9,6 @@
#define CONFIG_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "config", format, ##__VA_ARGS__)
#define CONFIG_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "config", format, ##__VA_ARGS__)
// return 0: success
// retuun -1: failed
static int parse_snowflake_section(toml_table_t *root, struct snowflake_options *opts)
{
const char *ptr;
toml_table_t *table;
table = toml_table_in(root, "snowflake");
if (table == NULL)
{
CONFIG_LOG_ERROR("config file missing snowflake section");
return -1;
}
ptr = toml_raw_in(table, "snowflake_base");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing snowflake->snowflake_base");
return -1;
}
opts->snowflake_base = atoi(ptr);
ptr = toml_raw_in(table, "snowflake_offset");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing snowflake->snowflake_offset");
return -1;
}
opts->snowflake_offset = atoi(ptr);
return 0;
}
// return 0: success
// retuun -1: failed
static int parse_schedule_options(toml_table_t *root, struct schedule_options *opts)
@@ -108,11 +75,6 @@ int stellar_config_load(struct stellar_config *config, const char *file)
goto error_out;
}
if (parse_snowflake_section(table, &config->snowflake_opts) != 0)
{
goto error_out;
}
if (parse_schedule_options(table, &config->sched_opts) != 0)
{
goto error_out;
@@ -141,12 +103,6 @@ void stellar_config_print(const struct stellar_config *config)
return;
}
const struct snowflake_options *snowflake_opts = &config->snowflake_opts;
// snowflake config
CONFIG_LOG_DEBUG("snowflake->snowflake_base : %d", snowflake_opts->snowflake_base);
CONFIG_LOG_DEBUG("snowflake->snowflake_offset : %d", snowflake_opts->snowflake_offset);
// schedule config
CONFIG_LOG_DEBUG("schedule->merge_stat_interval : %ld", config->sched_opts.merge_stat_interval);
CONFIG_LOG_DEBUG("schedule->output_stat_interval : %ld", config->sched_opts.output_stat_interval);

View File

@@ -11,15 +11,8 @@ struct schedule_options
uint64_t output_stat_interval; // range: [1, 60000] (ms)
};
struct snowflake_options
{
uint8_t snowflake_base;
uint8_t snowflake_offset;
};
struct stellar_config
{
struct snowflake_options snowflake_opts;
struct schedule_options sched_opts;
};

View File

@@ -9,7 +9,6 @@
#include <sys/prctl.h>
#include "utils.h"
#include "snowflake.h"
#include "packet_io.h"
#include "log_private.h"
#include "stellar_stat.h"
@@ -43,7 +42,6 @@ struct stellar_thread
uint16_t idx;
uint64_t is_runing;
uint64_t last_merge_thread_stat_timestamp;
struct snowflake *snowflake;
struct ip_reassembly *ip_reass;
struct session_manager *sess_mgr;
struct stellar *st;
@@ -58,6 +56,9 @@ struct stellar_runtime
struct packet_io *packet_io;
struct plugin_manager_schema *plug_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
// config
uint64_t instance_id;
struct session_manager_config *sess_mgr_cfg;
struct ip_reassembly_config *ip_reass_cfg;
struct packet_io_config *pkt_io_cfg;
@@ -65,16 +66,11 @@ struct stellar_runtime
struct stellar
{
char stellar_cfg_file[PATH_MAX];
char plugin_cfg_file[PATH_MAX];
char log_cfg_file[PATH_MAX];
struct stellar_runtime runtime;
struct stellar_config config;
};
static __thread uint16_t __current_thread_idx = UINT16_MAX;
static __thread struct snowflake *__current_thread_snowflake = NULL;
uint64_t stellar_generate_session_id(uint64_t now_sec);
@@ -150,7 +146,6 @@ static void *worker_thread(void *arg)
__current_thread_idx = thr_idx;
__thread_local_logger = runtime->logger;
__current_thread_snowflake = thread->snowflake;
memset(packets, 0, sizeof(packets));
@@ -325,24 +320,9 @@ static void *worker_thread(void *arg)
* Stellar Main Function
******************************************************************************/
uint64_t stellar_generate_session_id(uint64_t now_sec)
{
if (__current_thread_snowflake == NULL)
{
printf("get current thread snowflake before set\n");
abort();
return 0;
}
else
{
return snowflake_generate(__current_thread_snowflake, now_sec);
}
}
static int stellar_thread_init(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
uint64_t now_ms = clock_get_real_time_ms();
for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
@@ -353,20 +333,13 @@ static int stellar_thread_init(struct stellar *st)
thread->last_merge_thread_stat_timestamp = now_ms;
thread->snowflake = snowflake_new(i, config->snowflake_opts.snowflake_base, config->snowflake_opts.snowflake_offset);
if (thread->snowflake == NULL)
{
CORE_LOG_ERROR("unable to create snowflake id generator");
return -1;
}
runtime->sess_mgr_cfg->session_id_seed = runtime->instance_id << 8 | i;
thread->sess_mgr = session_manager_new(runtime->sess_mgr_cfg, now_ms);
if (thread->sess_mgr == NULL)
{
CORE_LOG_ERROR("unable to create session manager");
return -1;
}
session_manager_set_session_id_generator(thread->sess_mgr, stellar_generate_session_id);
thread->ip_reass = ip_reassembly_new(runtime->ip_reass_cfg, now_ms);
if (thread->ip_reass == NULL)
@@ -392,7 +365,6 @@ static void stellar_thread_clean(struct stellar *st)
{
ip_reassembly_free(thread->ip_reass);
session_manager_free(thread->sess_mgr);
snowflake_free(thread->snowflake);
}
}
CORE_LOG_FATAL("worker thread context cleaned");
@@ -455,14 +427,10 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
return NULL;
}
memcpy(st->stellar_cfg_file, stellar_cfg_file, strlen(stellar_cfg_file));
memcpy(st->plugin_cfg_file, plugin_cfg_file, strlen(plugin_cfg_file));
memcpy(st->log_cfg_file, log_cfg_file, strlen(log_cfg_file));
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
runtime->logger = log_new(st->log_cfg_file);
runtime->logger = log_new(log_cfg_file);
if (runtime->logger == NULL)
{
printf("unable to create logger");
@@ -470,23 +438,29 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
}
__thread_local_logger = runtime->logger;
CORE_LOG_FATAL("stellar start (version: %s)\n %s", version, logo_str);
CORE_LOG_FATAL("stellar config file : %s", st->stellar_cfg_file);
CORE_LOG_FATAL("plugin config file : %s", st->plugin_cfg_file);
CORE_LOG_FATAL("log config file : %s", st->log_cfg_file);
CORE_LOG_FATAL("stellar config file : %s", stellar_cfg_file);
CORE_LOG_FATAL("plugin config file : %s", plugin_cfg_file);
CORE_LOG_FATAL("log config file : %s", log_cfg_file);
runtime->sess_mgr_cfg = session_manager_config_new(st->stellar_cfg_file);
if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&runtime->instance_id, 0, 4095) != 0)
{
CORE_LOG_ERROR("unable to load instance id");
goto error_out;
}
runtime->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file);
if (runtime->sess_mgr_cfg == NULL)
{
CORE_LOG_ERROR("unable to create session manager config");
goto error_out;
}
runtime->ip_reass_cfg = ip_reassembly_config_new(st->stellar_cfg_file);
runtime->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file);
if (runtime->ip_reass_cfg == NULL)
{
CORE_LOG_ERROR("unable to create ip reassembly config");
goto error_out;
}
runtime->pkt_io_cfg = packet_io_config_new(st->stellar_cfg_file);
runtime->pkt_io_cfg = packet_io_config_new(stellar_cfg_file);
if (runtime->pkt_io_cfg == NULL)
{
CORE_LOG_ERROR("unable to create packet io config");
@@ -496,7 +470,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
packet_io_config_print(runtime->pkt_io_cfg);
session_manager_config_print(runtime->sess_mgr_cfg);
ip_reassembly_config_print(runtime->ip_reass_cfg);
if (stellar_config_load(config, st->stellar_cfg_file) != 0)
if (stellar_config_load(config, stellar_cfg_file) != 0)
{
CORE_LOG_ERROR("unable to load config file");
goto error_out;
@@ -509,7 +483,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
CORE_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
runtime->plug_mgr = plugin_manager_init(st, st->plugin_cfg_file);
runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file);
if (runtime->plug_mgr == NULL)
{
CORE_LOG_ERROR("unable to create plugin manager");

View File

@@ -18,6 +18,12 @@
#define SESSION_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session", format, ##__VA_ARGS__)
#define SESSION_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session", format, ##__VA_ARGS__)
struct snowflake
{
uint64_t seed;
uint64_t sequence;
};
struct session_manager
{
struct list_head evicte_queue;
@@ -38,7 +44,7 @@ struct session_manager
*/
uint64_t now_ms;
uint64_t last_clean_expired_sess_ts;
session_id_generate_fn id_generator;
struct snowflake *sf;
};
#define EVICTE_SESSION_BURST (RX_BURST_MAX)
@@ -104,6 +110,55 @@ struct session_manager
} \
}
/******************************************************************************
* snowflake id generator
******************************************************************************/
static struct snowflake *snowflake_new(uint64_t seed)
{
struct snowflake *sf = (struct snowflake *)calloc(1, sizeof(struct snowflake));
if (sf == NULL)
{
return NULL;
}
sf->seed = seed & 0xFFFFF;
sf->sequence = 0;
return sf;
}
static void snowflake_free(struct snowflake *sf)
{
if (sf != NULL)
{
free(sf);
sf = NULL;
}
}
/*
* high -> low
*
* +------+------------------+----------------+------------------------+---------------------------+
* | 1bit | 12bit device_id | 8bit thread_id | 28bit timestamp in sec | 15bit sequence per thread |
* +------+------------------+----------------+------------------------+---------------------------+
*/
#define MAX_ID_PER_THREAD (32768)
#define MAX_ID_BASE_TIME (268435456L)
static uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec)
{
uint64_t id = 0;
uint64_t id_per_thread = (sf->sequence++) % MAX_ID_PER_THREAD;
uint64_t id_base_time = now_sec % MAX_ID_BASE_TIME;
id = (sf->seed << 43) | (id_base_time << 15) | (id_per_thread);
return id;
}
/******************************************************************************
* TCP utils
******************************************************************************/
@@ -408,15 +463,8 @@ static void session_update(struct session_manager *mgr, struct session *sess, en
{
if (session_get_current_state(sess) == SESSION_STATE_INIT)
{
if (mgr->id_generator)
{
uint64_t sess_id = mgr->id_generator(mgr->now_ms / 1000);
uint64_t sess_id = snowflake_generate(mgr->sf, mgr->now_ms / 1000);
session_set_id(sess, sess_id);
}
else
{
session_set_id(sess, 0);
}
enum packet_direction pkt_dir = packet_get_direction(pkt);
if (dir == FLOW_DIRECTION_C2S)
{
@@ -923,6 +971,11 @@ struct session_manager *session_manager_new(const struct session_manager_config
goto error;
}
}
mgr->sf = snowflake_new(mgr->cfg.session_id_seed);
if (mgr->sf == NULL)
{
goto error;
}
INIT_LIST_HEAD(&mgr->evicte_queue);
session_transition_init();
@@ -966,6 +1019,7 @@ void session_manager_free(struct session_manager *mgr)
{
packet_filter_free(mgr->dup_pkt_filter);
}
snowflake_free(mgr->sf);
session_timer_free(mgr->sess_timer);
session_table_free(mgr->udp_sess_table);
session_table_free(mgr->tcp_sess_table);
@@ -975,11 +1029,6 @@ void session_manager_free(struct session_manager *mgr)
}
}
void session_manager_set_session_id_generator(struct session_manager *mgr, session_id_generate_fn generator)
{
mgr->id_generator = generator;
}
void session_manager_record_duplicated_packet(struct session_manager *mgr, const struct packet *pkt)
{
if (mgr->cfg.duplicated_packet_bloom_filter.enable)

View File

@@ -10,6 +10,7 @@ extern "C"
struct session_manager_config
{
uint64_t session_id_seed;
uint64_t tcp_session_max;
uint64_t udp_session_max;
@@ -149,8 +150,6 @@ struct session_manager;
struct session_manager *session_manager_new(const struct session_manager_config *cfg, uint64_t now_ms);
void session_manager_free(struct session_manager *mgr);
typedef uint64_t (*session_id_generate_fn)(uint64_t now_ms);
void session_manager_set_session_id_generator(struct session_manager *mgr, session_id_generate_fn generator);
void session_manager_record_duplicated_packet(struct session_manager *mgr, const struct packet *pkt);
struct session *session_manager_new_session(struct session_manager *mgr, const struct packet *pkt, uint64_t now_ms);

View File

@@ -108,6 +108,13 @@ target_link_libraries(gtest_sess_mgr_scan session_manager gtest)
add_executable(gtest_case_tcp_fast_open gtest_case_tcp_fast_open.cpp)
target_link_libraries(gtest_case_tcp_fast_open session_manager gtest)
###############################################################################
# session id decoder
###############################################################################
add_executable(session_id_decoder session_id_decoder.cpp)
target_link_libraries(session_id_decoder)
###############################################################################
# gtest
###############################################################################

View File

@@ -10,6 +10,7 @@ extern "C"
#include "session_manager.h"
static struct session_manager_config cfg = {
.session_id_seed = 0xFFFFF,
.tcp_session_max = 256,
.udp_session_max = 256,

View File

@@ -6,12 +6,6 @@
#include "session_private.h"
#include "default_config.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
/******************************************************************************
* case: TCP init -> opening (by TCP Fast Open)
******************************************************************************/
@@ -274,7 +268,6 @@ TEST(CASE, TCP_FAST_OPEN)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S fast open packet\n");

View File

@@ -6,12 +6,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
static void packet_set_ip_id(struct packet *pkt, uint16_t ip_id)
{
const struct layer_private *ipv4_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_IPV4);
@@ -30,7 +24,6 @@ TEST(TCP_DUPKT_FILTER_ENABLE, SYN_DUP)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -97,7 +90,6 @@ TEST(TCP_DUPKT_FILTER_ENABLE, SYNACK_DUP)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C SYNACK Packet
printf("\n=> Packet Parse: TCP S2C SYNACK packet\n");
@@ -165,7 +157,6 @@ TEST(TCP_DUPKT_FILTER_ENABLE, SKIP)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -252,7 +243,6 @@ TEST(TCP_DUPKT_FILTER_DISABLE, SYN_DUP)
mgr = session_manager_new(&_cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -303,7 +293,6 @@ TEST(TCP_DUPKT_FILTER_DISABLE, SYNACK_DUP)
mgr = session_manager_new(&_cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C SYNACK Packet
printf("\n=> Packet Parse: TCP S2C SYNACK packet\n");

View File

@@ -7,12 +7,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
static void packet_set_ip_src_addr(struct packet *pkt, uint32_t addr)
{
const struct layer_private *ipv4_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_IPV4);
@@ -34,7 +28,6 @@ TEST(TCP_OVERLOAD, EVICT_OLD_SESS)
mgr = session_manager_new(&_cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -78,7 +71,6 @@ TEST(TCP_OVERLOAD, EVICT_NEW_SESS)
mgr = session_manager_new(&_cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");

View File

@@ -7,12 +7,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
static void packet_set_ip_src_addr(struct packet *pkt, uint32_t addr)
{
const struct layer_private *ipv4_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_IPV4);
@@ -35,7 +29,6 @@ TEST(UDP_OVERLOAD, EVICT_OLD_SESS)
mgr = session_manager_new(&_cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S REQ Packet
printf("\n=> Packet Parse: UDP C2S REQ packet\n");
@@ -127,7 +120,6 @@ TEST(UDP_OVERLOAD, EVICT_NEW_SESS)
mgr = session_manager_new(&_cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S REQ Packet
printf("\n=> Packet Parse: UDP C2S REQ packet\n");

View File

@@ -6,12 +6,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
static inline void packet_overwrite_src_addr(struct packet *pkt, struct in_addr addr)
{
const struct layer_private *ipv4_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_IPV4);
@@ -93,7 +87,6 @@ TEST(SESS_MGR_SCAN, OPTS)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// new session
memset(&pkt, 0, sizeof(pkt));

View File

@@ -6,12 +6,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
static void hex_dump(const char *payload, uint32_t len)
{
printf("Payload Length: %u\n", len);
@@ -36,7 +30,6 @@ TEST(SESS_MGR_TCP_REASSEMBLY, OUT_OF_ORDER)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -228,7 +221,6 @@ TEST(SESS_MGR_TCP_REASSEMBLY, SEQ_WRAPAROUND)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");

View File

@@ -8,12 +8,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
static void build_active_tcp_session(struct session_manager *mgr, struct session *sess)
{
struct packet pkt;
@@ -58,7 +52,6 @@ TEST(TCP_ACTIVE_TO_CLOSING, BY_FIN_FIN)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet & C2S DATA Packet
build_active_tcp_session(mgr, sess);
@@ -150,7 +143,6 @@ TEST(TCP_ACTIVE_TO_CLOSING, BY_C2S_RST)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet & C2S DATA Packet
build_active_tcp_session(mgr, sess);
@@ -236,7 +228,6 @@ TEST(TCP_ACTIVE_TO_CLOSING, BY_S2C_RST)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet & C2S DATA Packet
build_active_tcp_session(mgr, sess);
@@ -320,7 +311,6 @@ TEST(TCP_ACTIVE_TO_CLOSING, BY_DATA_TIMEOUT)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet & C2S DATA Packet
build_active_tcp_session(mgr, sess);
@@ -370,7 +360,6 @@ TEST(TCP_ACTIVE_TO_CLOSING, BY_C2S_HALF_CLOSED_TIMEOUT)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet & C2S DATA Packet
build_active_tcp_session(mgr, sess);
@@ -450,7 +439,6 @@ TEST(TCP_ACTIVE_TO_CLOSING, BY_S2C_HALF_CLOSED_TIMEOUT)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet & C2S DATA Packet
build_active_tcp_session(mgr, sess);

View File

@@ -8,12 +8,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
/******************************************************************************
* case: TCP init -> opening (by SYN)
******************************************************************************/
@@ -29,7 +23,6 @@ TEST(TCP_INIT_TO_OPENING, BY_SYN)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -107,7 +100,6 @@ TEST(TCP_INIT_TO_OPENING, BY_SYNACK)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C SYNACK Packet
printf("\n=> Packet Parse: TCP S2C SYNACK packet\n");
@@ -185,7 +177,6 @@ TEST(TCP_INIT_TO_OPENING, BY_SYN_SYNACK)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -275,7 +266,6 @@ TEST(TCP_INIT_TO_OPENING, BY_SYN_SYNACK_ACK)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -379,7 +369,6 @@ TEST(TCP_INIT_TO_OPENING, BY_SYN_RETRANSMISSION)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -476,7 +465,6 @@ TEST(TCP_INIT_TO_OPENING, BY_SYNACK_RETRANSMISSION)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C SYNACK Packet
printf("\n=> Packet Parse: TCP S2C SYNACK packet\n");
@@ -572,7 +560,6 @@ TEST(TCP_INIT_TO_OPENING, BY_C2S_ASMMETRIC)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -662,7 +649,6 @@ TEST(TCP_INIT_TO_OPENING, BY_S2C_ASMMETRIC)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C SYNACK Packet
printf("\n=> Packet Parse: TCP S2C SYNACK packet\n");

View File

@@ -8,12 +8,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
#if 1
TEST(TCP_INIT_TO_OPENING_TO_ACTIVE_TO_CLOSING_TO_CLOSED, TEST)
{
@@ -25,7 +19,6 @@ TEST(TCP_INIT_TO_OPENING_TO_ACTIVE_TO_CLOSING_TO_CLOSED, TEST)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");

View File

@@ -8,12 +8,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
/******************************************************************************
* case: TCP opening -> active (by C2S DATA)
******************************************************************************/
@@ -29,7 +23,6 @@ TEST(TCP_OPENING_TO_ACTIVE, BY_SYN_C2S_DATA)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -119,7 +112,6 @@ TEST(TCP_OPENING_TO_ACTIVE, BY_SYNACK_S2C_DATA)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C SYNACK Packet
printf("\n=> Packet Parse: TCP S2C SYNACK packet\n");

View File

@@ -8,12 +8,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
/******************************************************************************
* case: TCP opening -> closing (by FIN-FIN)
******************************************************************************/
@@ -29,7 +23,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_FIN_FIN)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -130,7 +123,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_C2S_RST)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -225,7 +217,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_S2C_RST)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -319,7 +310,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_INIT_TIMEOUT)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -378,7 +368,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_HANDSHAKE_TIMEOUT)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -468,7 +457,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_DATA_TIMEOUT)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -570,7 +558,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_C2S_HALF_FIN)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");
@@ -659,7 +646,6 @@ TEST(TCP_OPENING_TO_CLOSING, BY_S2C_HALF_FIN)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");

View File

@@ -8,12 +8,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
#if 1
TEST(UDP_INIT_TO_OPENING_TO_ACTIVE_TO_CLOSING, TEST)
{
@@ -25,7 +19,6 @@ TEST(UDP_INIT_TO_OPENING_TO_ACTIVE_TO_CLOSING, TEST)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S REQ Packet
printf("\n=> Packet Parse: UDP C2S REQ packet\n");

View File

@@ -8,12 +8,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
/******************************************************************************
* case: UDP init -> opening (by C2S Packet)
* case: UDP opening -> closing (by timeout)
@@ -30,7 +24,6 @@ TEST(UDP_INIT_TO_OPENING_TO_CLOSING, BY_C2S)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S REQ Packet
printf("\n=> Packet Parse: UDP C2S REQ packet\n");
@@ -109,7 +102,6 @@ TEST(UDP_INIT_TO_OPENING_TO_CLOSING, BY_S2C)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C RESP Packet
printf("\n=> Packet Parse: UDP S2C RESP packet\n");

View File

@@ -7,12 +7,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
#if 1
TEST(TIMEOUT, TCP_TIMEOUT_DATA)
{
@@ -22,7 +16,6 @@ TEST(TIMEOUT, TCP_TIMEOUT_DATA)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");

View File

@@ -7,12 +7,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
#if 1
TEST(TIMEOUT, TCP_TIMEOUT_HANDSHAKE)
{
@@ -22,7 +16,6 @@ TEST(TIMEOUT, TCP_TIMEOUT_HANDSHAKE)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// S2C SYNACK Packet
printf("\n=> Packet Parse: TCP S2C SYNACK packet\n");

View File

@@ -7,12 +7,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
#if 1
TEST(TIMEOUT, TCP_TIMEOUT_INIT)
{
@@ -22,7 +16,6 @@ TEST(TIMEOUT, TCP_TIMEOUT_INIT)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S SYN Packet
printf("\n=> Packet Parse: TCP C2S SYN packet\n");

View File

@@ -7,12 +7,6 @@
#include "default_config.h"
#include "test_packets.h"
static uint64_t session_id_generator(uint64_t now_ms __attribute__((unused)))
{
static uint64_t count = 0;
return (++count);
}
#if 1
TEST(TIMEOUT, UDP_TIMEOUT_DATA1)
{
@@ -22,7 +16,6 @@ TEST(TIMEOUT, UDP_TIMEOUT_DATA1)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S REQ Packet
printf("\n=> Packet Parse: UDP C2S REQ packet\n");
@@ -59,7 +52,6 @@ TEST(TIMEOUT, UDP_TIMEOUT_DATA2)
mgr = session_manager_new(&cfg, 1);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_id_generator(mgr, session_id_generator);
// C2S REQ Packet
printf("\n=> Packet Parse: UDP C2S REQ packet\n");

View File

@@ -0,0 +1,37 @@
#include <time.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
int main(int argc, char **argv)
{
if (argc != 2)
{
printf("Usage: %s <id>\n", argv[0]);
return -1;
}
uint64_t id = atoll(argv[1]);
uint64_t sequence = id & 0x7FFF;
uint64_t time_relative = (id >> 15) & 0xFFFFFFF;
uint64_t seed = (id >> 43) & 0xFFFFF;
uint64_t thread_id = seed & 0xFF;
uint64_t instance_id = (seed >> 8) & 0xFFF;
printf("id: %lu, seed: %lu, instance_id: %lu, thread_id: %lu, time_relative: %lu, sequence: %lu\n", id, seed, instance_id, thread_id, time_relative, sequence);
#define MAX_ID_BASE_TIME (268435456L)
for (int i = 6; i < 10; i++)
{
char buff[64];
time_t tt = MAX_ID_BASE_TIME * i + time_relative;
struct tm *tm = localtime(&tt);
strftime(buff, sizeof(buff), "%Y-%m-%d %H:%M:%S", tm);
printf("\ttime_period * %d + time_relative => %s\n", i, buff);
}
return 0;
}

View File

@@ -1,7 +0,0 @@
add_library(snowflake snowflake.c)
target_include_directories(snowflake PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(snowflake PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger)
target_link_libraries(snowflake logger)
add_subdirectory(test)

View File

@@ -1,105 +0,0 @@
#include <stdint.h>
#include <stdlib.h>
#include "snowflake.h"
#include "log_private.h"
#define SNOWFLAKE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "snowflake", format, ##__VA_ARGS__)
struct snowflake
{
uint8_t worker_base_id; // 5 bit [0, 31]
uint8_t worker_offset_id; // 7 bit [0, 127]
uint64_t thread_id; // limit 256
uint64_t device_id; // 12 bit [0, 4095] ( base << 7 | offset )
uint64_t sequence;
};
struct snowflake *snowflake_new(uint16_t thread_id, uint8_t worker_base_id, uint8_t worker_offset_id)
{
if (thread_id > 255)
{
SNOWFLAKE_LOG_ERROR("thread_id %u is invalid, range [0, 255]", thread_id);
return NULL;
}
if (worker_base_id > 31)
{
SNOWFLAKE_LOG_ERROR("worker_base_id %u is invalid, range [0, 31]", worker_base_id);
return NULL;
}
if (worker_offset_id > 127)
{
SNOWFLAKE_LOG_ERROR("worker_offset_id %u is invalid, range [0, 127]", worker_offset_id);
return NULL;
}
struct snowflake *sf = (struct snowflake *)calloc(1, sizeof(struct snowflake));
if (sf == NULL)
{
SNOWFLAKE_LOG_ERROR("calloc snowflake failed");
return NULL;
}
sf->worker_base_id = worker_base_id;
sf->worker_offset_id = worker_offset_id;
sf->thread_id = thread_id;
sf->device_id = ((worker_base_id << 7) | worker_offset_id) & 0xFFF;
sf->sequence = 0;
return sf;
}
void snowflake_free(struct snowflake *sf)
{
if (sf != NULL)
{
free(sf);
sf = NULL;
}
}
/*
* high -> low
*
* +------+------------------+----------------+------------------------+---------------------------+
* | 1bit | 12bit device_id | 8bit thread_id | 28bit timestamp in sec | 15bit sequence per thread |
* +------+------------------+----------------+------------------------+---------------------------+
*/
#define MAX_ID_PER_THREAD (32768)
#define MAX_ID_BASE_TIME (268435456L)
uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec)
{
if (sf == NULL)
{
SNOWFLAKE_LOG_ERROR("snowflake is NULL");
return 0;
}
uint64_t id = 0;
uint64_t id_per_thread = (sf->sequence++) % MAX_ID_PER_THREAD;
uint64_t id_base_time = now_sec % MAX_ID_BASE_TIME;
id = (sf->device_id << 51) |
(sf->thread_id << 43) |
(id_base_time << 15) |
(id_per_thread);
return id;
}
void snowflake_deserialize(uint64_t id, struct snowflake_meta *meta)
{
if (meta == NULL)
{
return;
}
meta->sequence = id & 0x7FFF;
meta->time_cycle = MAX_ID_BASE_TIME;
meta->time_relative = (id >> 15) & 0xFFFFFFF;
meta->thread_id = (id >> 43) & 0xFF;
meta->device_id = (id >> 51) & 0xFFF;
meta->worker_base_id = (meta->device_id >> 7) & 0x1F;
meta->worker_offset_id = meta->device_id & 0x7F;
}

View File

@@ -1,31 +0,0 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
#include <stdint.h>
struct snowflake;
struct snowflake *snowflake_new(uint16_t thread_id, uint8_t worker_base_id, uint8_t worker_offset_id);
void snowflake_free(struct snowflake *sf);
struct snowflake_meta
{
uint64_t sequence;
uint64_t time_cycle;
uint64_t time_relative;
uint16_t thread_id;
uint16_t device_id;
uint8_t worker_base_id;
uint8_t worker_offset_id;
};
uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec);
void snowflake_deserialize(uint64_t id, struct snowflake_meta *meta);
#ifdef __cplusplus
}
#endif

View File

@@ -1,8 +0,0 @@
add_executable(gtest_snowflake gtest_snowflake.cpp)
target_link_libraries(gtest_snowflake snowflake gtest)
add_executable(snowflake_tool snowflake_tool.cpp)
target_link_libraries(snowflake_tool snowflake)
include(GoogleTest)
gtest_discover_tests(gtest_snowflake)

View File

@@ -1,48 +0,0 @@
#include <gtest/gtest.h>
#include "snowflake.h"
TEST(SNOWFLAKE, GENERATE)
{
uint16_t thread_id = 1;
uint8_t worker_base_id = 2;
uint8_t worker_offset_id = 3;
struct snowflake *sf = snowflake_new(thread_id, worker_base_id, worker_offset_id);
EXPECT_TRUE(sf != NULL);
EXPECT_TRUE(snowflake_generate(sf, 1000) != 0);
snowflake_free(sf);
}
TEST(SNOWFLAKE, DESERIALIZE)
{
uint16_t thread_id = 1;
uint8_t worker_base_id = 2;
uint8_t worker_offset_id = 3;
struct snowflake *sf = snowflake_new(thread_id, worker_base_id, worker_offset_id);
EXPECT_TRUE(sf != NULL);
for (uint64_t i = 0; i < 5; i++)
{
uint64_t id = snowflake_generate(sf, i + 1000);
EXPECT_TRUE(id != 0);
struct snowflake_meta meta = {};
snowflake_deserialize(id, &meta);
EXPECT_EQ(meta.sequence, i);
EXPECT_EQ(meta.time_relative, i + 1000);
EXPECT_EQ(meta.thread_id, thread_id);
EXPECT_EQ(meta.device_id, (worker_base_id << 7) | worker_offset_id);
EXPECT_EQ(meta.worker_base_id, worker_base_id);
EXPECT_EQ(meta.worker_offset_id, worker_offset_id);
}
snowflake_free(sf);
}
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@@ -1,38 +0,0 @@
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include "snowflake.h"
int main(int argc, char **argv)
{
if (argc != 2)
{
printf("Usage: %s <id>\n", argv[0]);
return -1;
}
uint64_t id = atoll(argv[1]);
struct snowflake_meta meta = {};
snowflake_deserialize(id, &meta);
printf("id : %lu\n", id);
printf("base_id : %u\n", meta.worker_base_id);
printf("offset_id : %u\n", meta.worker_offset_id);
printf("device_id : %u\n", meta.device_id);
printf("thread_id : %u\n", meta.thread_id);
printf("time_cycle : %lu sec (%lu year)\n", meta.time_cycle, meta.time_cycle / 3600 / 24 / 365);
printf("time_relative : %lu sec\n", meta.time_relative);
printf("sequence : %lu\n", meta.sequence);
for (int i = 6; i < 10; i++)
{
char buff[64];
time_t tt = meta.time_cycle * i + meta.time_relative;
struct tm *tm = localtime(&tt);
strftime(buff, sizeof(buff), "%Y-%m-%d %H:%M:%S", tm);
printf("\ttime_cycle * %d + time_relative => %s\n", i, buff);
}
return 0;
}

View File

@@ -1,6 +1,5 @@
[snowflake]
snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127]
[instance]
id = 1 # range: [0, 4095] (20 bit)
[packet_io]
mode = "pcapfile" # pcapfile, pcaplist, marsio

View File

@@ -1,6 +1,5 @@
[snowflake]
snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127]
[instance]
id = 1 # range: [0, 4095] (20 bit)
[packet_io]
mode = "pcapfile" # pcapfile, pcaplist, marsio