diff --git a/conf/stellar.toml b/conf/stellar.toml index b6e4bae..9454cb6 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -68,8 +68,12 @@ path = "" init = "packet_manager_on_init" exit = "packet_manager_on_exit" + thread_init = "packet_manager_on_thread_init" + thread_exit = "packet_manager_on_thread_exit" [[module]] path = "" init = "session_manager_on_init" exit = "session_manager_on_exit" + thread_init = "session_manager_on_thread_init" + thread_exit = "session_manager_on_thread_exit" diff --git a/infra/ip_reassembly/ip_reassembly.c b/infra/ip_reassembly/ip_reassembly.c index fbdf3e6..c0d3e10 100644 --- a/infra/ip_reassembly/ip_reassembly.c +++ b/infra/ip_reassembly/ip_reassembly.c @@ -7,8 +7,8 @@ #include "packet_helper.h" #include "packet_internal.h" -#define IP_DEFRAG_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "ip ip_reass", format, ##__VA_ARGS__) -#define IP_DEFRAG_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "ip ip_reass", format, ##__VA_ARGS__) +#define IP_REASSEMBLY_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "IP reassembly", format, ##__VA_ARGS__) +#define IP_REASSEMBLY_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "IP reassembly", format, ##__VA_ARGS__) /* * https://datatracker.ietf.org/doc/html/rfc8200#section-4.5 @@ -132,22 +132,22 @@ struct ip_reassembly (stat)->ip6_##filed += n; \ } -#define IP_DEFRAG_ERROR_WITH_KEY(desc, key, ...) \ - do \ - { \ - char saddr_str[INET6_ADDRSTRLEN] = {0}; \ - char daddr_str[INET6_ADDRSTRLEN] = {0}; \ - if ((key)->ip_version == 4) \ - { \ - inet_ntop(AF_INET, &(key)->saddr.v4, saddr_str, INET6_ADDRSTRLEN); \ - inet_ntop(AF_INET, &(key)->daddr.v4, daddr_str, INET6_ADDRSTRLEN); \ - } \ - else \ - { \ - inet_ntop(AF_INET6, &(key)->saddr.v6, saddr_str, INET6_ADDRSTRLEN); \ - inet_ntop(AF_INET6, &(key)->daddr.v6, daddr_str, INET6_ADDRSTRLEN); \ - } \ - IP_DEFRAG_LOG_ERROR("%s (%s-%s 0x%0x)", (desc), saddr_str, daddr_str, (key)->ip_id); \ +#define IP_DEFRAG_ERROR_WITH_KEY(desc, key, ...) \ + do \ + { \ + char saddr_str[INET6_ADDRSTRLEN] = {0}; \ + char daddr_str[INET6_ADDRSTRLEN] = {0}; \ + if ((key)->ip_version == 4) \ + { \ + inet_ntop(AF_INET, &(key)->saddr.v4, saddr_str, INET6_ADDRSTRLEN); \ + inet_ntop(AF_INET, &(key)->daddr.v4, daddr_str, INET6_ADDRSTRLEN); \ + } \ + else \ + { \ + inet_ntop(AF_INET6, &(key)->saddr.v6, saddr_str, INET6_ADDRSTRLEN); \ + inet_ntop(AF_INET6, &(key)->daddr.v6, daddr_str, INET6_ADDRSTRLEN); \ + } \ + IP_REASSEMBLY_LOG_ERROR("%s (%s-%s 0x%0x)", (desc), saddr_str, daddr_str, (key)->ip_id); \ } while (0) static int frag_key_init(struct frag_key *key, const struct packet *pkt) @@ -427,7 +427,7 @@ static struct packet *ip_reassembly_defrag_fq(struct ip_reassembly *ip_reass, st struct packet *pkt = packet_new(total_len); if (pkt == NULL) { - IP_DEFRAG_LOG_ERROR("unable to allocate memory"); + IP_REASSEMBLY_LOG_ERROR("unable to allocate memory"); // TODO stat goto error_out; @@ -569,7 +569,7 @@ struct ip_reassembly *ip_reassembly_new(uint64_t timeout_ms, uint64_t fq_num, ui struct ip_reassembly *ip_reass = (struct ip_reassembly *)calloc(1, sizeof(struct ip_reassembly)); if (ip_reass == NULL) { - IP_DEFRAG_LOG_ERROR("unable to allocate memory for ip_reassembly"); + IP_REASSEMBLY_LOG_ERROR("unable to allocate memory for ip_reassembly"); return NULL; } @@ -586,7 +586,7 @@ struct ip_reassembly *ip_reassembly_new(uint64_t timeout_ms, uint64_t fq_num, ui struct frag_queue *fq = (struct frag_queue *)calloc(1, sizeof(struct frag_queue) + sizeof(struct frag) * ip_reass->fq_size); if (fq == NULL) { - IP_DEFRAG_LOG_ERROR("unable to allocate memory for frag_queue"); + IP_REASSEMBLY_LOG_ERROR("unable to allocate memory for frag_queue"); goto error_out; } TAILQ_INSERT_TAIL(&ip_reass->free_list, fq, tqe); @@ -637,7 +637,7 @@ struct packet *ip_reassembly_defrag(struct ip_reassembly *ip_reass, struct packe struct frag_key key; if (frag_key_init(&key, pkt) != 0) { - IP_DEFRAG_LOG_ERROR("unable to init frag key"); + IP_REASSEMBLY_LOG_ERROR("unable to init frag key"); TAILQ_INSERT_TAIL(&ip_reass->evict_pkt, pkt, frag_tqe); return NULL; } @@ -645,7 +645,7 @@ struct packet *ip_reassembly_defrag(struct ip_reassembly *ip_reass, struct packe struct frag frag; if (frag_init(&frag, pkt) != 0) { - IP_DEFRAG_LOG_ERROR("unable to init frag"); + IP_REASSEMBLY_LOG_ERROR("unable to init frag"); TAILQ_INSERT_TAIL(&ip_reass->evict_pkt, pkt, frag_tqe); return NULL; } @@ -723,30 +723,30 @@ void ip_reassembly_print_stat(struct ip_reassembly *ip_reass) { if (ip_reass) { - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_defrags_expected : %lu", ip_reass, ip_reass->stat.ip4_defrags_expected); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_defrags_succeed : %lu", ip_reass, ip_reass->stat.ip4_defrags_succeed); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_defrags_failed : %lu", ip_reass, ip_reass->stat.ip4_defrags_failed); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_defrags_expected : %lu", ip_reass, ip_reass->stat.ip4_defrags_expected); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_defrags_succeed : %lu", ip_reass, ip_reass->stat.ip4_defrags_succeed); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_defrags_failed : %lu", ip_reass, ip_reass->stat.ip4_defrags_failed); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags : %lu", ip_reass, ip_reass->stat.ip4_frags); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags_freed : %lu", ip_reass, ip_reass->stat.ip4_frags_freed); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags_buffered : %lu", ip_reass, ip_reass->stat.ip4_frags_buffered); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags_no_buffer : %lu", ip_reass, ip_reass->stat.ip4_frags_no_buffer); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags_timeout : %lu", ip_reass, ip_reass->stat.ip4_frags_timeout); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags_invalid_length : %lu", ip_reass, ip_reass->stat.ip4_frags_invalid_length); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags_overlap : %lu", ip_reass, ip_reass->stat.ip4_frags_overlap); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip4_frags_too_many : %lu", ip_reass, ip_reass->stat.ip4_frags_too_many); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags : %lu", ip_reass, ip_reass->stat.ip4_frags); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags_freed : %lu", ip_reass, ip_reass->stat.ip4_frags_freed); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags_buffered : %lu", ip_reass, ip_reass->stat.ip4_frags_buffered); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags_no_buffer : %lu", ip_reass, ip_reass->stat.ip4_frags_no_buffer); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags_timeout : %lu", ip_reass, ip_reass->stat.ip4_frags_timeout); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags_invalid_length : %lu", ip_reass, ip_reass->stat.ip4_frags_invalid_length); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags_overlap : %lu", ip_reass, ip_reass->stat.ip4_frags_overlap); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip4_frags_too_many : %lu", ip_reass, ip_reass->stat.ip4_frags_too_many); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_defrags_expected : %lu", ip_reass, ip_reass->stat.ip6_defrags_expected); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_defrags_succeed : %lu", ip_reass, ip_reass->stat.ip6_defrags_succeed); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_defrags_failed : %lu", ip_reass, ip_reass->stat.ip6_defrags_failed); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_defrags_expected : %lu", ip_reass, ip_reass->stat.ip6_defrags_expected); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_defrags_succeed : %lu", ip_reass, ip_reass->stat.ip6_defrags_succeed); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_defrags_failed : %lu", ip_reass, ip_reass->stat.ip6_defrags_failed); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags : %lu", ip_reass, ip_reass->stat.ip6_frags); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags_freed : %lu", ip_reass, ip_reass->stat.ip6_frags_freed); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags_buffered : %lu", ip_reass, ip_reass->stat.ip6_frags_buffered); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags_no_buffer : %lu", ip_reass, ip_reass->stat.ip6_frags_no_buffer); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags_timeout : %lu", ip_reass, ip_reass->stat.ip6_frags_timeout); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags_invalid_length : %lu", ip_reass, ip_reass->stat.ip6_frags_invalid_length); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags_overlap : %lu", ip_reass, ip_reass->stat.ip6_frags_overlap); - IP_DEFRAG_LOG_INFO("ip_reass: %p, ip6_frags_too_many : %lu", ip_reass, ip_reass->stat.ip6_frags_too_many); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags : %lu", ip_reass, ip_reass->stat.ip6_frags); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_freed : %lu", ip_reass, ip_reass->stat.ip6_frags_freed); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_buffered : %lu", ip_reass, ip_reass->stat.ip6_frags_buffered); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_no_buffer : %lu", ip_reass, ip_reass->stat.ip6_frags_no_buffer); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_timeout : %lu", ip_reass, ip_reass->stat.ip6_frags_timeout); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_invalid_length : %lu", ip_reass, ip_reass->stat.ip6_frags_invalid_length); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_overlap : %lu", ip_reass, ip_reass->stat.ip6_frags_overlap); + IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_too_many : %lu", ip_reass, ip_reass->stat.ip6_frags_too_many); } } diff --git a/infra/packet_io/mars_io.c b/infra/packet_io/mars_io.c index dfd1f49..0056184 100644 --- a/infra/packet_io/mars_io.c +++ b/infra/packet_io/mars_io.c @@ -10,14 +10,14 @@ #include "utils_internal.h" #include "packet_internal.h" -#define MARS_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "mars io", format, ##__VA_ARGS__) -#define MARS_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "mars io", format, ##__VA_ARGS__) +#define MARS_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "mars IO", format, ##__VA_ARGS__) +#define MARS_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "mars IO", format, ##__VA_ARGS__) struct mars_io_cfg { char app_symbol[64]; char dev_symbol[64]; - uint64_t thread_num; // range [1, MAX_THREAD_NUM] + uint16_t thread_num; // range [1, MAX_THREAD_NUM] uint64_t cpu_mask[MAX_THREAD_NUM]; uint64_t idle_yield_ms; // range: [0, 6000] (ms) @@ -59,7 +59,7 @@ static struct mars_io_cfg *mars_io_cfg_new(const char *toml_file) int num = 0; ret += load_toml_str_config(toml_file, "packet_io.app_symbol", cfg->app_symbol); ret += load_toml_str_config(toml_file, "packet_io.dev_symbol", cfg->dev_symbol); - ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM); + ret += load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.idle_yield_ms", &cfg->idle_yield_ms, 0, 60000); num = load_toml_array_config(toml_file, "packet_io.cpu_mask", cfg->cpu_mask, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); @@ -255,11 +255,13 @@ static void origin_free_cb(struct packet *pkt, void *args) struct packet_origin *origin = packet_get_origin(pkt); marsio_buff_t *mbuff = (marsio_buff_t *)origin->ctx; struct packet_io_stat *stat = &mars_io->stat[origin->thr_idx]; + struct packet_pool *pool = mars_io->pool[origin->thr_idx]; stat->pkts_user_freed++; stat->bytes_user_freed += packet_get_raw_len(pkt); marsio_buff_free(mars_io->mr_ins, &mbuff, 1, 0, origin->thr_idx); + packet_pool_push(pool, pkt); } static struct packet *recv_packet(struct mars_io *mars_io, marsio_buff_t *mbuff, uint16_t thr_idx) diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c index d0efc32..9210927 100644 --- a/infra/packet_io/packet_io.c +++ b/infra/packet_io/packet_io.c @@ -1,11 +1,7 @@ #include "pcap_io.h" #include "mars_io.h" -#include "log_internal.h" #include "utils_internal.h" -#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet io", format, ##__VA_ARGS__) -#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet io", format, ##__VA_ARGS__) - struct packet_io { void *handle; diff --git a/infra/packet_io/pcap_io.c b/infra/packet_io/pcap_io.c index 139dc70..ba294f9 100644 --- a/infra/packet_io/pcap_io.c +++ b/infra/packet_io/pcap_io.c @@ -13,9 +13,9 @@ #include "packet_internal.h" #include "utils_internal.h" -#define PCAP_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "pcap io", format, ##__VA_ARGS__) -#define PCAP_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "pcap io", format, ##__VA_ARGS__) -#define PCAP_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "pcap io", format, ##__VA_ARGS__) +#define PCAP_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) +#define PCAP_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) +#define PCAP_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) #define RING_BUFFER_MAX_SIZE (4096 * 1000) @@ -30,7 +30,7 @@ struct pcap_io_cfg { char mode[16]; // pcapfile, pcaplist char pcap_path[PATH_MAX]; - uint64_t thread_num; // range [1, MAX_THREAD_NUM] + uint16_t thread_num; // range [1, MAX_THREAD_NUM] // packet pool uint64_t capacity; // range: [1, 4294967295] @@ -151,7 +151,7 @@ static struct pcap_io_cfg *pcap_io_cfg_new(const char *toml_file) int ret = 0; ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode); ret += load_toml_str_config(toml_file, "packet_io.pcap_path", cfg->pcap_path); - ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM); + ret += load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.fail_action", &cfg->fail_action, 0, 1); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.timeout_ms", &cfg->timeout_ms, 1, 60000); @@ -358,11 +358,13 @@ static void origin_free_cb(struct packet *pkt, void *args) struct packet_origin *origin = packet_get_origin(pkt); struct pcap_pkt *pcap_pkt = origin->ctx; struct packet_io_stat *stat = &pcap_io->stat[origin->thr_idx]; + struct packet_pool *pool = pcap_io->pool[origin->thr_idx]; stat->pkts_user_freed++; stat->bytes_user_freed += packet_get_raw_len(pkt); free(pcap_pkt); + packet_pool_push(pool, pkt); } static struct packet *recv_packet(struct pcap_io *pcap_io, struct pcap_pkt *pcap_pkt, uint16_t thr_idx) diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index 1f8c678..e139955 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -31,7 +31,7 @@ struct packet_manager_schema struct packet_manager { - uint64_t thread_num; + uint16_t thread_num; struct packet_manager_schema *schema; struct packet_manager_runtime *runtime[MAX_THREAD_NUM]; }; @@ -73,7 +73,7 @@ void packet_manager_runtime_free(struct packet_manager_runtime *pkt_mgr_rt) pkt_mgr_rt = NULL; } -struct packet_manager_runtime *packet_manager_runtime_new() +struct packet_manager_runtime *packet_manager_runtime_new(struct mq_runtime *mq_rt) { struct packet_manager_runtime *pkt_mgr_rt = calloc(1, sizeof(struct packet_manager_runtime)); if (pkt_mgr_rt == NULL) @@ -86,6 +86,7 @@ struct packet_manager_runtime *packet_manager_runtime_new() { TAILQ_INIT(&pkt_mgr_rt->queue[i]); } + pkt_mgr_rt->mq = mq_rt; return pkt_mgr_rt; } @@ -178,7 +179,7 @@ error_out: * packet manager ******************************************************************************/ -struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t thread_num) +struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint16_t thread_num) { struct packet_manager *pkt_mgr = calloc(1, sizeof(struct packet_manager)); if (pkt_mgr == NULL) @@ -195,16 +196,6 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t goto error_out; } - for (uint16_t i = 0; i < pkt_mgr->thread_num; i++) - { - pkt_mgr->runtime[i] = packet_manager_runtime_new(); - if (pkt_mgr->runtime[i] == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime"); - goto error_out; - } - } - return pkt_mgr; error_out: @@ -214,21 +205,8 @@ error_out: void packet_manager_free(struct packet_manager *pkt_mgr) { - struct packet_manager_runtime *pkt_mgr_rt = NULL; - if (pkt_mgr) { - - for (uint16_t i = 0; i < pkt_mgr->thread_num; i++) - { - pkt_mgr_rt = pkt_mgr->runtime[i]; - if (pkt_mgr_rt) - { - packet_manager_print_stat(pkt_mgr, i); - packet_manager_runtime_free(pkt_mgr_rt); - } - } - packet_schema_free(pkt_mgr->schema); free(pkt_mgr); @@ -253,11 +231,30 @@ int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, stru assert(pkt_mgr); assert(thread_id < pkt_mgr->thread_num); assert(mq_rt); - struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; - runtime->mq = mq_rt; + struct packet_manager_runtime *pkt_mgr_rt = packet_manager_runtime_new(mq_rt); + if (pkt_mgr_rt == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime"); + return -1; + } + else + { + pkt_mgr->runtime[thread_id] = pkt_mgr_rt; + return 0; + } +} - return 0; +void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id) +{ + assert(pkt_mgr); + assert(thread_id < pkt_mgr->thread_num); + + struct packet_manager_runtime *pkt_mgr_rt = pkt_mgr->runtime[thread_id]; + PACKET_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", pkt_mgr_rt, thread_id); + packet_manager_print_stat(pkt_mgr, thread_id); + packet_manager_runtime_free(pkt_mgr_rt); + pkt_mgr_rt = NULL; } void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt) @@ -412,7 +409,7 @@ struct stellar_module *packet_manager_on_init(struct stellar_module_manager *mod assert(mod_mgr); struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr); assert(mq_schema); - uint64_t thread_num = stellar_module_manager_get_max_thread_num(mod_mgr); + uint16_t thread_num = stellar_module_manager_get_max_thread_num(mod_mgr); struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, thread_num); if (pkt_mgr == NULL) @@ -443,4 +440,32 @@ void packet_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute__ stellar_module_free(mod); PACKET_MANAGER_LOG_FATAL("packet_manager exited"); } +} + +struct stellar_module *packet_manager_on_thread_init(struct stellar_module_manager *mod_mgr, int thread_id, struct stellar_module *mod) +{ + struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod); + assert(pkt_mgr); + struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr); + assert(mq_rt); + assert(thread_id < pkt_mgr->thread_num); + + if (packet_manager_init(pkt_mgr, thread_id, mq_rt) != 0) + { + PACKET_MANAGER_LOG_ERROR("failed to init packet_manager_init"); + return NULL; + } + else + { + return mod; + } +} + +void packet_manager_on_thread_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), int thread_id, struct stellar_module *mod) +{ + struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod); + assert(pkt_mgr); + assert(thread_id < pkt_mgr->thread_num); + + packet_manager_clean(pkt_mgr, thread_id); } \ No newline at end of file diff --git a/infra/packet_manager/packet_manager_internal.h b/infra/packet_manager/packet_manager_internal.h index d001770..dc7a979 100644 --- a/infra/packet_manager/packet_manager_internal.h +++ b/infra/packet_manager/packet_manager_internal.h @@ -26,10 +26,11 @@ struct packet_manager_stat } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets }; -struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t thread_num); +struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint16_t thread_num); void packet_manager_free(struct packet_manager *pkt_mgr); int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt); +void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id); void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt); struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id); void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id); diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp index d186e45..e447c32 100644 --- a/infra/packet_manager/test/gtest_packet_manager.cpp +++ b/infra/packet_manager/test/gtest_packet_manager.cpp @@ -156,6 +156,7 @@ TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); @@ -258,6 +259,7 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); @@ -340,6 +342,7 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); @@ -433,6 +436,7 @@ TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 4b2aed9..a6b7929 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -30,7 +30,6 @@ struct session_manager_schema struct session_manager { - uint16_t thread_num; struct session_manager_config *cfg; struct session_manager_schema *schema; struct session_manager_runtime *runtime[MAX_THREAD_NUM]; @@ -301,30 +300,8 @@ error_out: void session_manager_free(struct session_manager *sess_mgr) { - struct session_manager_stat *stat = NULL; - struct session_manager_runtime *sess_mgr_rt = NULL; - if (sess_mgr) { - for (int i = 0; i < sess_mgr->thread_num; i++) - { - sess_mgr_rt = sess_mgr->runtime[i]; - if (sess_mgr_rt == NULL) - { - continue; - } - - stat = session_manager_runtime_get_stat(sess_mgr_rt); - while (stat->tcp_sess_used || stat->udp_sess_used) - { - clean_session(sess_mgr_rt, UINT64_MAX); - } - - SESSION_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", sess_mgr_rt, i); - session_manager_runtime_print_stat(sess_mgr_rt); - session_manager_runtime_free(sess_mgr->runtime[i]); - } - session_manager_schema_free(sess_mgr->schema); session_manager_config_free(sess_mgr->cfg); free(sess_mgr); @@ -333,20 +310,6 @@ void session_manager_free(struct session_manager *sess_mgr) struct session_manager *session_manager_new(struct stellar_module_manager *mod_mgr, struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) { - - uint64_t thread_num; - uint64_t instance_id; - uint64_t now_ms = clock_get_real_time_ms(); - - if (load_toml_integer_config(toml_file, "instance.id", (uint64_t *)&instance_id, 0, 4095)) - { - return NULL; - } - if (load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM)) - { - return NULL; - } - struct session_manager *sess_mgr = calloc(1, sizeof(struct session_manager)); if (sess_mgr == NULL) { @@ -368,18 +331,6 @@ struct session_manager *session_manager_new(struct stellar_module_manager *mod_m goto error_out; } - sess_mgr->thread_num = (uint16_t)thread_num; - for (int i = 0; i < sess_mgr->thread_num; i++) - { - sess_mgr->cfg->session_id_seed = instance_id << 8 | i; - sess_mgr->runtime[i] = session_manager_runtime_new(sess_mgr->cfg, now_ms); - if (sess_mgr->runtime[i] == NULL) - { - SESSION_MANAGER_LOG_ERROR("failed to create session_manager_runtime"); - goto error_out; - } - } - stellar_module_manager_polling_subscribe(mod_mgr, on_polling, sess_mgr); return sess_mgr; @@ -429,6 +380,49 @@ int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tc return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp_stream, (on_msg_cb_func *)(void *)cb, args); } +int session_manager_init(struct session_manager *sess_mgr, uint16_t thread_id) +{ + assert(sess_mgr); + assert(thread_id < sess_mgr->cfg->thread_num); + uint64_t now_ms = clock_get_real_time_ms(); + + sess_mgr->cfg->session_id_seed = sess_mgr->cfg->instance_id << 8 | thread_id; + struct session_manager_runtime *sess_mgr_rt = session_manager_runtime_new(sess_mgr->cfg, now_ms); + if (sess_mgr_rt == NULL) + { + SESSION_MANAGER_LOG_ERROR("failed to create session_manager_runtime"); + return -1; + } + else + { + sess_mgr->runtime[thread_id] = sess_mgr_rt; + return 0; + } +} + +void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id) +{ + assert(sess_mgr); + assert(thread_id < sess_mgr->cfg->thread_num); + + struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; + if (sess_mgr_rt == NULL) + { + return; + } + + struct session_manager_stat *stat = session_manager_runtime_get_stat(sess_mgr_rt); + while (stat->tcp_sess_used || stat->udp_sess_used) + { + clean_session(sess_mgr_rt, UINT64_MAX); + } + + SESSION_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", sess_mgr_rt, thread_id); + session_manager_runtime_print_stat(sess_mgr_rt); + session_manager_runtime_free(sess_mgr_rt); + sess_mgr_rt = NULL; +} + /****************************************************************************** * session manager module ******************************************************************************/ @@ -483,4 +477,30 @@ void session_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute_ stellar_module_free(mod); SESSION_MANAGER_LOG_FATAL("session_manager exited"); } +} + +struct stellar_module *session_manager_on_thread_init(struct stellar_module_manager *mod_mgr, int thread_id, struct stellar_module *mod) +{ + struct session_manager *sess_mgr = stellar_module_get_ctx(mod); + assert(sess_mgr); + assert(thread_id < sess_mgr->cfg->thread_num); + + if (session_manager_init(sess_mgr, thread_id) != 0) + { + SESSION_MANAGER_LOG_ERROR("failed to int session_manager_init"); + return NULL; + } + else + { + return mod; + } +} + +void session_manager_on_thread_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), int thread_id, struct stellar_module *mod) +{ + struct session_manager *sess_mgr = stellar_module_get_ctx(mod); + assert(sess_mgr); + assert(thread_id < sess_mgr->cfg->thread_num); + + session_manager_clean(sess_mgr, thread_id); } \ No newline at end of file diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c index b2aa250..2434c19 100644 --- a/infra/session_manager/session_manager_runtime.c +++ b/infra/session_manager/session_manager_runtime.c @@ -481,6 +481,8 @@ struct session_manager_config *session_manager_config_new(const char *toml_file) } int ret = 0; + ret += load_toml_integer_config(toml_file, "instance.id", (uint64_t *)&sess_mgr_cfg->instance_id, 0, 4095); + ret += load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&sess_mgr_cfg->thread_num, 0, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); ret += load_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); diff --git a/infra/session_manager/session_manager_runtime.h b/infra/session_manager/session_manager_runtime.h index c72c959..4178e4c 100644 --- a/infra/session_manager/session_manager_runtime.h +++ b/infra/session_manager/session_manager_runtime.h @@ -15,6 +15,8 @@ extern "C" struct session_manager_config { + uint64_t instance_id; + uint16_t thread_num; uint64_t session_id_seed; uint64_t tcp_session_max; uint64_t udp_session_max; diff --git a/infra/session_manager/test/default_config.h b/infra/session_manager/test/default_config.h index f8ce5fd..f6d8af6 100644 --- a/infra/session_manager/test/default_config.h +++ b/infra/session_manager/test/default_config.h @@ -10,6 +10,8 @@ extern "C" #include "session_manager_runtime.h" static struct session_manager_config sess_mgr_cfg = { + .instance_id = 1, + .thread_num = 1, .session_id_seed = 0xFFFFF, .tcp_session_max = 256, .udp_session_max = 256, diff --git a/infra/stellar_core.c b/infra/stellar_core.c index cafc537..06e91de 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -7,8 +7,8 @@ #include "packet_io.h" #include "log_internal.h" -#include "packet_internal.h" #include "utils_internal.h" +#include "packet_internal.h" #include "packet_manager_internal.h" #include "module_manager_interna.h" @@ -61,12 +61,6 @@ static void *worker_thread(void *arg) __thread_local_logger = st->logger; stellar_module_manager_register_thread(mod_mgr, thread_id, mq_rt); - if (packet_manager_init(pkt_mgr, thread_id, mq_rt) != 0) - { - CORE_LOG_ERROR("unable to init packet manager"); - return NULL; - } - ATOMIC_SET(&thread->is_runing, 1); CORE_LOG_FATAL("worker thread %d runing", thread_id); @@ -231,8 +225,6 @@ void stellar_free(struct stellar *st) { if (st) { - stellar_thread_join(st); - packet_io_free(st->pkt_io); stellar_module_manager_free(st->mod_mgr); mq_schema_free(st->mq_schema); diff --git a/infra/tcp_reassembly/tcp_reassembly.c b/infra/tcp_reassembly/tcp_reassembly.c index 3eb7f18..afc42ab 100644 --- a/infra/tcp_reassembly/tcp_reassembly.c +++ b/infra/tcp_reassembly/tcp_reassembly.c @@ -7,8 +7,8 @@ #include "tcp_reassembly.h" #include "stellar/stellar.h" -#define TCP_REASSEMBLY_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "tcp_reassembly", format, ##__VA_ARGS__) -#define TCP_REASSEMBLY_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "tcp_reassembly", format, ##__VA_ARGS__) +#define TCP_REASSEMBLY_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "TCP reassembly", format, ##__VA_ARGS__) +#define TCP_REASSEMBLY_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "TCP reassembly", format, ##__VA_ARGS__) struct tcp_segment_private { diff --git a/infra/version.map b/infra/version.map index 8032386..57eb3d9 100644 --- a/infra/version.map +++ b/infra/version.map @@ -52,11 +52,10 @@ global: log_print; log_check_level; - polling_manager_on_init; - polling_manager_on_exit; - packet_manager_on_init; packet_manager_on_exit; + packet_manager_on_thread_init; + packet_manager_on_thread_exit; packet_manager_new_packet_exdata_index; packet_manager_subscribe; packet_manager_claim_packet; @@ -64,6 +63,8 @@ global: session_manager_on_init; session_manager_on_exit; + session_manager_on_thread_init; + session_manager_on_thread_exit; session_manager_new_session_exdata_index; session_manager_subscribe_tcp; session_manager_subscribe_udp; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7cf7e86..a402bf4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,8 +1,6 @@ #add_subdirectory(packet_inject) add_subdirectory(packet_tool) add_subdirectory(session_debugger) -#add_subdirectory(lpi_plugin) -#add_subdirectory(debug_plugin) add_subdirectory(lpi_plus) #add_subdirectory(decoders/http) #add_subdirectory(decoders/socks)