#include #include #include #include #include #include #include #include #include #include "logo.h" #include "stat.h" #include "stellar.h" #include "config.h" #include "packet_private.h" #include "packet_io.h" #include "timestamp.h" #include "id_generator.h" #include "ip_reassembly.h" #include "session_manager.h" #include "plugin_manager.h" #define STELLAR_LOG_STATE(format, ...) LOG_STATE("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__) struct thread_ctx { pthread_t tid; uint16_t idx; uint64_t is_runing; struct ip_reassembly *ip_mgr; struct session_manager *sess_mgr; }; struct stellar_runtime { uint64_t need_exit; struct stellar_stat *stat; struct packet_io *packet_io; struct plugin_manager *plug_mgr; struct thread_ctx threads[MAX_THREAD_NUM]; }; struct stellar_runtime __runtime; struct stellar_runtime *runtime = &__runtime; struct stellar_config __config; struct stellar_config *config = &__config; static const char *log_config_file = "./conf/log.toml"; static const char *stellar_config_file = "./conf/stellar.toml"; /****************************************************************************** * util ******************************************************************************/ static void signal_handler(int signo) { if (signo == SIGINT) { STELLAR_LOG_STATE("SIGINT received, notify threads to exit !!!"); ATOMIC_SET(&runtime->need_exit, 1); } if (signo == SIGQUIT) { STELLAR_LOG_STATE("SIGQUIT received, notify threads to exit !!!"); ATOMIC_SET(&runtime->need_exit, 1); } if (signo == SIGTERM) { STELLAR_LOG_STATE("SIGTERM received, notify threads to exit !!!"); ATOMIC_SET(&runtime->need_exit, 1); } if (signo == SIGHUP) { STELLAR_LOG_STATE("SIGHUP received, reload log level !!!"); log_reload_level(log_config_file); } } static void execute_packet_action(struct packet_io *packet_io, struct session *sess, struct packet *pkt, uint16_t thr_idx) { int is_ctrl = packet_is_ctrl(pkt); int need_drop = packet_need_drop(pkt); if (sess != NULL) { enum session_stat stat_pkt; enum session_stat stat_byte; if (need_drop) { stat_pkt = is_ctrl ? STAT_CTRL_PKTS_DROP : STAT_RAW_PKTS_DROP; stat_byte = is_ctrl ? STAT_CTRL_BYTES_DROP : STAT_RAW_BYTES_DROP; } else { stat_pkt = is_ctrl ? STAT_CTRL_PKTS_TX : STAT_RAW_PKTS_TX; stat_byte = is_ctrl ? STAT_CTRL_BYTES_TX : STAT_RAW_BYTES_TX; } session_inc_stat(sess, session_get_current_direction(sess), stat_pkt, 1); session_inc_stat(sess, session_get_current_direction(sess), stat_byte, packet_get_len(pkt)); session_set_current_packet(sess, NULL); session_set_current_direction(sess, SESSION_DIRECTION_NONE); } if (need_drop) { packet_io_drop(packet_io, thr_idx, pkt, 1); } else { packet_io_egress(packet_io, thr_idx, pkt, 1); } } /****************************************************************************** * thread ******************************************************************************/ static inline void thread_set_name(const char *thd_symbol, uint16_t thd_idx) { char thd_name[16]; snprintf(thd_name, sizeof(thd_name), "%s:%d", thd_symbol, thd_idx); prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); } static inline void stellar_thread_cron(struct thread_ctx *thr_ctx) { thread_local uint64_t last = 0; if (timestamp_get_msec() - last > 2000) { struct thread_stat thr_stat = { ip_reassembly_get_stat(thr_ctx->ip_mgr), session_manager_get_stat(thr_ctx->sess_mgr), }; stellar_peek_thr_stat(runtime->stat, &thr_stat, thr_ctx->idx); last = timestamp_get_msec(); } } static void *work_thread(void *arg) { int nr_recv; uint64_t now = 0; uint16_t thr_idx = 0; void *plugin_ctx; struct packet *pkt; struct packet packets[RX_BURST_MAX]; struct session *sess; struct session *evicted_sess; struct session *expired_sess; struct packet_io *packet_io = runtime->packet_io; struct plugin_manager *plug_mgr = runtime->plug_mgr; struct thread_ctx *thr_ctx = (struct thread_ctx *)arg; struct ip_reassembly *ip_reass = thr_ctx->ip_mgr; struct session_manager *sess_mgr = thr_ctx->sess_mgr; thr_idx = thr_ctx->idx; if (packet_io_init(packet_io, thr_idx) != 0) { STELLAR_LOG_ERROR("unable to init marsio thread"); return NULL; } ATOMIC_SET(&thr_ctx->is_runing, 1); thread_set_name("stellar", thr_idx); STELLAR_LOG_STATE("worker thread %d runing", thr_idx); while (ATOMIC_READ(&runtime->need_exit) == 0) { now = timestamp_get_msec(); // TODO nr_recv = packet_io_ingress(packet_io, thr_idx, packets, RX_BURST_MAX); if (nr_recv == 0) { goto idle_tasks; } for (int i = 0; i < nr_recv; i++) { sess = NULL; pkt = &packets[i]; plugin_manager_dispatch_packet(plug_mgr, pkt); if (packet_is_fragment(pkt)) { struct packet *defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now); if (defraged_pkt == NULL) { goto fast_path; } else { pkt = defraged_pkt; plugin_manager_dispatch_packet(plug_mgr, pkt); } } sess = session_manager_lookup_session(sess_mgr, pkt); if (sess == NULL) { sess = session_manager_new_session(sess_mgr, pkt, now); if (sess == NULL) { goto fast_path; } plugin_ctx = plugin_manager_new_ctx(sess); session_set_user_data(sess, plugin_ctx); } else { if (session_manager_update_session(sess_mgr, sess, pkt, now) == -1) { goto fast_path; } } plugin_manager_dispatch_session(plug_mgr, sess, pkt); fast_path: execute_packet_action(packet_io, sess, pkt, thr_idx); } idle_tasks: // nr_recv packet atmost trigger nr_recv session evict for (int i = 0; i < nr_recv; i++) { evicted_sess = session_manager_get_evicted_session(sess_mgr); if (evicted_sess) { plugin_ctx = session_get_user_data(evicted_sess); plugin_manager_free_ctx(plugin_ctx); session_manager_free_session(sess_mgr, evicted_sess); } } while ((expired_sess = session_manager_get_expired_session(sess_mgr, now))) { plugin_ctx = session_get_user_data(expired_sess); plugin_manager_free_ctx(plugin_ctx); session_manager_free_session(sess_mgr, expired_sess); } ip_reassembly_expire(ip_reass, now); stellar_thread_cron(thr_ctx); // TODO // plugin_manager_cron(); // poll_non_packet_events(); // packet_io_yield(); } ATOMIC_SET(&thr_ctx->is_runing, 0); STELLAR_LOG_STATE("worker thread %d exit !!!", thr_idx); return NULL; } static int stellar_thread_init(struct stellar_runtime *ctx, uint8_t nr_threads) { uint64_t now = timestamp_get_msec(); for (uint8_t i = 0; i < nr_threads; i++) { struct thread_ctx *thr_ctx = &ctx->threads[i]; thr_ctx->idx = i; thr_ctx->is_runing = 0; thr_ctx->sess_mgr = session_manager_new(&config->sess_mgr_opts, now); if (thr_ctx->sess_mgr == NULL) { STELLAR_LOG_ERROR("unable to create session manager"); return -1; } thr_ctx->ip_mgr = ip_reassembly_new(&config->ip_opts); if (thr_ctx->ip_mgr == NULL) { STELLAR_LOG_ERROR("unable to create ip reassemble manager"); return -1; } } return 0; } static void stellar_thread_clean(struct stellar_runtime *ctx, uint8_t nr_threads) { for (uint8_t i = 0; i < nr_threads; i++) { struct thread_ctx *thr_ctx = &ctx->threads[i]; if (ATOMIC_READ(&thr_ctx->is_runing) == 0) { STELLAR_LOG_STATE("wait worker thread %d free context", i); session_manager_free(thr_ctx->sess_mgr); ip_reassembly_free(thr_ctx->ip_mgr); } } } static int stellar_thread_run(struct stellar_runtime *ctx, uint8_t nr_threads) { for (uint8_t i = 0; i < nr_threads; i++) { struct thread_ctx *thr_ctx = &ctx->threads[i]; if (pthread_create(&thr_ctx->tid, NULL, work_thread, (void *)thr_ctx) < 0) { STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); return -1; } } return 0; } static void stellar_thread_join(struct stellar_runtime *ctx, uint8_t nr_threads) { for (uint8_t i = 0; i < nr_threads; i++) { struct thread_ctx *thr_ctx = &ctx->threads[i]; while (ATOMIC_READ(&thr_ctx->is_runing) == 1) { STELLAR_LOG_STATE("wait worker thread %d stop", i); sleep(1); } } } /****************************************************************************** * main ******************************************************************************/ int main(int argc, char **argv) { uint8_t nr_threads; uint64_t last_stat = 0; struct io_stat *io_stat; memset(runtime, 0, sizeof(struct stellar_runtime)); memset(config, 0, sizeof(struct stellar_config)); timestamp_update(); signal(SIGINT, signal_handler); signal(SIGQUIT, signal_handler); signal(SIGTERM, signal_handler); signal(SIGHUP, signal_handler); if (log_init(log_config_file) != 0) { STELLAR_LOG_ERROR("unable to init log"); goto error_out; } STELLAR_LOG_STATE("start stellar (version: %s)\n %s", __stellar_version, logo_str); if (stellar_config_load(stellar_config_file, config) != 0) { STELLAR_LOG_ERROR("unable to load config file"); goto error_out; } stellar_config_print(config); nr_threads = config->io_opts.nr_threads; if (id_generator_init(config->dev_opts.base, config->dev_opts.offset) != 0) { STELLAR_LOG_ERROR("unable to init id generator"); goto error_out; } runtime->stat = stellar_stat_new(nr_threads); if (runtime->stat == NULL) { STELLAR_LOG_ERROR("unable to create stellar stat"); goto error_out; } runtime->plug_mgr = plugin_manager_new(); if (runtime->plug_mgr == NULL) { STELLAR_LOG_ERROR("unable to create plugin manager"); goto error_out; } runtime->packet_io = packet_io_new(&config->io_opts); if (runtime->packet_io == NULL) { STELLAR_LOG_ERROR("unable to create packet io"); goto error_out; } if (stellar_thread_init(runtime, nr_threads) != 0) { STELLAR_LOG_ERROR("unable to init thread context"); goto error_out; } if (stellar_thread_run(runtime, nr_threads) != 0) { STELLAR_LOG_ERROR("unable to create worker thread"); goto error_out; } io_stat = packet_io_get_stat(runtime->packet_io); last_stat = timestamp_get_msec(); while (!ATOMIC_READ(&runtime->need_exit)) { timestamp_update(); if (timestamp_get_msec() - last_stat > 2000) { stellar_peek_io_stat(runtime->stat, io_stat); stellar_stat_output(runtime->stat); last_stat = timestamp_get_msec(); } usleep(5 * 1000); } error_out: stellar_thread_join(runtime, nr_threads); stellar_thread_clean(runtime, nr_threads); packet_io_free(runtime->packet_io); plugin_manager_free(runtime->plug_mgr); stellar_stat_free(runtime->stat); STELLAR_LOG_STATE("stellar exit !!!\n"); log_free(); return 0; }