#include #include #include #include #include #include "config.h" #include "timestamp.h" #include "stellar_priv.h" struct stellar_runtime __runtime = {0}; struct stellar_runtime *runtime = &__runtime; struct stellar_config __config = {0}; struct stellar_config *config = &__config; static void update_session_stat(struct session *sess, struct packet *pkt) { if (sess) { enum flow_direction dir = session_get_flow_direction(sess); int is_ctrl = packet_is_ctrl(pkt); switch (packet_get_action(pkt)) { case PACKET_ACTION_DROP: session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1); session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), packet_get_len(pkt)); break; case PACKET_ACTION_FORWARD: session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1); session_inc_stat(sess, dir, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), packet_get_len(pkt)); break; default: assert(0); break; } session_set_current_packet(sess, NULL); session_set_flow_direction(sess, FLOW_DIRECTION_NONE); } } static inline void free_evicted_sessions(struct session_manager *sess_mgr, uint64_t max_free) { void *plugin_ctx = NULL; struct session *sess = NULL; for (uint64_t i = 0; i < max_free; i++) { sess = session_manager_get_evicted_session(sess_mgr); if (sess) { plugin_ctx = session_get_user_data(sess); plugin_manager_free_ctx(plugin_ctx); session_manager_free_session(sess_mgr, sess); } else { break; } } } static inline void free_expired_sessions(struct session_manager *sess_mgr, uint64_t max_free, uint64_t now) { void *plugin_ctx = NULL; struct session *sess = NULL; for (uint64_t i = 0; i < max_free; i++) { sess = session_manager_get_expired_session(sess_mgr, now); if (sess) { plugin_ctx = session_get_user_data(sess); plugin_manager_free_ctx(plugin_ctx); session_manager_free_session(sess_mgr, sess); } else { break; } } } static inline void merge_thread_stat(struct stellar_thread *thread, uint64_t now) { struct thread_stat thr_stat = { packet_io_stat(runtime->packet_io, thread->idx), ip_reassembly_stat(thread->ip_mgr), session_manager_stat(thread->sess_mgr), }; stellar_stat_merge(runtime->stat, &thr_stat, thread->idx); } static void *work_thread(void *arg) { int nr_recv; uint64_t now = 0; char thd_name[16] = {0}; void *plugin_ctx = NULL; struct packet *pkt = NULL; struct packet *defraged_pkt = NULL; struct packet packets[RX_BURST_MAX]; struct session *sess = NULL; struct packet_io *packet_io = runtime->packet_io; struct plugin_manager *plug_mgr = runtime->plug_mgr; struct stellar_thread *thread = (struct stellar_thread *)arg; struct ip_reassembly *ip_reass = thread->ip_mgr; struct session_manager *sess_mgr = thread->sess_mgr; uint16_t thr_idx = thread->idx; stellar_set_current_thread_index(thr_idx); snprintf(thd_name, sizeof(thd_name), "stellar:%d", thr_idx); prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); if (packet_io_init(packet_io, thr_idx) != 0) { STELLAR_LOG_ERROR("unable to init marsio thread"); return NULL; } ATOMIC_SET(&thread->is_runing, 1); STELLAR_LOG_STATE("worker thread %d runing", thr_idx); while (ATOMIC_READ(&runtime->need_exit) == 0) { now = timestamp_get_msec(); nr_recv = packet_io_ingress(packet_io, thr_idx, packets, RX_BURST_MAX); if (nr_recv == 0) { goto idle_tasks; } for (int i = 0; i < nr_recv; i++) { sess = NULL; defraged_pkt = NULL; pkt = &packets[i]; plugin_manager_dispatch_packet(plug_mgr, pkt); if (packet_is_fragment(pkt)) { defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now); if (defraged_pkt == NULL) { goto fast_path; } else { pkt = defraged_pkt; plugin_manager_dispatch_packet(plug_mgr, pkt); } } sess = session_manager_lookup_session(sess_mgr, pkt); if (sess == NULL) { sess = session_manager_new_session(sess_mgr, pkt, now); if (sess == NULL) { goto fast_path; } plugin_ctx = plugin_manager_new_ctx(sess); session_set_user_data(sess, plugin_ctx); } else { if (session_manager_update_session(sess_mgr, sess, pkt, now) == -1) { goto fast_path; } } if (packet_get_session_id(pkt) == 0) { packet_set_session_id(pkt, session_get_id(sess)); } plugin_manager_dispatch_session(plug_mgr, sess, pkt); fast_path: update_session_stat(sess, pkt); if (packet_get_action(pkt) == PACKET_ACTION_DROP) { if (pkt == defraged_pkt) { packet_io_drop(packet_io, thr_idx, &packets[i], 1); packet_free(defraged_pkt); } else { packet_io_drop(packet_io, thr_idx, pkt, 1); } } else { if (pkt == defraged_pkt) { packet_io_egress(packet_io, thr_idx, &packets[i], 1); packet_free(defraged_pkt); } else { packet_io_egress(packet_io, thr_idx, pkt, 1); } } } idle_tasks: // nr_recv packet atmost trigger nr_recv session evicted free_evicted_sessions(sess_mgr, nr_recv); // per 5 ms, atmost free 8 expired session if (now - thread->timing_wheel_last_update_ts > 5) { free_expired_sessions(sess_mgr, 8, now); thread->timing_wheel_last_update_ts = now; } merge_thread_stat(thread, now); ip_reassembly_expire(ip_reass, now); // TODO // plugin_manager_cron(); // poll_non_packet_events(); if (nr_recv == 0) { packet_io_yield(packet_io, thr_idx, 10); } } ATOMIC_SET(&thread->is_runing, 0); STELLAR_LOG_STATE("worker thread %d exit !!!", thr_idx); return NULL; } static thread_local uint16_t __thread_id = 0; uint16_t stellar_get_current_thread_index() { return __thread_id; } void stellar_set_current_thread_index(uint16_t idx) { __thread_id = idx; } int stellar_thread_init(struct stellar_runtime *runtime, struct stellar_config *config) { uint64_t now = timestamp_get_msec(); for (uint16_t i = 0; i < config->io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; thread->idx = i; thread->is_runing = 0; thread->timing_wheel_last_update_ts = now; thread->sess_mgr = session_manager_new(&config->sess_mgr_opts, now); if (thread->sess_mgr == NULL) { STELLAR_LOG_ERROR("unable to create session manager"); return -1; } thread->ip_mgr = ip_reassembly_new(&config->ip_opts); if (thread->ip_mgr == NULL) { STELLAR_LOG_ERROR("unable to create ip reassemble manager"); return -1; } } return 0; } void stellar_thread_clean(struct stellar_runtime *runtime, struct stellar_config *config) { for (uint16_t i = 0; i < config->io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (ATOMIC_READ(&thread->is_runing) == 0) { STELLAR_LOG_STATE("wait worker thread %d free context", i); session_manager_free(thread->sess_mgr); ip_reassembly_free(thread->ip_mgr); } } } int stellar_thread_run(struct stellar_runtime *runtime, struct stellar_config *config) { for (uint16_t i = 0; i < config->io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (pthread_create(&thread->tid, NULL, work_thread, (void *)thread) < 0) { STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); return -1; } } return 0; } void stellar_thread_join(struct stellar_runtime *runtime, struct stellar_config *config) { for (uint16_t i = 0; i < config->io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; while (ATOMIC_READ(&thread->is_runing) == 1) { STELLAR_LOG_STATE("wait worker thread %d stop", i); sleep(1); } } }