#include #include #include #include #include #include #include #include #include #include #include "logo.h" #include "marsio.h" #include "config.h" #include "packet.h" #include "timestamp.h" #include "session_manager.h" #ifdef STELLAR_GIT_VERSION static __attribute__((__used__)) const char *__stellar_version = STELLAR_GIT_VERSION; #else static __attribute__((__used__)) const char *__stellar_version = "Unknown"; #endif #define STELLAR_LOG_STATE(format, ...) LOG_STATE("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__) #define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED) #define ATOMIC_READ(x) __atomic_load_n(x, __ATOMIC_RELAXED) #define RX_BURST_MAX 128 struct packet_io { struct mr_instance *mr_ins; struct mr_vdev *mr_dev; struct mr_sendpath *mr_path; }; struct thread_context { pthread_t tid; uint16_t index; uint64_t need_exit; uint64_t is_runing; struct session_manager *sess_mgr; }; struct stellar_context { uint64_t need_exit; struct config config; struct packet_io *pkt_io; struct thread_context threads_ctx[MAX_THREAD_NUM]; }; struct stellar_context stellar_context; struct stellar_context *stellar_ctx_ptr = &stellar_context; static const char *log_config_file = "./conf/log.toml"; static const char *stellar_config_file = "./conf/stellar.toml"; /****************************************************************************** * example ******************************************************************************/ static void __packet_plugin_dispatch_example(const struct packet *pkt) { if (pkt == NULL) { return; } printf("\n"); printf("=> packet dispatch: %p\n", pkt); printf("<= packet dispatch\n\n"); } static void __session_plugin_dispatch_example(struct session *sess) { if (sess == NULL) { return; } printf("\n"); printf("=> session dispatch: %p\n", sess); session_dump(sess); printf("<= session dispatch\n\n"); // after session dispatch, we should reset session current packet and direction session_set0_cur_pkt(sess, NULL); session_set_cur_dir(sess, SESSION_DIR_NONE); } /****************************************************************************** * util ******************************************************************************/ static void signal_handler(int signo) { if (signo == SIGINT) { STELLAR_LOG_DEBUG("recv SIGINT, exit !!!"); ATOMIC_SET(&stellar_ctx_ptr->need_exit, 1); } if (signo == SIGQUIT) { STELLAR_LOG_DEBUG("recv SIGQUIT, exit !!!"); ATOMIC_SET(&stellar_ctx_ptr->need_exit, 1); } if (signo == SIGTERM) { STELLAR_LOG_DEBUG("recv SIGTERM, exit !!!"); ATOMIC_SET(&stellar_ctx_ptr->need_exit, 1); } if (signo == SIGHUP) { STELLAR_LOG_DEBUG("recv SIGHUP, reload log config file !!!"); log_reload_level(log_config_file); } } // return 0 : not keepalive packet // return 1 : is keepalive packet static inline int is_keepalive_packet(const char *data, int len) { if (data == NULL || len < (int)(sizeof(struct ethhdr))) { return 0; } struct ethhdr *eth_hdr = (struct ethhdr *)data; if (eth_hdr->h_proto == 0xAAAA) { return 1; } else { return 0; } } /****************************************************************************** * thread ******************************************************************************/ static inline void thread_set_name(const char *thd_symbol, uint16_t thd_idx) { char thd_name[16]; snprintf(thd_name, sizeof(thd_name), "%s:%d", thd_symbol, thd_idx); prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); } static void *main_loop(void *arg) { int n_pkt_recved; uint16_t len = 0; const char *data; struct packet pkt; struct session *sess; marsio_buff_t *rx_buff; marsio_buff_t *rx_buffs[RX_BURST_MAX]; struct thread_context *threads_ctx = (struct thread_context *)arg; struct session_manager *sess_mgr = threads_ctx->sess_mgr; struct packet_io *pkt_io = stellar_ctx_ptr->pkt_io; uint16_t thd_idx = threads_ctx->index; struct mr_vdev *mr_dev = pkt_io->mr_dev; struct mr_sendpath *mr_path = pkt_io->mr_path; struct mr_instance *mr_ins = pkt_io->mr_ins; struct mr_vdev *vdevs[1] = { mr_dev, }; int min_timeout_ms = 10; if (marsio_thread_init(mr_ins) != 0) { STELLAR_LOG_ERROR("unable to init marsio thread"); return NULL; } ATOMIC_SET(&threads_ctx->is_runing, 1); thread_set_name("stellar", thd_idx); STELLAR_LOG_DEBUG("worker thread %d runing\n", thd_idx); while (ATOMIC_READ(&threads_ctx->need_exit) == 0) { n_pkt_recved = marsio_recv_burst(mr_dev, thd_idx, rx_buffs, RX_BURST_MAX); if (n_pkt_recved <= 0) { goto poll_wait; } for (int i = 0; i < n_pkt_recved; i++) { rx_buff = rx_buffs[i]; data = marsio_buff_mtod(rx_buff); len = marsio_buff_datalen(rx_buff); if (is_keepalive_packet(data, len)) { marsio_send_burst(mr_path, thd_idx, &rx_buff, 1); continue; } else { packet_parse(&pkt, data, len); __packet_plugin_dispatch_example(&pkt); sess = session_manager_update_session(sess_mgr, &pkt); __session_plugin_dispatch_example(sess); sess = session_manager_get_evicted_session(sess_mgr); __session_plugin_dispatch_example(sess); // TODO if (1) // action == forward { marsio_send_burst(mr_path, thd_idx, &rx_buff, 1); } else // action == drop { marsio_buff_free(mr_ins, &rx_buff, 1, 0, thd_idx); } } } poll_wait: sess = session_manager_get_expired_session(sess_mgr); __session_plugin_dispatch_example(sess); marsio_poll_wait(mr_ins, vdevs, 1, thd_idx, min_timeout_ms); } ATOMIC_SET(&threads_ctx->is_runing, 0); STELLAR_LOG_DEBUG("worker thread %d exit\n", thd_idx); return NULL; } static int thread_context_init(struct stellar_context *ctx) { struct system_config *sys_cfg = &ctx->config.sys_cfg; struct session_manager_config *sess_mgr_cfg = &ctx->config.sess_mgr_cfg; for (uint16_t i = 0; i < sys_cfg->nr_threads; i++) { struct thread_context *threads_ctx = &ctx->threads_ctx[i]; threads_ctx->index = i; threads_ctx->need_exit = 0; threads_ctx->is_runing = 0; threads_ctx->sess_mgr = session_manager_create(sess_mgr_cfg); if (threads_ctx->sess_mgr == NULL) { STELLAR_LOG_ERROR("unable to create session manager"); return -1; } } return 0; } static void thread_context_free(struct stellar_context *ctx) { struct system_config *sys_cfg = &ctx->config.sys_cfg; for (uint16_t i = 0; i < sys_cfg->nr_threads; i++) { struct thread_context *threads_ctx = &ctx->threads_ctx[i]; if (ATOMIC_READ(&threads_ctx->is_runing) == 0) { session_manager_destroy(threads_ctx->sess_mgr); } } } static int thread_create(struct thread_context threads_ctx[], uint16_t nr_threads) { for (uint16_t i = 0; i < nr_threads; i++) { struct thread_context *ctx = &threads_ctx[i]; if (pthread_create(&ctx->tid, NULL, main_loop, (void *)ctx) < 0) { STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); return -1; } } return 0; } static void thread_destroy(struct thread_context threads_ctx[], uint16_t nr_threads) { for (uint16_t i = 0; i < nr_threads; i++) { struct thread_context *ctx = &threads_ctx[i]; ATOMIC_SET(&ctx->need_exit, 1); while (ATOMIC_READ(&ctx->is_runing) == 1) { sleep(1); } } } /****************************************************************************** * packet io ******************************************************************************/ void packet_io_destroy(struct packet_io *pkt_io) { if (pkt_io == NULL) { return; } if (pkt_io->mr_path != NULL) { marsio_sendpath_destory(pkt_io->mr_path); pkt_io->mr_path = NULL; } if (pkt_io->mr_dev != NULL) { marsio_close_device(pkt_io->mr_dev); pkt_io->mr_dev = NULL; } if (pkt_io->mr_ins != NULL) { marsio_destory(pkt_io->mr_ins); pkt_io->mr_ins = NULL; } free(pkt_io); pkt_io = NULL; } struct packet_io *packet_io_create(struct system_config *sys_cfg) { struct packet_io *pkt_io = (struct packet_io *)calloc(1, sizeof(struct packet_io)); if (pkt_io == NULL) { STELLAR_LOG_ERROR("unable to alloc packet io"); return NULL; } int opt = 1; cpu_set_t coremask; CPU_ZERO(&coremask); for (int i = 0; i < sys_cfg->nr_threads; i++) { CPU_SET(sys_cfg->cpu_mask[i], &coremask); } pkt_io->mr_ins = marsio_create(); if (pkt_io->mr_ins == NULL) { STELLAR_LOG_ERROR("unable to create marsio instance"); goto error_out; } marsio_option_set(pkt_io->mr_ins, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(coremask)); marsio_option_set(pkt_io->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)); if (marsio_init(pkt_io->mr_ins, sys_cfg->app_symbol) != 0) { STELLAR_LOG_ERROR("unable to init marsio instance"); goto error_out; } pkt_io->mr_dev = marsio_open_device(pkt_io->mr_ins, sys_cfg->dev_symbol, sys_cfg->nr_threads, sys_cfg->nr_threads); if (pkt_io->mr_dev == NULL) { STELLAR_LOG_ERROR("unable to open marsio device"); goto error_out; } pkt_io->mr_path = marsio_sendpath_create_by_vdev(pkt_io->mr_dev); if (pkt_io->mr_path == NULL) { STELLAR_LOG_ERROR("unable to create marsio sendpath"); goto error_out; } return pkt_io; error_out: packet_io_destroy(pkt_io); return NULL; } /****************************************************************************** * main ******************************************************************************/ int main(int argc, char **argv) { memset(stellar_ctx_ptr, 0, sizeof(struct stellar_context)); timestamp_update(); if (log_init(log_config_file) != 0) { return -1; } STELLAR_LOG_STATE("Start Stellar (version: %s)\n %s", __stellar_version, logo_str); if (config_load(&stellar_ctx_ptr->config, stellar_config_file) != 0) { return -1; } config_dump(&stellar_ctx_ptr->config); struct system_config *sys_cfg = &stellar_ctx_ptr->config.sys_cfg; uint16_t nr_threads = sys_cfg->nr_threads; // TODO init plugin signal(SIGINT, signal_handler); signal(SIGQUIT, signal_handler); signal(SIGTERM, signal_handler); signal(SIGHUP, signal_handler); stellar_ctx_ptr->pkt_io = packet_io_create(sys_cfg); if (stellar_ctx_ptr->pkt_io == NULL) { STELLAR_LOG_ERROR("unable to create packet io"); goto error_out; } if (thread_context_init(stellar_ctx_ptr) != 0) { STELLAR_LOG_ERROR("unable to init thread context"); goto error_out; } if (thread_create(stellar_ctx_ptr->threads_ctx, nr_threads) != 0) { STELLAR_LOG_ERROR("unable to create worker thread"); goto error_out; } while (!ATOMIC_READ(&stellar_ctx_ptr->need_exit)) { timestamp_update(); sleep(1); } error_out: packet_io_destroy(stellar_ctx_ptr->pkt_io); thread_destroy(stellar_ctx_ptr->threads_ctx, nr_threads); thread_context_free(stellar_ctx_ptr); // TODO free plugin log_free(); return 0; }