diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt index a5b9169..b59e1c6 100644 --- a/src/stellar/CMakeLists.txt +++ b/src/stellar/CMakeLists.txt @@ -1,4 +1,4 @@ -add_executable(stellar stellar.cpp stat.cpp) +add_executable(stellar stellar.cpp stat.cpp cron.cpp) target_link_libraries(stellar timestamp session_manager plugin_manager pthread config packet_io fieldstat4) install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program) \ No newline at end of file diff --git a/src/stellar/cron.cpp b/src/stellar/cron.cpp new file mode 100644 index 0000000..5bf4a88 --- /dev/null +++ b/src/stellar/cron.cpp @@ -0,0 +1,69 @@ +#include "cron.h" + +#ifndef container_of +#define container_of(ptr, type, member) \ + (type *)((char *)(ptr) - (char *)&((type *)0)->member) +#endif + +struct thread_cron +{ + struct timeouts *timeouts; +}; + +struct thread_cron *thread_cron_new(uint64_t now) +{ + timeout_error_t err; + struct thread_cron *cron = (struct thread_cron *)calloc(1, sizeof(struct thread_cron)); + if (cron == NULL) + { + return NULL; + } + + cron->timeouts = timeouts_open(0, &err); + if (cron->timeouts == NULL) + { + goto error_out; + } + timeouts_update(cron->timeouts, now); + + return cron; + +error_out: + thread_cron_free(cron); + return NULL; +} + +void thread_cron_free(struct thread_cron *cron) +{ + if (cron) + { + if (cron->timeouts) + { + timeouts_close(cron->timeouts); + } + free(cron); + cron = NULL; + } +} + +void thread_cron_run(struct thread_cron *cron, uint64_t now) +{ + struct timeout *timeout; + timeouts_update(cron->timeouts, now); + while ((timeout = timeouts_get(cron->timeouts))) + { + struct cron_task *task = container_of(timeout, struct cron_task, timeout); + task->callback(task->data); + } +} + +void thread_cron_add_task(struct thread_cron *cron, struct cron_task *task) +{ + timeout_init(&task->timeout, TIMEOUT_INT); + timeouts_add(cron->timeouts, &task->timeout, task->cycle); +} + +void thread_cron_del_task(struct thread_cron *cron, struct cron_task *task) +{ + timeouts_del(cron->timeouts, &task->timeout); +} diff --git a/src/stellar/cron.h b/src/stellar/cron.h new file mode 100644 index 0000000..9a6f791 --- /dev/null +++ b/src/stellar/cron.h @@ -0,0 +1,32 @@ +#ifndef _CRON_H +#define _CRON_H + +#ifdef __cpluscplus +extern "C" +{ +#endif + +#include +#include "timeout.h" + +struct cron_task +{ + struct timeout timeout; + void (*callback)(void *); + void *data; + uint64_t cycle; +}; + +struct thread_cron; +struct thread_cron *thread_cron_new(uint64_t now); +void thread_cron_free(struct thread_cron *cron); +void thread_cron_run(struct thread_cron *cron, uint64_t now); + +void thread_cron_add_task(struct thread_cron *cron, struct cron_task *task); +void thread_cron_del_task(struct thread_cron *cron, struct cron_task *task); + +#ifdef __cpluscplus +} +#endif + +#endif diff --git a/src/stellar/stellar.cpp b/src/stellar/stellar.cpp index 273704e..bbfde41 100644 --- a/src/stellar/stellar.cpp +++ b/src/stellar/stellar.cpp @@ -10,6 +10,7 @@ #include "logo.h" #include "stat.h" +#include "cron.h" #include "stellar.h" #include "config.h" #include "packet_private.h" @@ -29,6 +30,7 @@ struct thread_ctx pthread_t tid; uint16_t idx; uint64_t is_runing; + struct thread_cron *cron; struct ip_reassembly *ip_mgr; struct session_manager *sess_mgr; }; @@ -36,6 +38,7 @@ struct thread_ctx struct stellar_runtime { uint64_t need_exit; + struct thread_cron *cron; struct stellar_stat *stat; struct packet_io *packet_io; struct plugin_manager *plug_mgr; @@ -128,18 +131,14 @@ static inline void thread_set_name(const char *thd_symbol, uint16_t thd_idx) prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); } -static inline void stellar_thread_cron(struct thread_ctx *thr_ctx) +static inline void thread_stat_merge_cron(void *ctx) { - thread_local uint64_t last = 0; - if (timestamp_get_msec() - last > 2000) - { - struct thread_stat thr_stat = { - ip_reassembly_get_stat(thr_ctx->ip_mgr), - session_manager_get_stat(thr_ctx->sess_mgr), - }; - stellar_peek_thr_stat(runtime->stat, &thr_stat, thr_ctx->idx); - last = timestamp_get_msec(); - } + struct thread_ctx *thr_ctx = (struct thread_ctx *)ctx; + struct thread_stat thr_stat = { + ip_reassembly_get_stat(thr_ctx->ip_mgr), + session_manager_get_stat(thr_ctx->sess_mgr), + }; + stellar_peek_thr_stat(runtime->stat, &thr_stat, thr_ctx->idx); } static void *work_thread(void *arg) @@ -156,10 +155,18 @@ static void *work_thread(void *arg) struct packet_io *packet_io = runtime->packet_io; struct plugin_manager *plug_mgr = runtime->plug_mgr; struct thread_ctx *thr_ctx = (struct thread_ctx *)arg; + struct thread_cron *cron = thr_ctx->cron; struct ip_reassembly *ip_reass = thr_ctx->ip_mgr; struct session_manager *sess_mgr = thr_ctx->sess_mgr; thr_idx = thr_ctx->idx; + struct cron_task stat_task = { + .callback = thread_stat_merge_cron, + .data = thr_ctx, + .cycle = 2000, // ms + }; + thread_cron_add_task(cron, &stat_task); + if (packet_io_init(packet_io, thr_idx) != 0) { STELLAR_LOG_ERROR("unable to init marsio thread"); @@ -172,7 +179,7 @@ static void *work_thread(void *arg) while (ATOMIC_READ(&runtime->need_exit) == 0) { - now = timestamp_get_msec(); // TODO + now = timestamp_get_msec(); nr_recv = packet_io_ingress(packet_io, thr_idx, packets, RX_BURST_MAX); if (nr_recv == 0) { @@ -244,7 +251,7 @@ static void *work_thread(void *arg) } ip_reassembly_expire(ip_reass, now); - stellar_thread_cron(thr_ctx); + thread_cron_run(cron, now); // TODO // plugin_manager_cron(); @@ -266,6 +273,12 @@ static int stellar_thread_init(struct stellar_runtime *ctx, uint8_t nr_threads) struct thread_ctx *thr_ctx = &ctx->threads[i]; thr_ctx->idx = i; thr_ctx->is_runing = 0; + thr_ctx->cron = thread_cron_new(now); + if (thr_ctx->cron == NULL) + { + STELLAR_LOG_ERROR("unable to create thread cron"); + return -1; + } thr_ctx->sess_mgr = session_manager_new(&config->sess_mgr_opts, now); if (thr_ctx->sess_mgr == NULL) { @@ -293,6 +306,7 @@ static void stellar_thread_clean(struct stellar_runtime *ctx, uint8_t nr_threads STELLAR_LOG_STATE("wait worker thread %d free context", i); session_manager_free(thr_ctx->sess_mgr); ip_reassembly_free(thr_ctx->ip_mgr); + thread_cron_free(thr_ctx->cron); } } } @@ -329,11 +343,22 @@ static void stellar_thread_join(struct stellar_runtime *ctx, uint8_t nr_threads) * main ******************************************************************************/ +static inline void stellar_stat_output_cron(void *ctx) +{ + struct stellar_runtime *runtime = (struct stellar_runtime *)ctx; + stellar_peek_io_stat(runtime->stat, packet_io_get_stat(runtime->packet_io)); + stellar_stat_output(runtime->stat); +} + int main(int argc, char **argv) { uint8_t nr_threads; - uint64_t last_stat = 0; - struct io_stat *io_stat; + struct cron_task stat_task = + { + .callback = stellar_stat_output_cron, + .data = runtime, + .cycle = 2000, // ms + }; memset(runtime, 0, sizeof(struct stellar_runtime)); memset(config, 0, sizeof(struct stellar_config)); timestamp_update(); @@ -364,6 +389,13 @@ int main(int argc, char **argv) goto error_out; } + runtime->cron = thread_cron_new(timestamp_get_msec()); + if (runtime->cron == NULL) + { + STELLAR_LOG_ERROR("unable to create runtime cron"); + goto error_out; + } + runtime->stat = stellar_stat_new(nr_threads); if (runtime->stat == NULL) { @@ -397,18 +429,11 @@ int main(int argc, char **argv) goto error_out; } - io_stat = packet_io_get_stat(runtime->packet_io); - last_stat = timestamp_get_msec(); + thread_cron_add_task(runtime->cron, &stat_task); while (!ATOMIC_READ(&runtime->need_exit)) { timestamp_update(); - if (timestamp_get_msec() - last_stat > 2000) - { - stellar_peek_io_stat(runtime->stat, io_stat); - stellar_stat_output(runtime->stat); - last_stat = timestamp_get_msec(); - } - + thread_cron_run(runtime->cron, timestamp_get_msec()); usleep(5 * 1000); } @@ -418,6 +443,7 @@ error_out: packet_io_free(runtime->packet_io); plugin_manager_free(runtime->plug_mgr); stellar_stat_free(runtime->stat); + thread_cron_free(runtime->cron); STELLAR_LOG_STATE("stellar exit !!!\n"); log_free();