This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/infra/stellar_core.c

347 lines
8.7 KiB
C
Raw Normal View History

2024-01-09 18:03:24 +08:00
#include <unistd.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/prctl.h>

#include "packet_io.h"
#include "packet_internal.h"
#include "packet_manager_internal.h"
#include "stellar/stellar.h"
#include "stellar/module_manager.h"
#include "polling_manager_internal.h"
/*
 * Logging shorthands for this file. Each one forwards to the matching
 * STELLAR_LOG_* macro with the calling thread's __thread_local_logger and
 * the fixed string "core" as the second argument.
 */
#define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__)
#define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__)
#define CORE_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "core", format, ##__VA_ARGS__)
2024-08-16 10:43:00 +08:00
/*
 * Build version string: STELLAR_GIT_VERSION when that macro is defined at
 * compile time, otherwise the literal "Unknown". The __used__ attribute asks
 * the compiler to emit the object even if it appears unreferenced.
 */
#ifdef STELLAR_GIT_VERSION
static __attribute__((__used__)) const char *version = STELLAR_GIT_VERSION;
#else
static __attribute__((__used__)) const char *version = "Unknown";
#endif
/* Per-worker bookkeeping; one slot per worker lives in struct stellar. */
struct stellar_thread
{
pthread_t tid;      /* handle filled in by pthread_create() */
uint16_t idx;       /* slot index; the worker uses it as its thread id */
uint64_t is_runing; /* set to 1 by the worker once initialised, back to 0 when it exits */
struct stellar *st; /* back-pointer to the owning instance */
};
2024-09-20 18:41:07 +08:00
struct stellar
{
2024-09-20 18:41:07 +08:00
uint16_t thread_num;
uint64_t need_exit;
struct logger *logger;
2024-09-20 18:41:07 +08:00
struct packet_io *pkt_io;
struct mq_schema *mq_schema;
2024-09-20 18:41:07 +08:00
struct packet_manager *pkt_mgr;
struct stellar_module_manager *mod_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
};
static void *worker_thread(void *arg)
2024-01-09 18:03:24 +08:00
{
2024-09-20 18:41:07 +08:00
int nr_pkt_rcv = 0;
char thread_name[16] = {0};
struct packet *pkt = NULL;
2024-04-11 16:30:21 +08:00
struct packet packets[RX_BURST_MAX];
struct stellar_thread *thread = (struct stellar_thread *)arg;
2024-09-20 18:41:07 +08:00
uint16_t thread_id = thread->idx;
struct stellar *st = thread->st;
2024-09-20 18:41:07 +08:00
struct packet_io *pkt_io = st->pkt_io;
struct packet_manager *pkt_mgr = st->pkt_mgr;
struct stellar_module_manager *mod_mgr = st->mod_mgr;
struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema);
struct stellar_polling_manager *polling_mgr=stellar_module_get_polling_manager(mod_mgr);
2024-09-20 18:41:07 +08:00
snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
2024-09-20 18:41:07 +08:00
__thread_local_logger = st->logger;
stellar_module_manager_register_thread(mod_mgr, thread_id, mq_rt);
2024-09-20 18:41:07 +08:00
if (packet_io_init(pkt_io, thread_id) != 0)
{
CORE_LOG_ERROR("unable to init packet io");
return NULL;
}
2024-09-20 18:41:07 +08:00
packet_manager_init(pkt_mgr, thread_id, mq_rt);
2024-01-09 18:03:24 +08:00
ATOMIC_SET(&thread->is_runing, 1);
2024-09-20 18:41:07 +08:00
CORE_LOG_FATAL("worker thread %d runing", thread_id);
2024-01-09 18:03:24 +08:00
2024-09-20 18:41:07 +08:00
while (ATOMIC_READ(&st->need_exit) == 0)
2024-01-09 18:03:24 +08:00
{
2024-09-20 18:41:07 +08:00
// TODO
memset(packets, 0, sizeof(packets));
nr_pkt_rcv = packet_io_ingress(pkt_io, thread_id, packets, RX_BURST_MAX);
if (nr_pkt_rcv == 0)
2024-01-10 10:19:47 +08:00
{
2024-03-08 18:10:38 +08:00
goto idle_tasks;
2024-01-10 10:19:47 +08:00
}
2024-01-09 18:03:24 +08:00
2024-09-20 18:41:07 +08:00
for (int i = 0; i < nr_pkt_rcv; i++)
{
2024-09-20 18:41:07 +08:00
// TODO alloc struct packet from packet pool
pkt = calloc(1, sizeof(struct packet));
memcpy(pkt, &packets[i], sizeof(struct packet));
pkt->need_free = 1;
2024-03-08 18:10:38 +08:00
2024-09-20 18:41:07 +08:00
packet_manager_ingress(pkt_mgr, thread_id, pkt);
packet_manager_dispatch(pkt_mgr, thread_id);
pkt = packet_manager_egress(pkt_mgr, thread_id);
2024-03-08 18:10:38 +08:00
2024-09-20 18:41:07 +08:00
if (pkt == NULL)
{
2024-09-20 18:41:07 +08:00
continue;
}
2024-01-09 18:03:24 +08:00
2024-09-20 18:41:07 +08:00
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
2024-09-20 18:41:07 +08:00
packet_io_drop(pkt_io, thread_id, pkt, 1);
}
else
{
2024-09-20 18:41:07 +08:00
packet_io_egress(pkt_io, thread_id, pkt, 1);
2024-05-20 17:02:16 +08:00
}
stellar_polling_dispatch(polling_mgr);
}
idle_tasks:
stellar_polling_dispatch(polling_mgr);
2024-09-20 18:41:07 +08:00
if (nr_pkt_rcv == 0)
{
2024-09-20 18:41:07 +08:00
packet_io_yield(pkt_io, thread_id);
}
}
mq_runtime_free(mq_rt);
ATOMIC_SET(&thread->is_runing, 0);
2024-09-20 18:41:07 +08:00
CORE_LOG_FATAL("worker thread %d exit", thread_id);
2024-01-09 18:03:24 +08:00
return NULL;
}
2024-09-20 18:41:07 +08:00
static int stellar_thread_run(struct stellar *st)
2024-01-29 14:15:33 +08:00
{
2024-09-20 18:41:07 +08:00
for (uint16_t i = 0; i < st->thread_num; i++)
2024-01-29 14:15:33 +08:00
{
2024-09-20 18:41:07 +08:00
struct stellar_thread *thread = &st->threads[i];
thread->idx = i;
thread->is_runing = 0;
thread->st = st;
if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0)
2024-01-29 14:15:33 +08:00
{
CORE_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno));
2024-01-29 14:15:33 +08:00
return -1;
}
}
return 0;
}
static void stellar_thread_join(struct stellar *st)
2024-01-29 14:15:33 +08:00
{
CORE_LOG_FATAL("waiting worker thread exit");
2024-09-20 18:41:07 +08:00
for (uint16_t i = 0; i < st->thread_num; i++)
2024-01-29 14:15:33 +08:00
{
2024-09-20 18:41:07 +08:00
if (st->threads[i].is_runing == 0)
{
continue;
}
2024-09-20 18:41:07 +08:00
struct stellar_thread *thread = &st->threads[i];
pthread_join(thread->tid, NULL);
2024-01-29 14:15:33 +08:00
}
CORE_LOG_FATAL("all worker thread exited");
2024-01-29 14:15:33 +08:00
}
2024-09-20 18:41:07 +08:00
struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file)
{
if (stellar_cfg_file == NULL)
{
printf("stellar config file is null\n");
return NULL;
}
2024-09-20 18:41:07 +08:00
if (module_cfg_file == NULL)
{
2024-09-20 18:41:07 +08:00
printf("module config file is null\n");
return NULL;
}
if (log_cfg_file == NULL)
{
printf("log config file is null\n");
return NULL;
}
struct stellar *st = (struct stellar *)calloc(1, sizeof(struct stellar));
if (st == NULL)
{
return NULL;
}
2024-09-20 18:41:07 +08:00
st->logger = log_new(log_cfg_file);
if (st->logger == NULL)
{
printf("unable to create logger");
goto error_out;
}
2024-09-20 18:41:07 +08:00
__thread_local_logger = st->logger;
CORE_LOG_FATAL("stellar start (version: %s)", version);
2024-09-20 18:41:07 +08:00
if (load_and_validate_toml_integer_config(stellar_cfg_file, "packet_io.nr_worker_thread", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0)
{
2024-09-20 18:41:07 +08:00
CORE_LOG_ERROR("unable to get thread number from config file");
goto error_out;
}
2024-09-20 18:41:07 +08:00
st->mq_schema = mq_schema_new();
if (st->mq_schema == NULL)
{
2024-09-20 18:41:07 +08:00
CORE_LOG_ERROR("unable to create mq schema");
goto error_out;
}
2024-09-20 18:41:07 +08:00
st->pkt_mgr = packet_manager_new(st->mq_schema, stellar_cfg_file);
if (st->pkt_mgr == NULL)
{
2024-09-20 18:41:07 +08:00
CORE_LOG_ERROR("unable to create packet manager");
goto error_out;
}
st->mod_mgr = stellar_module_manager_new(module_cfg_file, st->thread_num, st->mq_schema, st->logger);
2024-09-20 18:41:07 +08:00
if (st->mod_mgr == NULL)
{
2024-09-20 18:41:07 +08:00
CORE_LOG_ERROR("unable to create module manager");
goto error_out;
}
2024-09-20 18:41:07 +08:00
st->pkt_io = packet_io_new(stellar_cfg_file);
if (st->pkt_io == NULL)
{
2024-09-20 18:41:07 +08:00
CORE_LOG_ERROR("unable to create packet io");
goto error_out;
}
return st;
error_out:
2024-09-20 18:41:07 +08:00
stellar_free(st);
return NULL;
}
void stellar_run(struct stellar *st)
{
if (st == NULL)
{
return;
}
if (stellar_thread_run(st) != 0)
{
CORE_LOG_ERROR("unable to create worker thread");
return;
}
2024-09-20 18:41:07 +08:00
while (!ATOMIC_READ(&st->need_exit))
{
usleep(1000); // 1ms
// only available in pcap mode
2024-09-20 18:41:07 +08:00
if (packet_io_isbreak(st->pkt_io))
{
2024-09-20 18:41:07 +08:00
ATOMIC_SET(&st->need_exit, 1);
CORE_LOG_FATAL("notify worker thread to exit");
break;
}
}
stellar_thread_join(st);
}
/*
 * Stop and destroy a stellar instance. A NULL st is ignored.
 *
 * Requests exit, joins any worker still flagged as running, then releases
 * packet I/O, the module manager, the packet manager, the mq schema, the
 * logger and finally the instance itself. st must not be used afterwards.
 */
void stellar_free(struct stellar *st)
{
    if (st == NULL)
    {
        return;
    }

    // Without this a worker that is still looping would never return and the
    // join below would block forever.
    ATOMIC_SET(&st->need_exit, 1);
    stellar_thread_join(st);

    packet_io_free(st->pkt_io);
    stellar_module_manager_free(st->mod_mgr);
    packet_manager_free(st->pkt_mgr);
    mq_schema_free(st->mq_schema);

    // Last message: the logger goes away right after it.
    CORE_LOG_FATAL("stellar exit\n");
    log_free(st->logger);

    free(st);
}
void stellar_loopbreak(struct stellar *st)
{
if (st)
{
2024-09-20 18:41:07 +08:00
ATOMIC_SET(&st->need_exit, 1);
}
}
void stellar_reload_log_level(struct stellar *st)
{
if (st)
{
2024-09-20 18:41:07 +08:00
log_reload_level(st->logger);
}
}
/******************************************************************************
* Stellar Utility Function
******************************************************************************/
2024-09-20 18:41:07 +08:00
// TODO: kept disabled. The body below refers to members (st->st.*, sess_mgr_rt)
// that do not exist on the struct stellar defined in this file.
#if 0
// Sends packets built by the user only; packets that came from the network
// must not be sent through this function.
void stellar_send_build_packet(struct stellar *st, struct packet *pkt)
{
    uint16_t thread_id = stellar_module_manager_get_thread_id(st->st.mod_mgr);
    struct packet_io *pkt_io = st->st.pkt_io;
    struct session_manager_runtime *sess_mgr_rt = st->st.threads[thread_id].sess_mgr_rt;

    session_manager_runtime_record_duplicated_packet(sess_mgr_rt, pkt);

    if (packet_is_claim(pkt))
    {
        PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order");
        assert(0);
        return;
    }

    if (packet_get_origin_ctx(pkt))
    {
        // TODO
        abort();
        packet_io_egress(pkt_io, thread_id, pkt, 1);
    }
    else
    {
        packet_io_inject(pkt_io, thread_id, pkt, 1);
    }
}
#endif
struct logger *stellar_get_logger(struct stellar *st)
{
if (st)
{
2024-09-20 18:41:07 +08:00
return st->logger;
}
else
{
return NULL;
}
2024-06-27 15:07:54 +08:00
}