#include #include #include #include "packet_io.h" #include "packet_internal.h" #include "packet_manager_internal.h" #include "stellar/stellar.h" #include "stellar/module_manager.h" #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "core", format, ##__VA_ARGS__) #ifdef STELLAR_GIT_VERSION static __attribute__((__used__)) const char *version = STELLAR_GIT_VERSION; #else static __attribute__((__used__)) const char *version = "Unknown"; #endif struct stellar_thread { pthread_t tid; uint16_t idx; uint64_t is_runing; struct stellar *st; }; struct stellar { uint16_t thread_num; uint64_t need_exit; struct logger *logger; struct packet_io *pkt_io; struct mq_schema *mq_schema; struct packet_manager *pkt_mgr; struct stellar_module_manager *mod_mgr; struct stellar_thread threads[MAX_THREAD_NUM]; }; static void *worker_thread(void *arg) { int nr_pkt_rcv = 0; char thread_name[16] = {0}; struct packet *pkt = NULL; struct packet packets[RX_BURST_MAX]; struct stellar_thread *thread = (struct stellar_thread *)arg; uint16_t thread_id = thread->idx; struct stellar *st = thread->st; struct packet_io *pkt_io = st->pkt_io; struct packet_manager *pkt_mgr = st->pkt_mgr; struct stellar_module_manager *mod_mgr = st->mod_mgr; struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema); snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); __thread_local_logger = st->logger; stellar_module_manager_register_thread(mod_mgr, thread_id, mq_rt); if (packet_io_init(pkt_io, thread_id) != 0) { CORE_LOG_ERROR("unable to init packet io"); return NULL; } packet_manager_init(pkt_mgr, thread_id, mq_rt); ATOMIC_SET(&thread->is_runing, 1); CORE_LOG_FATAL("worker thread %d runing", thread_id); while (ATOMIC_READ(&st->need_exit) == 0) { // TODO memset(packets, 0, sizeof(packets)); nr_pkt_rcv = packet_io_ingress(pkt_io, thread_id, packets, RX_BURST_MAX); if (nr_pkt_rcv == 0) { goto idle_tasks; } for (int i = 0; i < nr_pkt_rcv; i++) { // TODO alloc struct packet from packet pool pkt = calloc(1, sizeof(struct packet)); memcpy(pkt, &packets[i], sizeof(struct packet)); pkt->need_free = 1; packet_manager_ingress(pkt_mgr, thread_id, pkt); packet_manager_dispatch(pkt_mgr, thread_id); pkt = packet_manager_egress(pkt_mgr, thread_id); if (pkt == NULL) { continue; } if (packet_get_action(pkt) == PACKET_ACTION_DROP) { packet_io_drop(pkt_io, thread_id, pkt, 1); } else { packet_io_egress(pkt_io, thread_id, pkt, 1); } // TODO polling } idle_tasks: // TODO polling if (nr_pkt_rcv == 0) { packet_io_yield(pkt_io, thread_id); } } mq_runtime_free(mq_rt); ATOMIC_SET(&thread->is_runing, 0); CORE_LOG_FATAL("worker thread %d exit", thread_id); return NULL; } static int stellar_thread_run(struct stellar *st) { for (uint16_t i = 0; i < st->thread_num; i++) { struct stellar_thread *thread = &st->threads[i]; thread->idx = i; thread->is_runing = 0; thread->st = st; if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0) { CORE_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); return -1; } } return 0; } static void stellar_thread_join(struct stellar *st) { CORE_LOG_FATAL("waiting worker thread exit"); for (uint16_t i = 0; i < st->thread_num; i++) { if (st->threads[i].is_runing == 0) { continue; } struct stellar_thread *thread = &st->threads[i]; pthread_join(thread->tid, NULL); } CORE_LOG_FATAL("all worker thread exited"); } struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file) { if (stellar_cfg_file == NULL) { printf("stellar config file is null\n"); return NULL; } if (module_cfg_file == NULL) { printf("module config file is null\n"); return NULL; } if (log_cfg_file == NULL) { printf("log config file is null\n"); return NULL; } struct stellar *st = (struct stellar *)calloc(1, sizeof(struct stellar)); if (st == NULL) { return NULL; } st->logger = log_new(log_cfg_file); if (st->logger == NULL) { printf("unable to create logger"); goto error_out; } __thread_local_logger = st->logger; CORE_LOG_FATAL("stellar start (version: %s)", version); if (load_and_validate_toml_integer_config(stellar_cfg_file, "packet_io.nr_worker_thread", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0) { CORE_LOG_ERROR("unable to get thread number from config file"); goto error_out; } st->mq_schema = mq_schema_new(); if (st->mq_schema == NULL) { CORE_LOG_ERROR("unable to create mq schema"); goto error_out; } st->pkt_mgr = packet_manager_new(st->mq_schema, stellar_cfg_file); if (st->pkt_mgr == NULL) { CORE_LOG_ERROR("unable to create packet manager"); goto error_out; } st->mod_mgr = stellar_module_manager_new(module_cfg_file, st->thread_num, st->mq_schema); if (st->mod_mgr == NULL) { CORE_LOG_ERROR("unable to create module manager"); goto error_out; } st->pkt_io = packet_io_new(stellar_cfg_file); if (st->pkt_io == NULL) { CORE_LOG_ERROR("unable to create packet io"); goto error_out; } return st; error_out: stellar_free(st); return NULL; } void stellar_run(struct stellar *st) { if (st == NULL) { return; } if (stellar_thread_run(st) != 0) { CORE_LOG_ERROR("unable to create worker thread"); return; } while (!ATOMIC_READ(&st->need_exit)) { usleep(1000); // 1ms // only available in pcap mode if (packet_io_isbreak(st->pkt_io)) { ATOMIC_SET(&st->need_exit, 1); CORE_LOG_FATAL("notify worker thread to exit"); break; } } stellar_thread_join(st); } void stellar_free(struct stellar *st) { if (st) { stellar_thread_join(st); packet_io_free(st->pkt_io); stellar_module_manager_free(st->mod_mgr); packet_manager_free(st->pkt_mgr); mq_schema_free(st->mq_schema); CORE_LOG_FATAL("stellar exit\n"); log_free(st->logger); free(st); st = NULL; } } void stellar_loopbreak(struct stellar *st) { if (st) { ATOMIC_SET(&st->need_exit, 1); } } void stellar_reload_log_level(struct stellar *st) { if (st) { log_reload_level(st->logger); } } /****************************************************************************** * Stellar Utility Function ******************************************************************************/ // TODO #if 0 // only send user build packet, can't send packet which come from network void stellar_send_build_packet(struct stellar *st, struct packet *pkt) { uint16_t thread_id = stellar_module_manager_get_thread_id(st->st.mod_mgr); struct packet_io *pkt_io = st->st.pkt_io; struct session_manager_runtime *sess_mgr_rt = st->st.threads[thread_id].sess_mgr_rt; session_manager_runtime_record_duplicated_packet(sess_mgr_rt, pkt); if (packet_is_claim(pkt)) { PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order"); assert(0); return; } if (packet_get_origin_ctx(pkt)) { // TODO abort(); packet_io_egress(pkt_io, thread_id, pkt, 1); } else { packet_io_inject(pkt_io, thread_id, pkt, 1); } } #endif struct logger *stellar_get_logger(struct stellar *st) { if (st) { return st->logger; } else { return NULL; } }