#include #include #include #include #include #include #include #include #include #include #include #include #include "stellar/log.h" #include "monitor_private.h" #include "monitor_cmd_assistant.h" #include "monitor/monitor_utils.h" #include "toml/toml.h" #include "sds/sds.h" #include "uthash/utlist.h" static __thread struct stellar_monitor *__thread_local_stm; static __thread pthread_t __thread_local_tid; static __thread pthread_t __stm_libevevt_callback_thread_local_tid; static __thread struct logger *__stm_thread_local_logger = NULL; static void stm_save_thread_local_context(struct stellar_monitor *stm) { __thread_local_stm = stm; __thread_local_tid = pthread_self(); __stm_thread_local_logger = stm->logger_ref; } static void stm_connection_close_notify(struct stellar_monitor *stm, UNUSED struct evhttp_connection *evconn) { struct stm_conn_close_cb_manager *ele, *tmp; DL_FOREACH_SAFE(stm->conn_close_mgr, ele, tmp) { ele->cb(&stm->current_conn, ele->arg); } } static void on_connection_close_cb(UNUSED struct evhttp_connection *ev_conn, UNUSED void *arg) { __stm_libevevt_callback_thread_local_tid = pthread_self(); struct stellar_monitor *stm = stellar_monitor_get(); stm->current_conn.current_evconn_ref = ev_conn; stm_spinlock_lock(stm->lock); stm_connection_delete(ev_conn); stm_connection_close_notify(stm, ev_conn); stm_spinlock_unlock(stm->lock); char *peer_ip_addr; uint16_t peer_port; evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port); STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "cli connection closed, client %s:%u\n", peer_ip_addr, peer_port); stm_stat_update(stm->stat, stm->worker_thread_num, STM_STAT_CLI_CONNECTION_CLOSE, 1); stm->current_conn.current_evconn_ref = NULL; } static void stm_command_send_reply_by_cstr(struct evhttp_request *request, int http_status_code, const char *reply, UNUSED void *params) { struct evbuffer *buffer = evbuffer_new(); evbuffer_add(buffer, reply, strlen(reply)); evhttp_send_reply(request, http_status_code, "OK", buffer); evbuffer_free(buffer); } static void stm_command_send_reply(struct evhttp_request *request, struct monitor_reply *reply) { struct evbuffer *buffer = evbuffer_new(); sds reply_str = monitor_reply_to_string(reply); evbuffer_add(buffer, reply_str, sdslen(reply_str)); evhttp_send_reply(request, reply->http_code, reply->http_reason, buffer); evbuffer_free(buffer); sdsfree(reply_str); monitor_reply_free(reply); } static void stm_command_notfound(struct evhttp_request *request, UNUSED void *arg) { struct stellar_monitor *stm = stellar_monitor_get(); const char *req_str_uri = evhttp_request_get_uri(request); struct evkeyvalq headers = {}; evhttp_parse_query(req_str_uri, &headers); const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY); stm_command_send_reply(request, monitor_reply_new_error(error_format_unknown_command, raw_cmd_content)); STELLAR_LOG_ERROR(stm->logger_ref, STM_LOG_MODULE_NAME, "invlid http uri: %s\r\n", evhttp_request_get_uri(request)); evhttp_clear_headers(&headers); } static void stm_exec_command(struct stellar_monitor *stm, struct evhttp_request *request, const char *cmd_line) { stm_spinlock_lock(stm->lock); monitor_cmd_cb *cmd_cb = stm_cmd_assistant_get_cb(stm->aide, cmd_line); if (NULL == cmd_cb) { stm_command_notfound(request, NULL); stm_spinlock_unlock(stm->lock); return; } void *cmd_user_arg = stm_cmd_assistant_get_user_arg(stm->aide, cmd_line); int argc; sds *cmd_argv = sdssplitargs(cmd_line, &argc); struct monitor_reply *reply = cmd_cb(stm, argc, cmd_argv, cmd_user_arg); stm_command_send_reply(request, reply); sdsfreesplitres(cmd_argv, argc); stm_spinlock_unlock(stm->lock); } static void stm_new_request_cb(struct evhttp_request *request, UNUSED void *privParams) { __stm_libevevt_callback_thread_local_tid = pthread_self(); struct stellar_monitor *stm = stellar_monitor_get(); struct evhttp_connection *ev_conn = evhttp_request_get_connection(request); stm->current_conn.current_evconn_ref = ev_conn; stm_spinlock_lock(stm->lock); stm_connection_insert(ev_conn); stm_spinlock_unlock(stm->lock); evhttp_connection_set_closecb(ev_conn, on_connection_close_cb, request); // evhttp_request_set_error_cb(request, on_request_error_cb); const char *req_str_uri = evhttp_request_get_uri(request); char *peer_ip_addr; uint16_t peer_port; evhttp_connection_get_peer(ev_conn, &peer_ip_addr, &peer_port); STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "new cli request, client:%s:%u, uri: %s\n", peer_ip_addr, peer_port, req_str_uri); struct evkeyvalq headers = {}; evhttp_parse_query(req_str_uri, &headers); const char *raw_cmd_content = evhttp_find_header(&headers, STM_RESTFUL_URI_CMD_KEY); if (NULL == raw_cmd_content) { stm_command_send_reply_by_cstr(request, HTTP_BADREQUEST, "http uri syntax error\r\n", NULL); evhttp_clear_headers(&headers); return; } stm_exec_command(stm, request, raw_cmd_content); evhttp_clear_headers(&headers); } static int stm_event_http_init(struct stellar_monitor *stm) { // Create a new event handler stm->evt_base = event_base_new(); // Create a http server using that handler stm->evt_http_server = evhttp_new(stm->evt_base); // Limit serving GET requests evhttp_set_allowed_methods(stm->evt_http_server, EVHTTP_REQ_GET); char restful_path[256] = {0}; /* must start with '/' */ snprintf(restful_path, sizeof(restful_path), "/%s/%s", STM_RESTFUL_VERSION, STM_RESTFUL_RESOURCE); evhttp_set_cb(stm->evt_http_server, restful_path, stm_new_request_cb, stm->evt_base); // Set the callback for anything not recognized evhttp_set_gencb(stm->evt_http_server, stm_command_notfound, NULL); if (evhttp_bind_socket(stm->evt_http_server, stm->config->listen_ipaddr, stm->config->listen_port_host_order) != 0) { STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "Could not bind to %s:%u\r\n", stm->config->listen_ipaddr, stm->config->listen_port_host_order); return -1; } evhttp_set_timeout(stm->evt_http_server, stm->config->connection_idle_timeout); STELLAR_LOG_INFO(stm->logger_ref, STM_LOG_MODULE_NAME, "accept http uri path: %s\r\n", restful_path); return 0; } static void *stm_event_main_loop(void *arg) { struct stellar_monitor *stm = (struct stellar_monitor *)arg; stm_save_thread_local_context(stm); event_base_dispatch(stm->evt_base); return NULL; } static void stm_event_http_free(struct stellar_monitor *stm) { event_base_loopbreak(stm->evt_base); pthread_cancel(stm->evt_main_loop_tid); pthread_join(stm->evt_main_loop_tid, NULL); evhttp_free(stm->evt_http_server); // event_free(stm->ev_timeout); event_base_free(stm->evt_base); } static void stm_server_set_default_cfg(struct stellar_monitor_config *config) { config->ringbuf_size = STM_RINGBUF_SIZE; config->connection_idle_timeout = STM_CONNECTION_IDLE_TIMEOUT; config->cli_request_timeout = STM_REQUEST_TIMEOUT; config->listen_ipaddr = "0.0.0.0"; config->listen_port_host_order = STM_SERVER_LISTEN_PORT; config->data_link_bind_port_host_order = STM_TZSP_UDP_PORT; config->output_interval_ms = STM_STAT_OUTPUT_INTERVAL_MS; } int stellar_monitor_set_gettime_callback(struct stellar_monitor *stm, int (*gettime_cb)(struct timeval *tv, void *tz)) { if (NULL == gettime_cb) { return -1; } stm->gettime_cb = gettime_cb; return 0; } struct stellar_monitor_config *stellar_monitor_config_new(const char *toml_file) { struct stellar_monitor_config *config = CALLOC(struct stellar_monitor_config, 1); stm_server_set_default_cfg(config); int64_t int64_val = 0; char errbuf[256]; FILE *fp = NULL; toml_table_t *root = NULL; toml_table_t *table = NULL; toml_raw_t ptr = NULL; fp = fopen(toml_file, "r"); if (fp == NULL) { fprintf(stderr, "config file %s open failed, %s", toml_file, strerror(errno)); goto fail_exit; } root = toml_parse_file(fp, errbuf, sizeof(errbuf)); if (root == NULL) { fprintf(stderr, "config file %s parse failed, %s", toml_file, errbuf); goto fail_exit; } table = toml_table_in(root, "monitor"); if (table == NULL) { fprintf(stderr, "config file %s missing [monitor]", toml_file); goto fail_exit; } /* listen_port */ ptr = toml_raw_in(table, "listen_port"); if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) { if (int64_val < 1 || int64_val > 65535) { fprintf(stderr, "invalid monitor.listen_port %ld\n", int64_val); FREE(config); goto fail_exit; } config->listen_port_host_order = (uint16_t)int64_val; } /* data link bind port */ ptr = toml_raw_in(table, "data_link_bind_port"); if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) { if (int64_val < 1 || int64_val > 65535) { fprintf(stderr, "invalid monitor.data_link_bind_port %ld\n", int64_val); FREE(config); goto fail_exit; } config->data_link_bind_port_host_order = (uint16_t)int64_val; } /* connection_idle_timeout */ ptr = toml_raw_in(table, "connection_idle_timeout"); if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) { if (int64_val < 1 || int64_val > 3600) { fprintf(stderr, "invalid monitor.connection_idle_timeout %ld, should be [1, 3600]\n", int64_val); FREE(config); goto fail_exit; } config->connection_idle_timeout = (int)int64_val; } /* cli_request_timeout */ ptr = toml_raw_in(table, "cli_request_timeout"); if (ptr != NULL || toml_rtoi(ptr, &int64_val) == 0) { if (int64_val < 1 || int64_val > 360) { fprintf(stderr, "invalid monitor.cli_request_timeout %ld, , should be [1, 360]\n", int64_val); FREE(config); goto fail_exit; } config->cli_request_timeout = (int)int64_val; } /* stat */ ptr = toml_raw_in(table, "stat_output_path"); if (ptr == NULL || toml_rtos(ptr, &config->output_path) != 0) { config->output_path = strdup(STM_STAT_OUTPUT_PATH); } ptr = toml_raw_in(table, "stat_output_interval_ms"); if (ptr != NULL && toml_rtoi(ptr, &int64_val) == 0) { if (int64_val < 1000 || int64_val > 1000 * 60) { fprintf(stderr, "invalid monitor.stat_output_interval_ms %ld, , should be [1, 600000]\n", int64_val); FREE(config); goto fail_exit; } config->output_interval_ms = (int)int64_val; } fail_exit: if (root) { toml_free(root); } if (fp) { fclose(fp); } return config; } struct stellar_monitor *stellar_monitor_get(void) { if (pthread_self() != __thread_local_tid) { assert(0); // fprintf(stderr, "ERR stellar_monitor_get() failed, caller must in same thread context!\n"); return NULL; } return __thread_local_stm; } // support dynamic register command, independent of the order of initialization int monitor_register_cmd(struct stellar_monitor *stm, const char *cmd, monitor_cmd_cb *cb, const char *flags, const char *hint, const char *desc, void *arg) { stm_spinlock_lock(stm->lock); int ret = stm_cmd_assistant_register_cmd(stm->aide, cmd, cb, arg, flags, hint, desc); stm_spinlock_unlock(stm->lock); return ret; } int monitor_register_connection_close_cb(struct stellar_monitor *stm, monitor_connection_close_cb *cb, void *arg) { stm_spinlock_lock(stm->lock); struct stm_conn_close_cb_manager *ele = CALLOC(struct stm_conn_close_cb_manager, 1); ele->cb = cb; ele->arg = arg; DL_APPEND(stm->conn_close_mgr, ele); stm_spinlock_unlock(stm->lock); return 0; } static struct monitor_reply *monitor_cmd_show_brief_cb(struct stellar_monitor *stm) { sds cmd_brief = sdsempty(); cmd_brief = sdscatfmt(cmd_brief, "%s, %s\r\n", "\"command\"", "description"); cmd_brief = sdscatfmt(cmd_brief, "-----------------------------\r\n"); sds cmd_brief_cont = stm_cmd_assistant_list_cmd_brief(stm->aide); cmd_brief = sdscatsds(cmd_brief, cmd_brief_cont); struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_brief); sdsfree(cmd_brief); sdsfree(cmd_brief_cont); return reply; } static struct monitor_reply *monitor_cmd_show_verbose_cb(struct stellar_monitor *stm) { sds cmd_verbose = stm_cmd_assistant_list_cmd_verbose(stm->aide); struct monitor_reply *reply = monitor_reply_new_string("%s", cmd_verbose); sdsfree(cmd_verbose); return reply; } static struct monitor_reply *monitor_server_builtin_show_command_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg) { if (argc != 3) { return monitor_reply_new_error(error_format_wrong_number_of_args, "show command"); } if (stm_strncasecmp_exactly(argv[2], "brief", 5) == 0) { return monitor_cmd_show_brief_cb((struct stellar_monitor *)arg); } else if (stm_strncasecmp_exactly(argv[2], "verbose", 7) == 0) { return monitor_cmd_show_verbose_cb((struct stellar_monitor *)arg); } return monitor_reply_new_error(error_format_unknown_arg, argv[2]); } static struct monitor_reply *monitor_server_builtin_ping_cb(struct stellar_monitor *stm UNUSED, int argc, char *argv[], UNUSED void *arg) { if (argc == 1) { return monitor_reply_new_string("pong"); } else if (argc == 2) { return monitor_reply_new_string("%s", argv[1]); } return monitor_reply_new_error(error_format_wrong_number_of_args, "ping"); } static struct monitor_reply *monitor_server_builtin_who_cb(struct stellar_monitor *stm, int argc UNUSED, char *argv[] UNUSED, UNUSED void *arg) { struct stm_connection_manager *conn_mgr = stm->connection_mgr; struct stm_connection_manager *ele, *tmp; sds who = sdsempty(); char timestr[64]; DL_FOREACH_SAFE(conn_mgr, ele, tmp) { struct timeval tv = ele->link_start_time; struct tm *tm = localtime(&tv.tv_sec); strftime(timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", tm); who = sdscatprintf(who, "%s %s:%u", timestr, ele->peer_ipaddr, ele->peer_port_host_order); if (stm->current_conn.current_evconn_ref == ele->conn) { who = sdscat(who, "\033[1m [current]\033[0m"); } who = sdscat(who, "\r\n"); } sdsIncrLen(who, -2); // delete last \r\n struct monitor_reply *reply = monitor_reply_new_string("%s", who); sdsfree(who); return reply; } static int stm_builtin_cmd_register(struct stellar_monitor *stm) { int ret = 0; ret += monitor_register_cmd(stm, "show command", monitor_server_builtin_show_command_cb, "readonly", "[ brief|verbose ]", "show all registered commands info", (void *)stm); assert(ret == 0); ret += monitor_register_cmd(stm, "who", monitor_server_builtin_who_cb, "readonly", "", "show who is logged on", (void *)stm); assert(ret == 0); ret += monitor_register_cmd(stm, "ping", monitor_server_builtin_ping_cb, "readonly", "[message]", "ping the server", (void *)stm); assert(ret == 0); return ret; } struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor) { if (__stm_libevevt_callback_thread_local_tid != pthread_self()) { return NULL; } return &monitor->current_conn; } int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, uint16_t *peer_port) { if (NULL == conn || conn->current_evconn_ref == NULL) { if (peer_ip) { *peer_ip = NULL; } if (peer_port) { *peer_port = 0; } return -1; } evhttp_connection_get_peer(conn->current_evconn_ref, peer_ip, peer_port); return 0; } void monitor_free(struct stellar_monitor *stm) { STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "free and exit\n"); stm_event_http_free(stm); stm_stat_free(stm->stat); stm_cmd_assistant_free(stm->aide); stm_spinlock_free(stm->lock); if (stm->rpc_ins_array) { for (int tid = 0; tid < stm->worker_thread_num; tid++) { monitor_rpc_free(stm->rpc_ins_array[tid]); } free(stm->rpc_ins_array); } __thread_local_stm = NULL; FREE(stm->config->output_path); FREE(stm->config); FREE(stm); } struct stellar_monitor *monitor_module_to_monitor(struct module *monitor_module) { if (monitor_module == NULL) { return NULL; } return (struct stellar_monitor *)module_get_ctx(monitor_module); } struct stellar_monitor *stellar_module_get_monitor(struct module_manager *mod_mgr) { assert(mod_mgr); struct module *monitor_mod = module_manager_get_module(mod_mgr, MONITOR_MODULE_NAME); return monitor_module_to_monitor(monitor_mod); } void monitor_on_exit(struct module_manager *mod_mgr __attribute__((unused)), struct module *mod) { if (mod) { struct stellar_monitor *stm = module_get_ctx(mod); monitor_free(stm); module_free(mod); } } struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh) { struct stellar_monitor *stm = (struct stellar_monitor *)calloc(1, sizeof(struct stellar_monitor)); stm->logger_ref = logh; stm->mod_mgr_ref = mod_mgr; struct stellar_monitor_config *config = stellar_monitor_config_new(toml_file); if (NULL == config) { STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "get config failed!\n"); goto fail_exit; } stm->config = config; stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr); stm->lock = stm_spinlock_new(); stm->worker_thread_num = module_manager_get_max_thread_num(mod_mgr); assert(stm->worker_thread_num > 0); stm->gettime_cb = gettimeofday; stm->aide = stm_cmd_assistant_new(); if (stm_event_http_init(stm) < 0) { STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "libevent http server init() failed!\n"); goto fail_exit; } stm->stat = stm_stat_init(stm); stm_builtin_cmd_register(stm); stm_save_thread_local_context(stm); pthread_create(&stm->evt_main_loop_tid, NULL, stm_event_main_loop, (void *)stm); sds config_print = stm_config_print(config); STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "config: %s\n", config_print); sdsfree(config_print); stm->rpc_ins_array = (struct monitor_rpc **)calloc(stm->worker_thread_num, sizeof(struct monitor_rpc *)); for (int tid = 0; tid < stm->worker_thread_num; tid++) { stm->rpc_ins_array[tid] = monitor_rpc_new(stm, mod_mgr); if (stm->rpc_ins_array[tid] == NULL) { STELLAR_LOG_FATAL(stm->logger_ref, STM_LOG_MODULE_NAME, "rpc init failed\n"); goto fail_exit; } } stm_save_thread_local_context(stm); return stm; fail_exit: monitor_free(stm); return NULL; } struct module *monitor_on_init(struct module_manager *mod_mgr) { assert(mod_mgr); const char *toml_file = module_manager_get_toml_path(mod_mgr); assert(toml_file); struct logger *logh = module_manager_get_logger(mod_mgr); assert(logh); struct stellar_monitor *stm = monitor_new(toml_file, mod_mgr, logh); struct module *stm_mod = module_new(MONITOR_MODULE_NAME, (void *)stm); if (stm_mod == NULL) { STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "moudule new '%s' fail\n", MONITOR_MODULE_NAME); monitor_free(stm); return NULL; } // show_session_enforcer_init(mod_mgr, stm); // stm_pktdump_enforcer_init(mod_mgr, stm); STELLAR_LOG_FATAL(logh, STM_LOG_MODULE_NAME, "init success\n"); return stm_mod; }