#include #include #include #include #include #include #include #include #include #include "log.h" #include "utils.h" #include "packet_io.h" #include "packet_private.h" #include "session_private.h" #include "snowflake.h" #include "stellar_stat.h" #include "stellar_core.h" #include "stellar_config.h" #include "plugin_manager.h" #include "session_manager.h" #define STELLAR_LOG_FATAL(format, ...) LOG_FATAL("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__) #ifdef STELLAR_GIT_VERSION static __attribute__((__used__)) const char *version = STELLAR_GIT_VERSION; #else static __attribute__((__used__)) const char *version = "Unknown"; #endif static const char logo_str[] = " _ _ _\n" " ___ | |_ ___ | | | | __ _ _ __\n" " / __| | __| / _ \\ | | | | / _` | | '__|\n" " \\__ \\ | |_ | __/ | | | | | (_| | | |\n" " |___/ \\__| \\___| |_| |_| \\__,_| |_|\n"; struct schedule_data { uint64_t last_free_expired_session_timestamp; uint64_t last_free_expired_ip_frag_timestamp; uint64_t last_merge_thread_stat_timestamp; uint64_t free_expired_session_interval; uint64_t free_expired_session_batch; uint64_t free_expired_ip_frag_interval; uint64_t free_expired_ip_frag_batch; uint64_t merge_stat_interval; uint64_t packet_io_yield_interval; }; struct stellar_thread { pthread_t tid; uint16_t idx; uint64_t is_runing; struct schedule_data sched_data; struct ip_reassembly *ip_mgr; struct session_manager *sess_mgr; struct stellar *st; }; struct stellar_runtime { uint64_t need_exit; uint64_t stat_last_output_ts; struct stellar_stat *stat; struct packet_io *packet_io; struct plugin_manager_schema *plug_mgr; struct stellar_thread threads[MAX_THREAD_NUM]; }; struct stellar { char stellar_cfg_file[PATH_MAX]; char plugin_cfg_file[PATH_MAX]; char log_cfg_file[PATH_MAX]; struct stellar_runtime runtime; struct stellar_config config; }; static thread_local uint16_t __thread_id = 0; // TODO /****************************************************************************** * Stellar Thread Main Loop ******************************************************************************/ static void update_session_stat(struct session *sess, struct packet *pkt) { if (sess) { enum flow_direction dir = session_get_current_flow_direction(sess); assert(dir != FLOW_DIRECTION_NONE); int is_ctrl = packet_is_ctrl(pkt); uint16_t len = packet_get_raw_len(pkt); switch (packet_get_action(pkt)) { case PACKET_ACTION_DROP: session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1); session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len); break; case PACKET_ACTION_FORWARD: session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1); session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len); break; default: assert(0); break; } session_set_current_packet(sess, NULL); session_set_current_flow_direction(sess, FLOW_DIRECTION_NONE); } } static inline void free_evicted_sessions(struct session_manager *sess_mgr, uint64_t max_free) { void *plugin_ctx = NULL; struct session *sess = NULL; for (uint64_t i = 0; i < max_free; i++) { sess = session_manager_get_evicted_session(sess_mgr); if (sess) { plugin_ctx = session_get_user_data(sess); plugin_manager_on_session_closing(sess); plugin_manager_session_runtime_free((struct plugin_manager_runtime *)plugin_ctx); session_manager_free_session(sess_mgr, sess); } else { break; } } } static inline void free_expired_sessions(struct session_manager *sess_mgr, uint64_t max_free, uint64_t now_ms) { void *plugin_ctx = NULL; struct session *sess = NULL; for (uint64_t i = 0; i < max_free; i++) { sess = session_manager_get_expired_session(sess_mgr, now_ms); if (sess) { plugin_ctx = session_get_user_data(sess); plugin_manager_on_session_closing(sess); plugin_manager_session_runtime_free((struct plugin_manager_runtime *)plugin_ctx); session_manager_free_session(sess_mgr, sess); } else { break; } } } static inline void merge_thread_stat(struct stellar_thread *thread) { struct stellar *st = thread->st; struct stellar_runtime *runtime = &st->runtime; struct thread_stat thr_stat = { .packet_io = packet_io_stat(runtime->packet_io, thread->idx), .ip_reassembly = ip_reassembly_stat(thread->ip_mgr), .session_mgr = session_manager_stat(thread->sess_mgr), }; stellar_stat_merge(runtime->stat, &thr_stat, thread->idx); } static void *work_thread(void *arg) { int nr_recv; uint64_t now_ms = 0; char thd_name[16] = {0}; void *plugin_ctx = NULL; struct packet *pkt = NULL; struct packet *defraged_pkt = NULL; struct packet packets[RX_BURST_MAX]; struct session *sess = NULL; struct stellar_thread *thread = (struct stellar_thread *)arg; struct ip_reassembly *ip_reass = thread->ip_mgr; struct session_manager *sess_mgr = thread->sess_mgr; struct stellar *st = thread->st; struct stellar_runtime *runtime = &st->runtime; struct schedule_data *sched_data = &thread->sched_data; struct packet_io *packet_io = runtime->packet_io; struct plugin_manager_schema *plug_mgr = runtime->plug_mgr; uint16_t thr_idx = thread->idx; __thread_id = thr_idx; memset(packets, 0, sizeof(packets)); for (int i = 0; i < RX_BURST_MAX; i++) { packet_set_user_data(&packets[i], (void *)plug_mgr); } snprintf(thd_name, sizeof(thd_name), "stellar:%d", thr_idx); prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); if (packet_io_init(packet_io, thr_idx) != 0) { STELLAR_LOG_ERROR("unable to init marsio thread"); return NULL; } ATOMIC_SET(&thread->is_runing, 1); STELLAR_LOG_FATAL("worker thread %d runing", thr_idx); while (ATOMIC_READ(&runtime->need_exit) == 0) { /* * We use the system's real time instead of monotonic time for the following reasons: * -> Session creation/closure times require real time (e.g., for logging session activities). * -> Session ID generation relies on real time (e.g., for reverse calculating session creation time from the session ID). * * Note: Modifying the system time will affect the timing wheel, impacting session expiration, IP reassembly expiration, and TCP reassembly expiration. * Suggestion: After modifying the system time, restart the service to ensure consistent timing. */ now_ms = clock_get_real_time_ms(); nr_recv = packet_io_ingress(packet_io, thr_idx, packets, RX_BURST_MAX); if (nr_recv == 0) { goto idle_tasks; } for (int i = 0; i < nr_recv; i++) { sess = NULL; defraged_pkt = NULL; pkt = &packets[i]; plugin_manager_on_packet_ingress(plug_mgr, pkt); if (packet_is_fragment(pkt)) { defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms); if (defraged_pkt == NULL) { goto fast_path; } else { pkt = defraged_pkt; plugin_manager_on_packet_ingress(plug_mgr, defraged_pkt); } } sess = session_manager_lookup_session_by_packet(sess_mgr, pkt); if (sess == NULL) { sess = session_manager_new_session(sess_mgr, pkt, now_ms); if (sess == NULL) { goto fast_path; } plugin_ctx = plugin_manager_session_runtime_new(plug_mgr, sess); session_set_user_data(sess, plugin_ctx); } else { if (session_manager_update_session(sess_mgr, sess, pkt, now_ms) == -1) { goto fast_path; } } if (packet_get_session_id(pkt) == 0) { packet_set_session_id(pkt, session_get_id(sess)); } plugin_manager_on_session_ingress(sess, pkt); fast_path: plugin_manager_on_session_egress(sess, pkt); if (pkt == defraged_pkt) { plugin_manager_on_packet_egress(plug_mgr, defraged_pkt); plugin_manager_on_packet_egress(plug_mgr, &packets[i]); } else { plugin_manager_on_packet_egress(plug_mgr, pkt); } if (sess && session_get_current_state(sess) == SESSION_STATE_DISCARD) { packet_set_action(pkt, PACKET_ACTION_DROP); } update_session_stat(sess, pkt); if (packet_get_action(pkt) == PACKET_ACTION_DROP) { if (pkt == defraged_pkt) { packet_io_drop(packet_io, thr_idx, &packets[i], 1); packet_free(defraged_pkt); } else { packet_io_drop(packet_io, thr_idx, pkt, 1); } } else // PACKET_ACTION_FORWARD { if (pkt == defraged_pkt) { // TODO // copy meta from defraged_pkt to packets[i] packet_io_egress(packet_io, thr_idx, &packets[i], 1); packet_free(defraged_pkt); } else { packet_io_egress(packet_io, thr_idx, pkt, 1); } } } idle_tasks: // nr_recv packet atmost trigger nr_recv session evicted free_evicted_sessions(sess_mgr, nr_recv); plugin_manager_on_polling(plug_mgr); // per free_expired_session_interval MAX free_expired_session_batch sessions are released if (now_ms - sched_data->last_free_expired_session_timestamp > sched_data->free_expired_session_interval) { free_expired_sessions(sess_mgr, sched_data->free_expired_session_batch, now_ms); sched_data->last_free_expired_session_timestamp = now_ms; } // per merge_stat_interval merge thread stat if (now_ms - sched_data->last_merge_thread_stat_timestamp > sched_data->merge_stat_interval) { merge_thread_stat(thread); sched_data->last_merge_thread_stat_timestamp = now_ms; } // per free_expired_ip_frag_interval MAX free_expired_ip_frag_batch ip fragments are released if (now_ms - sched_data->last_free_expired_ip_frag_timestamp > sched_data->free_expired_ip_frag_interval) { ip_reassembly_expire(ip_reass, sched_data->free_expired_ip_frag_batch, now_ms); sched_data->last_free_expired_ip_frag_timestamp = now_ms; } if (nr_recv == 0) { packet_io_yield(packet_io, thr_idx, sched_data->packet_io_yield_interval); } } ATOMIC_SET(&thread->is_runing, 0); STELLAR_LOG_FATAL("worker thread %d exit", thr_idx); return NULL; } /****************************************************************************** * Stellar Main Function ******************************************************************************/ static int all_session_have_freed(struct stellar_runtime *runtime, struct stellar_config *config) { for (int i = 0; i < config->pkt_io_opts.nr_threads; i++) { struct session_manager *sess_mgr = runtime->threads[i].sess_mgr; struct session_manager_stat *sess_stat = session_manager_stat(sess_mgr); if (ATOMIC_READ(&sess_stat->tcp_sess_used) != 0) { return 0; } if (ATOMIC_READ(&sess_stat->udp_sess_used) != 0) { return 0; } } return 1; } static int stellar_thread_init(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; struct stellar_config *config = &st->config; uint64_t now_ms = clock_get_real_time_ms(); for (uint16_t i = 0; i < config->pkt_io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; thread->idx = i; thread->is_runing = 0; thread->sched_data.last_free_expired_session_timestamp = now_ms; thread->sched_data.last_free_expired_ip_frag_timestamp = now_ms; thread->sched_data.last_merge_thread_stat_timestamp = now_ms; thread->sched_data.free_expired_session_interval = config->sched_opts.free_expired_session_interval; thread->sched_data.free_expired_session_batch = config->sched_opts.free_expired_session_batch; thread->sched_data.free_expired_ip_frag_interval = config->sched_opts.free_expired_ip_frag_interval; thread->sched_data.free_expired_ip_frag_batch = config->sched_opts.free_expired_ip_frag_batch; thread->sched_data.merge_stat_interval = config->sched_opts.merge_stat_interval; thread->sched_data.packet_io_yield_interval = config->sched_opts.packet_io_yield_interval; thread->sess_mgr = session_manager_new(&config->sess_mgr_opts, now_ms); if (thread->sess_mgr == NULL) { STELLAR_LOG_ERROR("unable to create session manager"); return -1; } thread->ip_mgr = ip_reassembly_new(&config->ip_reass_opts); if (thread->ip_mgr == NULL) { STELLAR_LOG_ERROR("unable to create ip reassemble manager"); return -1; } thread->st = st; } return 0; } static void stellar_thread_clean(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; struct stellar_config *config = &st->config; STELLAR_LOG_FATAL("cleaning worker thread context ..."); for (uint16_t i = 0; i < config->pkt_io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (ATOMIC_READ(&thread->is_runing) == 0) { session_manager_free(thread->sess_mgr); ip_reassembly_free(thread->ip_mgr); } } STELLAR_LOG_FATAL("worker thread context cleaned"); } static int stellar_thread_run(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; struct stellar_config *config = &st->config; for (uint16_t i = 0; i < config->pkt_io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (pthread_create(&thread->tid, NULL, work_thread, (void *)thread) < 0) { STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); return -1; } } return 0; } static void stellar_thread_join(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; struct stellar_config *config = &st->config; STELLAR_LOG_FATAL("waiting worker thread stop ..."); for (uint16_t i = 0; i < config->pkt_io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; while (ATOMIC_READ(&thread->is_runing) == 1) { usleep(1000); // 1ms } } STELLAR_LOG_FATAL("all worker thread stoped"); } struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg_file, const char *log_cfg_file) { if (stellar_cfg_file == NULL) { printf("stellar config file is null\n"); return NULL; } if (plugin_cfg_file == NULL) { printf("plugin config file is null\n"); return NULL; } if (log_cfg_file == NULL) { printf("log config file is null\n"); return NULL; } struct stellar *st = (struct stellar *)calloc(1, sizeof(struct stellar)); if (st == NULL) { return NULL; } memcpy(st->stellar_cfg_file, stellar_cfg_file, strlen(stellar_cfg_file)); memcpy(st->plugin_cfg_file, plugin_cfg_file, strlen(plugin_cfg_file)); memcpy(st->log_cfg_file, log_cfg_file, strlen(log_cfg_file)); struct stellar_runtime *runtime = &st->runtime; struct stellar_config *config = &st->config; if (log_init(st->log_cfg_file) != 0) { STELLAR_LOG_ERROR("unable to init log"); goto error_out; } STELLAR_LOG_FATAL("stellar start (version: %s)\n %s", version, logo_str); STELLAR_LOG_FATAL("stellar config file : %s", st->stellar_cfg_file); STELLAR_LOG_FATAL("plugin config file : %s", st->plugin_cfg_file); STELLAR_LOG_FATAL("log config file : %s", st->log_cfg_file); if (stellar_config_load(config, st->stellar_cfg_file) != 0) { STELLAR_LOG_ERROR("unable to load config file"); goto error_out; } stellar_config_print(config); if (snowflake_id_init(&config->snowflake_opts) != 0) { STELLAR_LOG_ERROR("unable to init id generator"); goto error_out; } runtime->stat = stellar_stat_new(config->pkt_io_opts.nr_threads); if (runtime->stat == NULL) { STELLAR_LOG_ERROR("unable to create stellar stat"); goto error_out; } runtime->plug_mgr = plugin_manager_init(st, st->plugin_cfg_file); if (runtime->plug_mgr == NULL) { STELLAR_LOG_ERROR("unable to create plugin manager"); goto error_out; } runtime->packet_io = packet_io_new(&config->pkt_io_opts); if (runtime->packet_io == NULL) { STELLAR_LOG_ERROR("unable to create packet io"); goto error_out; } if (stellar_thread_init(st) != 0) { STELLAR_LOG_ERROR("unable to init thread context"); goto error_out; } return st; error_out: if (st == NULL) { stellar_free(st); st = NULL; } return NULL; } void stellar_run(struct stellar *st) { if (st == NULL) { return; } struct stellar_runtime *runtime = &st->runtime; struct stellar_config *config = &st->config; if (stellar_thread_run(st) != 0) { STELLAR_LOG_ERROR("unable to create worker thread"); return; } runtime->stat_last_output_ts = clock_get_real_time_ms(); while (!ATOMIC_READ(&runtime->need_exit)) { if (clock_get_real_time_ms() - runtime->stat_last_output_ts > config->sched_opts.output_stat_interval) { runtime->stat_last_output_ts = clock_get_real_time_ms(); stellar_stat_output(runtime->stat); } usleep(1000); // 1ms // only available in dump file mode, automatically exits when all sessions have been released if (packet_io_isbreak(runtime->packet_io) && all_session_have_freed(runtime, config)) { STELLAR_LOG_FATAL("all sessions have been released, notify threads to exit"); stellar_stat_output(runtime->stat); // flush stat ATOMIC_SET(&runtime->need_exit, 1); } } } void stellar_free(struct stellar *st) { if (st) { struct stellar_runtime *runtime = &st->runtime; stellar_thread_join(st); stellar_thread_clean(st); packet_io_free(runtime->packet_io); plugin_manager_exit(runtime->plug_mgr); stellar_stat_free(runtime->stat); STELLAR_LOG_FATAL("stellar exit\n"); log_free(); } } void stellar_loopbreak(struct stellar *st) { if (st) { struct stellar_runtime *runtime = &st->runtime; ATOMIC_SET(&runtime->need_exit, 1); } } void stellar_reload_log_level(struct stellar *st) { if (st) { log_level_reload(st->log_cfg_file); } } /****************************************************************************** * Stellar Utility Function ******************************************************************************/ struct packet_io *stellar_get_packet_io(const struct stellar *st) { return st->runtime.packet_io; } struct plugin_manager_schema *stellar_get_plugin_manager(const struct stellar *st) { return st->runtime.plug_mgr; } void stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *plug_mgr) { st->runtime.plug_mgr = plug_mgr; } struct session_manager *stellar_get_session_manager(const struct stellar *st) { uint16_t idx = stellar_get_current_thread_index(); return st->runtime.threads[idx].sess_mgr; } uint16_t stellar_get_current_thread_index() { return __thread_id; } // only send user crafted packet, can't send packet which come from network void stellar_send_build_packet(struct stellar *st, struct packet *pkt) { uint16_t thr_idx = stellar_get_current_thread_index(); struct packet_io *packet_io = stellar_get_packet_io(st); struct session_manager *sess_mgr = stellar_get_session_manager(st); session_manager_record_duplicated_packet(sess_mgr, pkt); if (packet_get_origin_ctx(pkt)) { // TODO abort(); packet_io_egress(packet_io, thr_idx, pkt, 1); } else { packet_io_inject(packet_io, thr_idx, pkt, 1); } } int stellar_get_worker_thread_num(struct stellar *st) { return st->config.pkt_io_opts.nr_threads; }