#include #include #include #include "stellar/stellar.h" #include "stellar/module.h" #include "packet_io.h" #include "log_internal.h" #include "utils_internal.h" #include "packet_internal.h" #include "packet_manager.h" #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "core", format, ##__VA_ARGS__) #ifdef STELLAR_GIT_VERSION static __attribute__((__used__)) const char *version = STELLAR_GIT_VERSION; #else static __attribute__((__used__)) const char *version = "Unknown"; #endif struct thread { pthread_t tid; uint16_t idx; uint64_t is_runing; struct stellar *st; }; struct stellar { uint16_t thread_num; uint64_t need_exit; struct logger *logger; struct packet_io *pkt_io; struct module_manager *mod_mgr; struct thread threads[MAX_THREAD_NUM]; }; static void *worker_thread(void *arg) { int nr_recv = 0; char thread_name[16] = {0}; struct packet *pkt = NULL; struct packet *pkts[RX_BURST_MAX] = {NULL}; struct thread *thread = (struct thread *)arg; uint16_t thread_id = thread->idx; struct stellar *st = thread->st; struct packet_io *pkt_io = st->pkt_io; struct module_manager *mod_mgr = st->mod_mgr; struct module *pkt_mgr_mod = module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME); struct packet_manager *pkt_mgr = module_to_packet_manager(pkt_mgr_mod); snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); __thread_local_logger = st->logger; module_manager_register_thread(mod_mgr, thread_id); ATOMIC_SET(&thread->is_runing, 1); CORE_LOG_FATAL("worker thread %d runing", thread_id); while (ATOMIC_READ(&st->need_exit) == 0) { nr_recv = packet_io_recv(pkt_io, thread_id, pkts, RX_BURST_MAX); for (int i = 0; i < nr_recv; i++) { packet_manager_ingress(pkt_mgr, thread_id, pkts[i]); } packet_manager_dispatch(pkt_mgr, thread_id); while ((pkt = packet_manager_egress(pkt_mgr, thread_id))) { packet_io_send(pkt_io, thread_id, &pkt, 1); } packet_io_clean(pkt_io, thread_id); module_manager_polling_dispatch(mod_mgr); if (nr_recv == 0) { packet_io_yield(pkt_io, thread_id); } } CORE_LOG_FATAL("worker thread %d cleaning", thread_id); module_manager_unregister_thread(mod_mgr, thread_id); ATOMIC_SET(&thread->is_runing, 0); CORE_LOG_FATAL("worker thread %d exit", thread_id); return NULL; } static int stellar_thread_run(struct stellar *st) { for (uint64_t i = 0; i < st->thread_num; i++) { struct thread *thread = &st->threads[i]; thread->idx = i; thread->is_runing = 0; thread->st = st; if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0) { CORE_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); return -1; } } return 0; } static void stellar_thread_join(struct stellar *st) { for (uint64_t i = 0; i < st->thread_num; i++) { if (st->threads[i].is_runing == 0) { continue; } struct thread *thread = &st->threads[i]; pthread_join(thread->tid, NULL); } } #include "stellar/monitor.h" #include "stellar/session.h" #include "stellar/lpi_plus.h" struct module_hooks mod_hooks[] = { {monitor_on_init, monitor_on_exit, NULL, NULL}, {packet_manager_on_init, packet_manager_on_exit, packet_manager_on_thread_init, packet_manager_on_thread_exit}, {session_manager_on_init, session_manager_on_exit, session_manager_on_thread_init, session_manager_on_thread_exit}, {session_debugger_on_init, session_debugger_on_exit, NULL, NULL}, {session_monitor_on_init, session_monitor_on_exit, NULL, NULL}, {lpi_plus_init, lpi_plus_exit, NULL, NULL}, }; struct packet_node_spec { const char *module_name; const char *node_name; enum packet_stage stage; uint64_t interested_tag_key_bits; uint64_t interested_tag_val_bits; on_packet_callback *cb; }; struct packet_node_spec packet_nodes[] = { // PACKET_STAGE_FORWARD {SESSION_MANAGER_MODULE_NAME, "session_manager", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP | PKT_TAG_VAL_IPPROTO_UDP, session_manager_on_packet_forward}, {SESSION_DEBUGGER_MODULE_NAME, "session_debugger", PACKET_STAGE_FORWARD, PKT_TAG_KEY_SESS, PKT_TAG_VAL_SESS_ALL, session_debugger_on_packet_forward}, {LPI_PLUS_MODULE_NAME, "lpi_plus", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP | PKT_TAG_VAL_IPPROTO_UDP, lpi_plus_on_packet}, // PACKET_STAGE_OUTPUT {SESSION_MANAGER_MODULE_NAME, "session_manager", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP | PKT_TAG_VAL_IPPROTO_UDP, session_manager_on_packet_output}, }; static int register_packet_node_for_module(struct module_manager *mod_mgr, struct packet_node_spec *specs, size_t n_specs) { struct module *pkt_mgr_mod = module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME); struct packet_manager *pkt_mgr = module_to_packet_manager(pkt_mgr_mod); struct module *mod = NULL; for (size_t i = 0; i < n_specs; i++) { mod = module_manager_get_module(mod_mgr, specs[i].module_name); if (mod == NULL) { CORE_LOG_FATAL("%s unable to get module %s", __FUNCTION__, specs[i].module_name); continue; } if (packet_manager_register_node(pkt_mgr, specs[i].node_name, specs[i].stage, specs[i].interested_tag_key_bits, specs[i].interested_tag_val_bits, specs[i].cb, mod) < 0) { CORE_LOG_FATAL("%s failed to register node:%s for module:%s in stage:%d", __FUNCTION__, specs[i].node_name, specs[i].module_name, specs[i].stage); } else { CORE_LOG_FATAL("%s success to register node:%s for module:%s in stage:%d", __FUNCTION__, specs[i].node_name, specs[i].module_name, specs[i].stage); } } return 0; } struct stellar *stellar_new(const char *toml_file) { if (toml_file == NULL) { printf("stellar config file is null\n"); return NULL; } struct stellar *st = (struct stellar *)calloc(1, sizeof(struct stellar)); if (st == NULL) { return NULL; } st->logger = log_new(toml_file); if (st->logger == NULL) { printf("unable to create logger"); goto error_out; } __thread_local_logger = st->logger; CORE_LOG_FATAL("stellar start (version: %s)", version); if (load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0) { CORE_LOG_ERROR("unable to get thread number from config file"); goto error_out; } st->mod_mgr = module_manager_new(mod_hooks, count_of(mod_hooks), st->thread_num, toml_file, st->logger); if (st->mod_mgr == NULL) { CORE_LOG_ERROR("unable to create packet manager"); goto error_out; } st->pkt_io = packet_io_new(toml_file); if (st->pkt_io == NULL) { CORE_LOG_ERROR("unable to create packet io"); goto error_out; } if(register_packet_node_for_module(st->mod_mgr, packet_nodes, count_of(packet_nodes)) != 0) { CORE_LOG_ERROR("unable to register packet node"); goto error_out; } return st; error_out: stellar_free(st); return NULL; } void stellar_run(struct stellar *st) { if (st == NULL) { return; } if (stellar_thread_run(st) != 0) { CORE_LOG_ERROR("unable to create worker thread"); return; } while (!ATOMIC_READ(&st->need_exit)) { usleep(1000); // 1ms // only available in pcap mode if (packet_io_is_done(st->pkt_io)) { ATOMIC_SET(&st->need_exit, 1); break; } } stellar_thread_join(st); } void stellar_free(struct stellar *st) { if (st) { packet_io_free(st->pkt_io); module_manager_free(st->mod_mgr); CORE_LOG_FATAL("stellar exit\n"); log_free(st->logger); free(st); st = NULL; } } void stellar_loopbreak(struct stellar *st) { if (st) { ATOMIC_SET(&st->need_exit, 1); } } void stellar_reload_log_level(struct stellar *st) { if (st) { log_reload_level(st->logger); } } struct logger *stellar_get_logger(struct stellar *st) { if (st) { return st->logger; } else { return NULL; } } struct module_manager *stellar_get_module_manager(struct stellar *st) { if (st) { return st->mod_mgr; } else { return NULL; } }