This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-packetadapter/platform/src/packet_adapter.cpp
2024-11-21 10:20:23 +08:00

220 lines
5.7 KiB
C++

#include <errno.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/prctl.h>
#include "log.h"
#include "system.h"
#include "packet_io.h"
#include "packet_stat.h"
#include "packet_handle.h"
#include "http_healthcheck.h"
#define LOG_MAIN "PacketAdapter"
#ifdef GIT_VERSION
static __attribute__((__used__)) const char *Packet_Adapter_Version = GIT_VERSION;
#else
static __attribute__((__used__)) const char *Packet_Adapter_Version = "Unknown";
#endif
/******************************************************************************
* Struct
******************************************************************************/
struct thread
{
int index;
pthread_t tid;
struct runtime_ctx *runtime;
};
struct runtime_ctx
{
int need_stop;
struct metrics metrics;
struct packet_io *handle;
struct packet_stat *stat;
struct thread threads[MAX_THREAD_NUM];
};
/******************************************************************************
* Static
******************************************************************************/
struct runtime_ctx static_runtime_ctx = {0};
struct runtime_ctx *runtime = &static_runtime_ctx;
/******************************************************************************
* API
******************************************************************************/
static enum action packet_handle_callback(const char *data, int len, void *args)
{
struct metrics *metrics = (struct metrics *)args;
packet_handle(data, len, metrics);
return ACTION_BYPASS;
}
static void *worker_thread_cycle(void *arg)
{
struct thread *thread = (struct thread *)arg;
struct runtime_ctx *runtime = thread->runtime;
struct packet_io *handle = runtime->handle;
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "pkt-adapter:%d", thread->index);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
if (packet_io_thread_init(handle, thread->index) != 0)
{
goto error_out;
}
LOG_INFO("%s: worker thread %d is running", LOG_MAIN, thread->index);
while (!runtime->need_stop)
{
if (packet_io_thread_polling(handle, thread->index) == 0)
{
packet_io_thread_wait(handle, thread->index, -1);
}
}
error_out:
LOG_ERROR("%s: worker thread %d exiting", LOG_MAIN, thread->index);
return (void *)NULL;
}
static void signal_handler(int signo)
{
if (signo == SIGHUP)
{
LOG_RELOAD();
LOG_ERROR("%s: received SIGHUP, reload zlog.conf", LOG_MAIN);
}
if (signo == SIGINT)
{
runtime->need_stop = 1;
LOG_ERROR("%s: received SIGINT, exit !!!", LOG_MAIN);
}
if (signo == SIGQUIT)
{
runtime->need_stop = 1;
LOG_ERROR("%s: received SIGQUIT, exit !!!", LOG_MAIN);
}
if (signo == SIGTERM)
{
runtime->need_stop = 1;
LOG_ERROR("%s: received SIGTERM, exit !!!", LOG_MAIN);
}
}
static void usage(char *cmd)
{
fprintf(stderr, "USAGE: %s [OPTIONS]\n", cmd);
fprintf(stderr, " -v -- show version\n");
fprintf(stderr, " -d -- run daemon\n");
fprintf(stderr, " -h -- show help\n");
}
int main(int argc, char **argv)
{
int opt;
const char *profile = "./conf/packet_adapter.conf";
if (LOG_INIT("./conf/zlog.conf") == -1)
{
return -1;
}
while ((opt = getopt(argc, argv, "vdh")) != -1)
{
switch (opt)
{
case 'v':
fprintf(stderr, "Packet Adapter Version: %s\n", Packet_Adapter_Version);
return 0;
case 'd':
run_daemon();
break;
case 'h': /* fall through */
default:
usage(argv[0]);
return 0;
}
}
LOG_ERROR("%s: TSG Packet Adapter Engine, Version: %s Start ...", LOG_MAIN, Packet_Adapter_Version);
if (signal(SIGHUP, signal_handler) == SIG_ERR)
{
LOG_ERROR("%s: failed at signal(SIGHUP), %d: %s", LOG_MAIN, errno, strerror(errno));
goto error;
}
if (signal(SIGINT, signal_handler) == SIG_ERR)
{
LOG_ERROR("%s: failed at signal(SIGINT), %d: %s", LOG_MAIN, errno, strerror(errno));
goto error;
}
if (signal(SIGQUIT, signal_handler) == SIG_ERR)
{
LOG_ERROR("%s: failed at signal(SIGQUIT), %d: %s", LOG_MAIN, errno, strerror(errno));
goto error;
}
if (signal(SIGTERM, signal_handler) == SIG_ERR)
{
LOG_ERROR("%s: failed at signal(SIGTERM), %d: %s", LOG_MAIN, errno, strerror(errno));
goto error;
}
runtime->stat = packet_stat_create(profile);
if (runtime->stat == NULL)
{
goto error;
}
runtime->handle = packet_io_create(profile);
if (runtime->handle == NULL)
{
goto error;
}
packet_io_set_callback(runtime->handle, packet_handle_callback, &runtime->metrics);
for (int i = 0; i < packet_io_thread_number(runtime->handle); i++)
{
runtime->threads[i].tid = 0;
runtime->threads[i].index = i;
runtime->threads[i].runtime = runtime;
}
for (int i = 0; i < packet_io_thread_number(runtime->handle); i++)
{
struct thread *thread = &runtime->threads[i];
if (pthread_create(&thread->tid, NULL, worker_thread_cycle, (void *)thread) < 0)
{
LOG_ERROR("%s: unable to create worker thread %d, error %d: %s", LOG_MAIN, i, errno, strerror(errno));
runtime->need_stop = 1;
}
}
http_healthcheck_server_start(profile);
while (!runtime->need_stop)
{
packet_stat_flush(runtime->stat, &runtime->metrics);
sleep(1);
}
error:
http_healthcheck_server_stop();
packet_stat_destory(runtime->stat);
packet_io_destory(runtime->handle);
LOG_CLOSE();
return 0;
}