Add API support thread cron task

This commit is contained in:
luwenpeng
2024-04-17 11:44:31 +08:00
parent f5f09e5e23
commit 3771b68873
4 changed files with 152 additions and 25 deletions

View File

@@ -10,6 +10,7 @@
#include "logo.h"
#include "stat.h"
#include "cron.h"
#include "stellar.h"
#include "config.h"
#include "packet_private.h"
@@ -29,6 +30,7 @@ struct thread_ctx
pthread_t tid;
uint16_t idx;
uint64_t is_runing;
struct thread_cron *cron;
struct ip_reassembly *ip_mgr;
struct session_manager *sess_mgr;
};
@@ -36,6 +38,7 @@ struct thread_ctx
struct stellar_runtime
{
uint64_t need_exit;
struct thread_cron *cron;
struct stellar_stat *stat;
struct packet_io *packet_io;
struct plugin_manager *plug_mgr;
@@ -128,18 +131,14 @@ static inline void thread_set_name(const char *thd_symbol, uint16_t thd_idx)
prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL);
}
static inline void stellar_thread_cron(struct thread_ctx *thr_ctx)
static inline void thread_stat_merge_cron(void *ctx)
{
thread_local uint64_t last = 0;
if (timestamp_get_msec() - last > 2000)
{
struct thread_stat thr_stat = {
ip_reassembly_get_stat(thr_ctx->ip_mgr),
session_manager_get_stat(thr_ctx->sess_mgr),
};
stellar_peek_thr_stat(runtime->stat, &thr_stat, thr_ctx->idx);
last = timestamp_get_msec();
}
struct thread_ctx *thr_ctx = (struct thread_ctx *)ctx;
struct thread_stat thr_stat = {
ip_reassembly_get_stat(thr_ctx->ip_mgr),
session_manager_get_stat(thr_ctx->sess_mgr),
};
stellar_peek_thr_stat(runtime->stat, &thr_stat, thr_ctx->idx);
}
static void *work_thread(void *arg)
@@ -156,10 +155,18 @@ static void *work_thread(void *arg)
struct packet_io *packet_io = runtime->packet_io;
struct plugin_manager *plug_mgr = runtime->plug_mgr;
struct thread_ctx *thr_ctx = (struct thread_ctx *)arg;
struct thread_cron *cron = thr_ctx->cron;
struct ip_reassembly *ip_reass = thr_ctx->ip_mgr;
struct session_manager *sess_mgr = thr_ctx->sess_mgr;
thr_idx = thr_ctx->idx;
struct cron_task stat_task = {
.callback = thread_stat_merge_cron,
.data = thr_ctx,
.cycle = 2000, // ms
};
thread_cron_add_task(cron, &stat_task);
if (packet_io_init(packet_io, thr_idx) != 0)
{
STELLAR_LOG_ERROR("unable to init marsio thread");
@@ -172,7 +179,7 @@ static void *work_thread(void *arg)
while (ATOMIC_READ(&runtime->need_exit) == 0)
{
now = timestamp_get_msec(); // TODO
now = timestamp_get_msec();
nr_recv = packet_io_ingress(packet_io, thr_idx, packets, RX_BURST_MAX);
if (nr_recv == 0)
{
@@ -244,7 +251,7 @@ static void *work_thread(void *arg)
}
ip_reassembly_expire(ip_reass, now);
stellar_thread_cron(thr_ctx);
thread_cron_run(cron, now);
// TODO
// plugin_manager_cron();
@@ -266,6 +273,12 @@ static int stellar_thread_init(struct stellar_runtime *ctx, uint8_t nr_threads)
struct thread_ctx *thr_ctx = &ctx->threads[i];
thr_ctx->idx = i;
thr_ctx->is_runing = 0;
thr_ctx->cron = thread_cron_new(now);
if (thr_ctx->cron == NULL)
{
STELLAR_LOG_ERROR("unable to create thread cron");
return -1;
}
thr_ctx->sess_mgr = session_manager_new(&config->sess_mgr_opts, now);
if (thr_ctx->sess_mgr == NULL)
{
@@ -293,6 +306,7 @@ static void stellar_thread_clean(struct stellar_runtime *ctx, uint8_t nr_threads
STELLAR_LOG_STATE("wait worker thread %d free context", i);
session_manager_free(thr_ctx->sess_mgr);
ip_reassembly_free(thr_ctx->ip_mgr);
thread_cron_free(thr_ctx->cron);
}
}
}
@@ -329,11 +343,22 @@ static void stellar_thread_join(struct stellar_runtime *ctx, uint8_t nr_threads)
* main
******************************************************************************/
static inline void stellar_stat_output_cron(void *ctx)
{
struct stellar_runtime *runtime = (struct stellar_runtime *)ctx;
stellar_peek_io_stat(runtime->stat, packet_io_get_stat(runtime->packet_io));
stellar_stat_output(runtime->stat);
}
int main(int argc, char **argv)
{
uint8_t nr_threads;
uint64_t last_stat = 0;
struct io_stat *io_stat;
struct cron_task stat_task =
{
.callback = stellar_stat_output_cron,
.data = runtime,
.cycle = 2000, // ms
};
memset(runtime, 0, sizeof(struct stellar_runtime));
memset(config, 0, sizeof(struct stellar_config));
timestamp_update();
@@ -364,6 +389,13 @@ int main(int argc, char **argv)
goto error_out;
}
runtime->cron = thread_cron_new(timestamp_get_msec());
if (runtime->cron == NULL)
{
STELLAR_LOG_ERROR("unable to create runtime cron");
goto error_out;
}
runtime->stat = stellar_stat_new(nr_threads);
if (runtime->stat == NULL)
{
@@ -397,18 +429,11 @@ int main(int argc, char **argv)
goto error_out;
}
io_stat = packet_io_get_stat(runtime->packet_io);
last_stat = timestamp_get_msec();
thread_cron_add_task(runtime->cron, &stat_task);
while (!ATOMIC_READ(&runtime->need_exit))
{
timestamp_update();
if (timestamp_get_msec() - last_stat > 2000)
{
stellar_peek_io_stat(runtime->stat, io_stat);
stellar_stat_output(runtime->stat);
last_stat = timestamp_get_msec();
}
thread_cron_run(runtime->cron, timestamp_get_msec());
usleep(5 * 1000);
}
@@ -418,6 +443,7 @@ error_out:
packet_io_free(runtime->packet_io);
plugin_manager_free(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
thread_cron_free(runtime->cron);
STELLAR_LOG_STATE("stellar exit !!!\n");
log_free();