This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/infra/monitor/monitor_server.c

581 lines
20 KiB
C

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <stddef.h>
#include <pthread.h>
#include <assert.h>
#include <evhttp.h>
#include <event2/event.h>
#include <event2/keyvalq_struct.h>
#include <sys/queue.h>
#include "stellar/log.h"
#include "monitor_private.h"
#include "monitor_cmd_assistant.h"
#include "monitor/monitor_utils.h"
#include "toml/toml.h"
#include "sds/sds.h"
#include "uthash/utlist.h"
/* Per-thread monitor handle: written by stm_save_thread_local_context(), returned by stellar_monitor_get(). */
static __thread struct stellar_monitor *__thread_local_stm;
/* Thread id stored together with the handle; stellar_monitor_get() compares it with pthread_self(). */
static __thread pthread_t __thread_local_tid;
/* Thread id stored at the start of each libevent callback in this file; checked by monitor_get_current_connection(). */
static __thread pthread_t __stm_libevevt_callback_thread_local_tid;
/* Per-thread logger reference set by stm_save_thread_local_context(); no reader is visible in this part of the file. */
static __thread struct logger *__stm_thread_local_logger = NULL;
/*
 * Record `stm`, its logger reference and the calling thread's id in the
 * thread-local slots of this file, so that stellar_monitor_get() succeeds
 * when called later from the same thread.
 */
static void stm_save_thread_local_context(struct stellar_monitor *stm)
{
    pthread_t self_tid = pthread_self();

    __thread_local_tid = self_tid;
    __stm_thread_local_logger = stm->logger_ref;
    __thread_local_stm = stm;
}
/*
 * Invoke every registered connection-close callback, in list order, with the
 * monitor's current connection descriptor and the callback's own argument.
 * `evconn` is not used here.
 */
static void stm_connection_close_notify(struct stellar_monitor *stm, UNUSED struct evhttp_connection *evconn)
{
    struct stm_conn_close_cb_manager *handler = NULL;
    struct stm_conn_close_cb_manager *next_handler = NULL;

    DL_FOREACH_SAFE(stm->conn_close_mgr, handler, next_handler)
    {
        handler->cb(&stm->current_conn, handler->arg);
    }
}
/*
 * libevent close callback for a client connection.
 *
 * Marks this thread as being inside a libevent callback, publishes `ev_conn`
 * as the current connection, removes it from the connection list and runs the
 * registered close callbacks (both under the monitor lock), then logs the
 * peer address, bumps the close counter and clears the current connection.
 */
static void on_connection_close_cb(UNUSED struct evhttp_connection *ev_conn, UNUSED void *arg)
{
    char *remote_ip = NULL;
    uint16_t remote_port = 0;

    __stm_libevevt_callback_thread_local_tid = pthread_self();

    struct stellar_monitor *stm = stellar_monitor_get();
    stm->current_conn.current_evconn_ref = ev_conn;

    stm_spinlock_lock(stm->lock);
    stm_connection_delete(ev_conn);
    stm_connection_close_notify(stm, ev_conn);
    stm_spinlock_unlock(stm->lock);

    evhttp_connection_get_peer(ev_conn, &remote_ip, &remote_port);
    STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "cli connection closed, client %s:%u\n", remote_ip, remote_port);
    stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_CLOSE, 1);

    stm->current_conn.current_evconn_ref = NULL;
}
/*
 * Answer `request` with a plain C-string body.
 *
 * request          : the libevent request to complete.
 * http_status_code : status code placed on the response line.
 * reply            : NUL-terminated body; NULL is treated as an empty body.
 * params           : unused.
 *
 * NOTE(review): the reason phrase is always "OK", even for error status
 * codes; kept as-is because clients may already depend on it.
 */
static void stm_command_send_reply_by_cstr(struct evhttp_request *request, int http_status_code, const char *reply, UNUSED void *params)
{
    const char *body = (reply != NULL) ? reply : "";
    struct evbuffer *buffer = evbuffer_new();

    if (buffer != NULL)
    {
        evbuffer_add(buffer, body, strlen(body));
    }
    /* If the buffer could not be allocated, still complete the request (with
     * no body) so the client is not left waiting; libevent skips a NULL body. */
    evhttp_send_reply(request, http_status_code, "OK", buffer);
    if (buffer != NULL)
    {
        evbuffer_free(buffer);
    }
}
static void stm_command_send_reply(struct evhttp_request *request, struct monitor_reply *reply)
{
struct evbuffer *buffer = evbuffer_new();
sds reply_str = monitor_reply_to_string(reply);
evbuffer_add(buffer, reply_str, sdslen(reply_str));
evhttp_send_reply(request, reply->http_code, reply->http_reason, buffer);
evbuffer_free(buffer);
sdsfree(reply_str);
monitor_reply_free(reply);
}
/*
 * Reply with an "unknown command" error and log the offending URI.
 *
 * Used both for unknown commands on the RESTful path and as the generic
 * handler for any other URI, so the command key may be missing from the
 * query string. In that case the raw URI (or an empty string) is reported
 * instead of passing NULL to the error formatter.
 */
static void stm_command_notfound(struct evhttp_request *request, UNUSED void *arg)
{
    struct stellar_monitor *stm = stellar_monitor_get();
    const char *req_str_uri = evhttp_request_get_uri(request);
    const char *printable_uri = (req_str_uri != NULL) ? req_str_uri : "";
    struct evkeyvalq headers = {};

    evhttp_parse_query(req_str_uri, &headers);
    const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY);
    const char *unknown_cmd = (raw_cmd_content != NULL) ? raw_cmd_content : printable_uri;

    /* unknown_cmd may point into `headers`, so the reply is built before they are cleared. */
    stm_command_send_reply(request, monitor_reply_new_error(error_format_unknown_command, unknown_cmd));
    STELLAR_LOG_ERROR(stm->logger_ref, STM_LOG_MODULE_NAME, "invlid http uri: %s\r\n", printable_uri);
    evhttp_clear_headers(&headers);
}
static void stm_exec_command(struct stellar_monitor *stm, struct evhttp_request *request, const char *cmd_line)
{
stm_spinlock_lock(stm->lock);
monitor_cmd_cb *cmd_cb = stm_cmd_assistant_get_cb(stm->aide, cmd_line);
if (NULL == cmd_cb)
{
stm_command_notfound(request, NULL);
stm_spinlock_unlock(stm->lock);
return;
}
void *cmd_user_arg = stm_cmd_assistant_get_user_arg(stm->aide, cmd_line);
int argc;
sds *cmd_argv = sdssplitargs(cmd_line, &argc);
struct monitor_reply *reply = cmd_cb(stm, argc, cmd_argv, cmd_user_arg);
stm_command_send_reply(request, reply);
sdsfreesplitres(cmd_argv, argc);
stm_spinlock_unlock(stm->lock);
}
/*
 * libevent handler for requests on the RESTful command path.
 *
 * Publishes the request's connection as the current one, records it in the
 * connection list (under the monitor lock), installs the close callback, logs
 * the peer and URI, then extracts the command from the query string and
 * executes it. A query without the command key is answered with 400.
 */
static void stm_new_request_cb(struct evhttp_request *request, UNUSED void *privParams)
{
    char *client_ip = NULL;
    uint16_t client_port = 0;
    struct evkeyvalq query_kv = {};

    __stm_libevevt_callback_thread_local_tid = pthread_self();

    struct stellar_monitor *stm = stellar_monitor_get();
    struct evhttp_connection *ev_conn = evhttp_request_get_connection(request);
    stm->current_conn.current_evconn_ref = ev_conn;

    stm_spinlock_lock(stm->lock);
    stm_connection_insert(ev_conn);
    stm_spinlock_unlock(stm->lock);
    evhttp_connection_set_closecb(ev_conn, on_connection_close_cb, request);

    const char *uri = evhttp_request_get_uri(request);
    evhttp_connection_get_peer(ev_conn, &client_ip, &client_port);
    STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "new cli request, client:%s:%u, uri: %s\n", client_ip, client_port, uri);

    evhttp_parse_query(uri, &query_kv);
    const char *cmd_line = evhttp_find_header(&query_kv, STM_RESTFUL_URI_CMD_KEY);
    if (cmd_line != NULL)
    {
        stm_exec_command(stm, request, cmd_line);
    }
    else
    {
        stm_command_send_reply_by_cstr(request, HTTP_BADREQUEST, "http uri syntax error\r\n", NULL);
    }
    /* cmd_line points into query_kv, so the pairs are released only after use. */
    evhttp_clear_headers(&query_kv);
}
/*
 * Create the libevent base and HTTP server for `stm`, register the RESTful
 * command path plus the catch-all handler, and bind the listen socket.
 *
 * Returns 0 on success, -1 on any failure. On failure, whatever was already
 * created stays in stm->evt_base / stm->evt_http_server (NULL if not created)
 * and is left for the caller to release.
 */
static int stm_event_http_init(struct stellar_monitor *stm)
{
    stm->evt_base = event_base_new();
    if (NULL == stm->evt_base)
    {
        STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "event_base_new() failed\r\n");
        return -1;
    }

    stm->evt_http_server = evhttp_new(stm->evt_base);
    if (NULL == stm->evt_http_server)
    {
        STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "evhttp_new() failed\r\n");
        return -1;
    }

    /* Only GET requests are served. */
    evhttp_set_allowed_methods(stm->evt_http_server, EVHTTP_REQ_GET);

    char restful_path[256] = {0}; /* must start with '/' */
    snprintf(restful_path, sizeof(restful_path), "/%s/%s", STM_RESTFUL_VERSION, STM_RESTFUL_RESOURCE);
    if (evhttp_set_cb(stm->evt_http_server, restful_path, stm_new_request_cb, stm->evt_base) != 0)
    {
        STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "Could not register http uri path: %s\r\n", restful_path);
        return -1;
    }
    /* Anything that is not the RESTful path is answered by the not-found handler. */
    evhttp_set_gencb(stm->evt_http_server, stm_command_notfound, NULL);

    if (evhttp_bind_socket(stm->evt_http_server, stm->config->listen_ipaddr, stm->config->listen_port_host_order) != 0)
    {
        STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "Could not bind to %s:%u\r\n", stm->config->listen_ipaddr, stm->config->listen_port_host_order);
        return -1;
    }
    evhttp_set_timeout(stm->evt_http_server, stm->config->connection_idle_timeout);
    STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "accept http uri path: %s\r\n", restful_path);
    return 0;
}
/*
 * Thread entry point for the HTTP event loop: binds the monitor passed in
 * `arg` to this thread, then dispatches events on its base until the loop
 * ends. Always returns NULL.
 */
static void *stm_event_main_loop(void *arg)
{
    struct stellar_monitor *monitor = arg;

    stm_save_thread_local_context(monitor);
    (void)event_base_dispatch(monitor->evt_base);
    return NULL;
}
/*
 * Stop the event loop thread and release the HTTP server and event base.
 *
 * The loop is asked to break, the thread is cancelled and joined, and only
 * then are the libevent objects freed.
 */
static void stm_event_http_free(struct stellar_monitor *stm)
{
    struct event_base *base = stm->evt_base;
    pthread_t loop_tid = stm->evt_main_loop_tid;

    event_base_loopbreak(base);
    pthread_cancel(loop_tid);
    pthread_join(loop_tid, NULL);

    evhttp_free(stm->evt_http_server);
    event_base_free(base);
}
/* Fill `config` with the compile-time defaults; values from the config file may override them later. */
static void stm_server_set_default_cfg(struct stellar_monitor_config *config)
{
    /* listening endpoints */
    config->listen_ipaddr = "0.0.0.0";
    config->listen_port_host_order = STM_SERVER_LISTEN_PORT;
    config->data_link_bind_port_host_order = STM_TZSP_UDP_PORT;

    /* timeouts */
    config->connection_idle_timeout = STM_CONNECTION_IDLE_TIMEOUT;
    config->cli_request_timeout = STM_REQUEST_TIMEOUT;

    /* buffers and statistics */
    config->ringbuf_size = STM_RINGBUF_SIZE;
    config->output_interval_ms = STM_STAT_OUTPUT_INTERVAL_MS;
}
/*
 * Build the monitor configuration from the [monitor] table of `toml_file`.
 *
 * The config starts from the built-in defaults; every key that is present and
 * parses as an integer is range-checked and overrides its default.
 *
 * Returns:
 *  - NULL when the config cannot be allocated or a key holds an out-of-range
 *    value (everything allocated so far is released);
 *  - otherwise the config. If the file cannot be opened or parsed, or has no
 *    [monitor] table, an error is printed and the config is returned holding
 *    only the built-in defaults (output_path is not assigned on that path).
 *
 * The caller owns the result and must release output_path and the config.
 */
struct stellar_monitor_config *stellar_monitor_config_new(const char *toml_file)
{
    struct stellar_monitor_config *config = CALLOC(struct stellar_monitor_config, 1);
    if (NULL == config)
    {
        return NULL;
    }
    stm_server_set_default_cfg(config);

    int64_t int64_val = 0;
    char errbuf[256];
    FILE *fp = NULL;
    toml_table_t *root = NULL;
    toml_table_t *table = NULL;
    toml_raw_t ptr = NULL;

    fp = fopen(toml_file, "r");
    if (fp == NULL)
    {
        fprintf(stderr, "config file %s open failed, %s", toml_file, strerror(errno));
        goto cleanup;
    }
    root = toml_parse_file(fp, errbuf, sizeof(errbuf));
    if (root == NULL)
    {
        fprintf(stderr, "config file %s parse failed, %s", toml_file, errbuf);
        goto cleanup;
    }
    table = toml_table_in(root, "monitor");
    if (table == NULL)
    {
        fprintf(stderr, "config file %s missing [monitor]", toml_file);
        goto cleanup;
    }

    /* listen_port */
    ptr = toml_raw_in(table, "listen_port");
    if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
    {
        if (int64_val < 1 || int64_val > 65535)
        {
            fprintf(stderr, "invalid monitor.listen_port %lld\n", (long long)int64_val);
            goto invalid_value;
        }
        config->listen_port_host_order = (uint16_t)int64_val;
    }

    /* data link bind port */
    ptr = toml_raw_in(table, "data_link_bind_port");
    if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
    {
        if (int64_val < 1 || int64_val > 65535)
        {
            fprintf(stderr, "invalid monitor.data_link_bind_port %lld\n", (long long)int64_val);
            goto invalid_value;
        }
        config->data_link_bind_port_host_order = (uint16_t)int64_val;
    }

    /* connection_idle_timeout */
    ptr = toml_raw_in(table, "connection_idle_timeout");
    if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
    {
        if (int64_val < 1 || int64_val > 3600)
        {
            fprintf(stderr, "invalid monitor.connection_idle_timeout %lld, should be [1, 3600]\n", (long long)int64_val);
            goto invalid_value;
        }
        config->connection_idle_timeout = (int)int64_val;
    }

    /* cli_request_timeout: both conditions must hold, otherwise a stale
     * int64_val left over from the previous key would be validated and stored. */
    ptr = toml_raw_in(table, "cli_request_timeout");
    if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
    {
        if (int64_val < 1 || int64_val > 360)
        {
            fprintf(stderr, "invalid monitor.cli_request_timeout %lld, , should be [1, 360]\n", (long long)int64_val);
            goto invalid_value;
        }
        config->cli_request_timeout = (int)int64_val;
    }

    /* stat: fall back to the default path when the key is absent or not a string */
    ptr = toml_raw_in(table, "stat_output_path");
    if (ptr == NULL || toml_rtos(ptr, &config->output_path) != 0)
    {
        config->output_path = strdup(STM_STAT_OUTPUT_PATH);
    }
    ptr = toml_raw_in(table, "stat_output_interval_ms");
    if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0)
    {
        if (int64_val < 1000 || int64_val > 1000 * 60)
        {
            fprintf(stderr, "invalid monitor.stat_output_interval_ms %lld, , should be [1000, 60000]\n", (long long)int64_val);
            goto invalid_value;
        }
        config->output_interval_ms = (int)int64_val;
    }
    goto cleanup;

invalid_value:
    /* Release everything owned by the config and make sure NULL is returned,
     * whatever the FREE macro does to its argument. */
    FREE(config->output_path);
    FREE(config);
    config = NULL;

cleanup:
    if (root)
    {
        toml_free(root);
    }
    if (fp)
    {
        fclose(fp);
    }
    return config;
}
/*
 * Return the monitor bound to the calling thread.
 *
 * Succeeds only when the caller's thread id matches the one stored in this
 * thread's context; otherwise it asserts and, when assertions are compiled
 * out, returns NULL.
 */
struct stellar_monitor *stellar_monitor_get(void)
{
    if (pthread_self() == __thread_local_tid)
    {
        return __thread_local_stm;
    }

    /* caller is not in a thread that saved a monitor context */
    assert(0);
    return NULL;
}
/*
 * Register a command with the monitor's command assistant.
 *
 * Commands can be registered at any time, independent of initialization
 * order; the registration itself runs under the monitor lock. Returns
 * whatever the command assistant returns.
 */
int monitor_register_cmd(struct stellar_monitor *stm, const char *cmd, monitor_cmd_cb *cb, const char *flags,
                         const char *hint, const char *desc, void *arg)
{
    int rc;

    stm_spinlock_lock(stm->lock);
    rc = stm_cmd_assistant_register_cmd(stm->aide, cmd, cb, arg, flags, hint, desc);
    stm_spinlock_unlock(stm->lock);

    return rc;
}
/*
 * Register `cb` to be called, with `arg`, whenever a client connection closes.
 *
 * Returns 0 on success, -1 when `stm` or `cb` is NULL or the list node cannot
 * be allocated. The node is prepared before the lock is taken so that only
 * the list append happens while holding it.
 */
int monitor_register_connection_close_cb(struct stellar_monitor *stm, monitor_connection_close_cb *cb, void *arg)
{
    if (NULL == stm || NULL == cb)
    {
        /* a NULL callback would be called unconditionally by the close notifier */
        return -1;
    }

    struct stm_conn_close_cb_manager *ele = CALLOC(struct stm_conn_close_cb_manager, 1);
    if (NULL == ele)
    {
        return -1;
    }
    ele->cb = cb;
    ele->arg = arg;

    stm_spinlock_lock(stm->lock);
    DL_APPEND(stm->conn_close_mgr, ele);
    stm_spinlock_unlock(stm->lock);
    return 0;
}
/*
 * Build the reply for "show command brief": a two-line header followed by the
 * command assistant's brief listing. Temporary strings are freed before
 * returning.
 */
static struct monitor_reply *monitor_cmd_show_brief_cb(struct stellar_monitor *stm)
{
    sds listing = stm_cmd_assistant_list_cmd_brief(stm->aide);
    sds text = sdsempty();

    text = sdscatfmt(text, "%s, %s\r\n", "\"command\"", "description");
    text = sdscat(text, "-----------------------------\r\n");
    text = sdscatsds(text, listing);

    struct monitor_reply *reply = monitor_reply_new_string("%s", text);

    sdsfree(listing);
    sdsfree(text);
    return reply;
}
/* Build the reply for "show command verbose" from the command assistant's verbose listing. */
static struct monitor_reply *monitor_cmd_show_verbose_cb(struct stellar_monitor *stm)
{
    sds listing = stm_cmd_assistant_list_cmd_verbose(stm->aide);
    struct monitor_reply *result = monitor_reply_new_string("%s", listing);

    sdsfree(listing);
    return result;
}
/*
 * Built-in "show command <brief|verbose>" handler.
 *
 * Expects exactly three arguments; the third selects the listing. The monitor
 * handle is taken from `arg` (the user argument given at registration).
 * Returns an error reply for a wrong argument count or an unknown selector.
 */
static struct monitor_reply *monitor_server_builtin_show_command_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg)
{
    if (argc != 3)
    {
        return monitor_reply_new_error(error_format_wrong_number_of_args, "show command");
    }

    struct stellar_monitor *monitor = (struct stellar_monitor *)arg;
    char *selector = argv[2];

    if (0 == stm_strncasecmp_exactly(selector, "brief", 5))
    {
        return monitor_cmd_show_brief_cb(monitor);
    }
    if (0 == stm_strncasecmp_exactly(selector, "verbose", 7))
    {
        return monitor_cmd_show_verbose_cb(monitor);
    }
    return monitor_reply_new_error(error_format_unknown_arg, selector);
}
/*
 * Built-in "ping [message]" handler: replies "pong" when called without a
 * message, echoes the message when one is given, and returns a
 * wrong-number-of-arguments error otherwise.
 */
static struct monitor_reply *monitor_server_builtin_ping_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg)
{
    switch (argc)
    {
    case 1:
        return monitor_reply_new_string("pong");
    case 2:
        return monitor_reply_new_string("%s", argv[1]);
    default:
        return monitor_reply_new_error(error_format_wrong_number_of_args, "ping");
    }
}
/*
 * Built-in "who" handler: lists every tracked client connection as
 * "<start time> <ip>:<port>", one per line, marking the connection that
 * issued the current request.
 *
 * The trailing line break is removed only when at least one entry was
 * written, so an empty connection list yields an empty reply instead of
 * shrinking an empty string.
 */
static struct monitor_reply *monitor_server_builtin_who_cb(struct stellar_monitor *stm, int argc UNUSED, char *argv[] UNUSED, UNUSED void *arg)
{
    struct stm_connection_manager *ele = NULL;
    sds who = sdsempty();
    char timestr[64];

    DL_FOREACH(stm->connection_mgr, ele)
    {
        struct tm tm_buf;
        time_t start_sec = ele->link_start_time.tv_sec;

        /* localtime_r: no shared static buffer; leave the time empty if conversion fails */
        timestr[0] = '\0';
        if (localtime_r(&start_sec, &tm_buf) != NULL)
        {
            strftime(timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", &tm_buf);
        }
        who = sdscatprintf(who, "%s %s:%u", timestr, ele->peer_ipaddr, ele->peer_port_host_order);
        if (stm->current_conn.current_evconn_ref == ele->conn)
        {
            who = sdscat(who, "\033[1m [current]\033[0m");
        }
        who = sdscat(who, "\r\n");
    }
    if (sdslen(who) >= 2)
    {
        sdsIncrLen(who, -2); // delete last \r\n
    }
    struct monitor_reply *reply = monitor_reply_new_string("%s", who);
    sdsfree(who);
    return reply;
}
static int stm_builtin_cmd_register(struct stellar_monitor *stm)
{
int ret = 0;
ret += monitor_register_cmd(stm, "show command", monitor_server_builtin_show_command_cb, "readonly", "[ brief|verbose ]", "show all registered commands info", (void *)stm);
assert(ret == 0);
ret += monitor_register_cmd(stm, "who", monitor_server_builtin_who_cb, "readonly", "<cr>", "show who is logged on", (void *)stm);
assert(ret == 0);
ret += monitor_register_cmd(stm, "ping", monitor_server_builtin_ping_cb, "readonly", "[message]", "ping the server", (void *)stm);
assert(ret == 0);
return ret;
}
/*
 * Return the monitor's current connection descriptor, but only when called
 * from a thread that has entered one of this file's libevent callbacks;
 * any other caller gets NULL.
 */
struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor)
{
    return (pthread_self() == __stm_libevevt_callback_thread_local_tid) ? &monitor->current_conn : NULL;
}
/*
 * Fetch the peer address of `conn`.
 *
 * conn      : connection descriptor; may be NULL.
 * peer_ip   : optional out; receives the peer address string, or NULL on failure.
 * peer_port : optional out; receives the peer port, or 0 on failure.
 *
 * Returns 0 on success, -1 when `conn` is NULL or has no live connection.
 * Either out-pointer may be NULL on both paths: libevent is given local
 * variables and the results are copied out afterwards.
 */
int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, uint16_t *peer_port)
{
    char *ip = NULL;
    uint16_t port = 0;
    int rc = -1;

    if (conn != NULL && conn->current_evconn_ref != NULL)
    {
        evhttp_connection_get_peer(conn->current_evconn_ref, &ip, &port);
        rc = 0;
    }
    if (peer_ip)
    {
        *peer_ip = ip;
    }
    if (peer_port)
    {
        *peer_port = port;
    }
    return rc;
}
/*
 * Tear down a monitor: stop the HTTP event loop, release statistics, the
 * command assistant, the lock, the per-worker RPC instances, the config and
 * finally the monitor itself. Also clears the calling thread's monitor
 * handle.
 *
 * A NULL `stm` is ignored. Apart from the config and the RPC array, the
 * members are released unconditionally, so this expects a monitor whose event
 * loop thread has been started.
 */
void monitor_free(struct stellar_monitor *stm)
{
    if (NULL == stm)
    {
        return;
    }
    STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "free and exit\n");
    stm_event_http_free(stm);
    stm_stat_free(stm->stat);
    stm_cmd_assistant_free(stm->aide);
    stm_spinlock_free(stm->lock);
    if (stm->rpc_ins_array)
    {
        for (int tid = 0; tid < stm->worker_thread_num; tid++)
        {
            monitor_rpc_free(stm->rpc_ins_array[tid]);
        }
        free(stm->rpc_ins_array);
    }
    __thread_local_stm = NULL;
    if (stm->config)
    {
        FREE(stm->config->output_path);
        FREE(stm->config);
    }
    FREE(stm);
}
/* Return the monitor stored as the context of `monitor_module`, or NULL when the module is NULL. */
struct stellar_monitor *monitor_module_to_monitor(struct module *monitor_module)
{
    return (monitor_module != NULL) ? (struct stellar_monitor *)module_get_ctx(monitor_module) : NULL;
}
/* Look up the monitor module in `mod_mgr` (which must not be NULL) and return its monitor, if any. */
struct stellar_monitor *stellar_module_get_monitor(struct module_manager *mod_mgr)
{
    assert(mod_mgr);
    return monitor_module_to_monitor(module_manager_get_module(mod_mgr, MONITOR_MODULE_NAME));
}
/*
 * Module exit hook: free the monitor held by `mod` and then the module.
 *
 * A NULL `mod` is ignored. A module without a monitor context is still freed,
 * but no monitor teardown is attempted for it.
 */
void monitor_on_exit(struct module_manager *mod_mgr __attribute__((unused)), struct module *mod)
{
    if (NULL == mod)
    {
        return;
    }
    struct stellar_monitor *stm = module_get_ctx(mod);
    if (stm != NULL)
    {
        monitor_free(stm);
    }
    module_free(mod);
}
struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh)
{
struct stellar_monitor *stm = (struct stellar_monitor *)calloc(1, sizeof(struct stellar_monitor));
stm->logger_ref = logh;
stm->mod_mgr_ref = mod_mgr;
struct stellar_monitor_config *config = stellar_monitor_config_new(toml_file);
if (NULL == config)
{
STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "get config failed!\n");
goto fail_exit;
}
stm->config = config;
stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr);
stm->lock = stm_spinlock_new();
stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr);
assert(stm->worker_thread_num > 0);
stm->aide = stm_cmd_assistant_new();
if (stm_event_http_init(stm) < 0)
{
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "libevent http server init() failed!\n");
goto fail_exit;
}
stm->stat = stm_stat_init(stm);
stm_builtin_cmd_register(stm);
stm_save_thread_local_context(stm);
pthread_create(&stm->evt_main_loop_tid, NULL, stm_event_main_loop, (void *)stm);
sds config_print = stm_config_print(config);
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "config: %s\n", config_print);
sdsfree(config_print);
stm->rpc_ins_array = (struct monitor_rpc **)calloc(stm->worker_thread_num, sizeof(struct monitor_rpc *));
for (int tid = 0; tid < stm->worker_thread_num; tid++)
{
stm->rpc_ins_array[tid] = monitor_rpc_new(stm, mod_mgr);
if (stm->rpc_ins_array[tid] == NULL)
{
STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "rpc init failed\n");
goto fail_exit;
}
}
stm_save_thread_local_context(stm);
return stm;
fail_exit:
monitor_free(stm);
return NULL;
}
/*
 * Module init hook: create the monitor and wrap it in a module.
 *
 * Returns the new module, or NULL when either the monitor or the module
 * cannot be created (the monitor is freed in the latter case). Success is
 * logged only when both exist.
 */
struct module *monitor_on_init(struct module_manager *mod_mgr)
{
    assert(mod_mgr);
    const char *toml_file = module_manager_get_toml_path(mod_mgr);
    assert(toml_file);
    struct logger *logh = module_manager_get_logger(mod_mgr);
    assert(logh);
    struct stellar_monitor *stm = monitor_new(toml_file, mod_mgr, logh);
    if (stm == NULL)
    {
        /* Do not hand out a module whose context is NULL. */
        STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "monitor new fail\n");
        return NULL;
    }
    struct module *stm_mod = module_new(MONITOR_MODULE_NAME, (void *)stm);
    if (stm_mod == NULL)
    {
        STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "moudule new '%s' fail\n", MONITOR_MODULE_NAME);
        monitor_free(stm);
        return NULL;
    }
    // show_session_enforcer_init(mod_mgr, stm);
    // stm_pktdump_enforcer_init(mod_mgr, stm);
    STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "init success\n");
    return stm_mod;
}