diff --git a/inc/MESA_shm_ring_queue.h b/inc/MESA_shm_ring_queue.h index 0c03f45..7c2432c 100644 --- a/inc/MESA_shm_ring_queue.h +++ b/inc/MESA_shm_ring_queue.h @@ -25,6 +25,7 @@ void MESA_shm_recycle_ring_queue(struct MESA_shm_queue_head *ring_queue_head); int MESA_shm_copy_buf_to_ring_queue(char *buf, int buflen, struct MESA_shm_queue_head *head, char *log_file, int log_file_len); int MESA_shm_ring_queue_is_empty(struct MESA_shm_queue_head *head); int MESA_shm_ring_queue_is_full(struct MESA_shm_queue_head *head); +void MESA_shm_ring_queue_set_empty(struct MESA_shm_queue_head *head); diff --git a/shm_consumer/MESA_shm_consumer.c b/shm_consumer/MESA_shm_consumer.c index a807972..9ff435a 100644 --- a/shm_consumer/MESA_shm_consumer.c +++ b/shm_consumer/MESA_shm_consumer.c @@ -8,13 +8,22 @@ #include #include #include +#include #include "list.h" #include "MESA_shm_ring_queue.h" -#define DEFAUT_BUF_SIZE 256 -#define CONSUMER_SUCCESS 0 -#define CONSUMER_ERROR -1 +#define DEFAUT_BUF_SIZE 256 +#define DEFAULT_COMMAND_LEN 1024 +#define CONSUMER_SUCCESS 0 +#define CONSUMER_ERROR -1 + +#define CONSUMER_OUTPUT_MODE_FILE 0 +#define CONSUMER_OUTPUT_MODE_TERMINAL 1 +#define CONSUMER_CARE_PID_ALL 0 +#define CONSUMER_CARE_PID_SPEC 1 +#define CONSUMER_CARE_PID_MAX_LEN 128 + struct log_file_list{ char log_file_pre[MESA_SHM_LOG_PATH_LEN]; char real_log_file[MESA_SHM_LOG_PATH_LEN]; @@ -24,7 +33,14 @@ struct log_file_list{ ino_t ino; struct list_head list; }; +struct care_pid_list{ + int pid; + struct list_head list; +}; struct log_file_list g_log_file_list; +struct care_pid_list g_care_pid_list; +int g_output_mode = CONSUMER_OUTPUT_MODE_FILE; +int g_care_pid = CONSUMER_CARE_PID_ALL; void init_ring_queue_head_arrray(struct MESA_shm_overview *ovw, struct MESA_shm_queue_head **ring_queue_head) { int i = 0; @@ -250,22 +266,242 @@ void consumer_ring_queue_to_file(struct MESA_shm_queue_head *head) write(node->fd, buf, n); head->rd_idx = (head->rd_idx + 1) % head->blknum; } + return ; +} +void consumer_ring_queue_to_terminal(struct MESA_shm_queue_head *head, int producer_pid) +{ + char *p_file = NULL; + int *p_payload_len = NULL; + char *payload = NULL; + int payload_len = 0; + char buf[MESA_SHM_LOG_BUF_PREFIX_LEN + MESA_SHM_RING_QUEUE_BLOCK_BUFLEN] = {0}; + int n = 0; + while(!MESA_shm_ring_queue_is_empty(head)){ + p_payload_len = (int *)((char *)(head + 1) + (head->blksize * head->rd_idx) + MESA_SHM_LOG_PATH_LEN); + payload_len = *p_payload_len; + payload = (char *)(p_payload_len + 1); + n = snprintf(buf, sizeof(buf), "pid:%d, %s\n", producer_pid, payload); + write(STDOUT_FILENO, buf, n); + head->rd_idx = (head->rd_idx + 1) % head->blknum; + } + return ; +} +int producer_pid_is_cared(int producer_pid) +{ + struct care_pid_list *tmp; + struct care_pid_list *n; + list_for_each_entry_safe(tmp, n, &g_care_pid_list.list, list){ + if(tmp->pid == producer_pid){ + return 1; + } + } + return 0; +} + +int check_care_pid_repeat(int pid) +{ + struct care_pid_list *tmp; + struct care_pid_list *n; + list_for_each_entry_safe(tmp, n, &g_care_pid_list.list, list){ + if(tmp->pid == pid){ + return CONSUMER_ERROR; + } + } + return CONSUMER_SUCCESS; +} +struct care_pid_list *create_care_pid_node(int pid) +{ + struct care_pid_list *node; + node = (struct care_pid_list *)malloc(sizeof(struct care_pid_list)); + if(node == NULL){ + printf("malloc pid node error\n"); + return NULL; + } + node->pid = pid; + return node; +} +int add_care_pid_node(struct care_pid_list *node, int pid) +{ + if(check_care_pid_repeat(pid) == CONSUMER_SUCCESS){ + list_add(&node->list, &g_care_pid_list.list); + }else{ + free(node); + printf("pid repeat\n"); + return CONSUMER_ERROR; + } + if(g_care_pid == CONSUMER_CARE_PID_ALL){ + g_care_pid = CONSUMER_CARE_PID_SPEC; + } + return CONSUMER_SUCCESS; +} +int parse_care_pid(char *p_pid) +{ + char *cur; + char *p; + char tmp_pid_str[CONSUMER_CARE_PID_MAX_LEN] = {0}; + int tmp_pid; + int argcc = 0; + struct care_pid_list *node = NULL;; + cur = p_pid; + p = p_pid; + while(*p){ + if(*p == '\0'){ + break; + } + if((*p < '0' || *p > '9') && !isspace(*p)){ + printf("invalid pid\n"); + return CONSUMER_ERROR; + } + p++; + } + while(isspace(*cur)){ + cur++; + } + while(1){ + if(*cur == '\0'){ + break ; + } + + if(!isspace(*cur)){ + tmp_pid_str[argcc] = *cur; + argcc++; + cur++; + }else{ + while(isspace(*cur)){ + cur++; + } + tmp_pid = atoi(tmp_pid_str); + node = create_care_pid_node(tmp_pid); + if(node == NULL){ + return CONSUMER_ERROR; + } + if(add_care_pid_node(node,tmp_pid) == CONSUMER_ERROR){ + printf("add pid error\n"); + return CONSUMER_ERROR; + } + memset(tmp_pid_str, 0 ,sizeof(tmp_pid_str)); + argcc = 0; + } + if(argcc >= CONSUMER_CARE_PID_MAX_LEN){ + printf("invalid pid\n"); + return CONSUMER_ERROR; + } + if(*cur == 0 && tmp_pid_str[0] != '\0'){ + tmp_pid = atoi(tmp_pid_str); + node = create_care_pid_node(tmp_pid); + if(node == NULL){ + return CONSUMER_ERROR; + } + if(add_care_pid_node(node,tmp_pid) == CONSUMER_ERROR){ + printf("add pid error\n"); + return CONSUMER_ERROR; + } + } + } + return CONSUMER_SUCCESS; +} + +int get_options(int argc, char **argv) +{ + char *p; + int i; + for(i = 1; i < argc; i++){ + p = argv[i]; + if(*p++ != '-'){ + return CONSUMER_ERROR; + } + if(strcmp(p, "pid") == 0){ + if(argv[++i]){ + if(strcmp(argv[i], "all") == 0){ + g_care_pid = CONSUMER_CARE_PID_ALL; + }else{ + if(parse_care_pid(argv[i]) == CONSUMER_ERROR){ + printf("parse pid error\n"); + return CONSUMER_ERROR; + } + } + + }else{ + printf("miss parameter\n"); + return CONSUMER_ERROR; + } + }else if(strcmp(p, "mode") == 0){ + if(argv[++i]){ + if(strcmp(argv[i], "file") == 0){ + g_output_mode = CONSUMER_OUTPUT_MODE_FILE; + }else if(strcmp(argv[i], "terminal") == 0){ + g_output_mode = CONSUMER_OUTPUT_MODE_TERMINAL; + }else{ + printf("invalid option:%s\n",argv[i]); + return CONSUMER_ERROR; + } + }else{ + printf("miss parameter\n"); + return CONSUMER_ERROR; + } + }else{ + printf("invalid option:%s\n",p); + return CONSUMER_ERROR; + } + } + return CONSUMER_SUCCESS; +} +void kill_old_process(char *process_name) +{ + FILE *fp = NULL; + char buf[DEFAUT_BUF_SIZE] = {0}; + char ps_cmd[DEFAULT_COMMAND_LEN] = {0}; + char kill_cmd[DEFAULT_COMMAND_LEN] = {0}; + int pid = getpid(); + int tmp_pid = 0; + snprintf(ps_cmd, sizeof(ps_cmd), "ps -ef | grep %s | grep -v grep | awk '{print $2}'", process_name); + fp = popen(ps_cmd, "r"); + if(fp == NULL){ + return ; + } + while((fgets(buf, sizeof(buf), fp)) != NULL){ + tmp_pid = atoi(buf); + if(tmp_pid != pid){ + snprintf(kill_cmd, sizeof(kill_cmd), "kill %d", tmp_pid); + system(kill_cmd); + } + } + pclose(fp); + return ; +} + + +void print_help(char *exe_name) +{ + printf("-pid default all, optional parameter\n"); + printf("-mode default file, optional parameter\n"); + printf("./%s -pid [ '$pid1 $pid2 $pid3...' | all ] -mode [ file | terminal ]\n",exe_name); + return ; } int main(int argc, char **argv) { - char *exe_name = get_exe_name(argv[0]); - if(consumer_is_running(exe_name)){ - printf("consumer process is already running\n"); + if(argc == 2 && argv[1] != NULL && strstr(argv[1], "help") != NULL){ + print_help(exe_name); return 0; } - consumer_daemo(); + if(argc == 2 && argv[1] != NULL && (strcmp(argv[1], "stop") == 0)){ + kill_old_process(exe_name); + return 0; + } + INIT_LIST_HEAD(&g_care_pid_list.list); + if(get_options(argc, argv) < 0){ + printf("parse options error\n"); + return 0; + } + kill_old_process(exe_name); + if(g_output_mode == CONSUMER_OUTPUT_MODE_FILE){ + consumer_daemo(); + } int i = 0; struct MESA_shm_overview *shm_overview = NULL; struct MESA_shm_overview *tmp_ovw = NULL; struct MESA_shm_queue_head *ring_queue_head[MESA_SHM_RING_QUEUE_NUM] = {NULL}; - int log_file_fd = -1; - struct log_file_list *log_file_node = NULL; INIT_LIST_HEAD(&g_log_file_list.list); shm_overview = MESA_shm_alloc_overview(); if(shm_overview == NULL){ @@ -284,10 +520,21 @@ int main(int argc, char **argv) break ; } } - if(!MESA_shm_ring_queue_is_empty(ring_queue_head[i])){ - consumer_ring_queue_to_file(ring_queue_head[i]); + if(!MESA_shm_ring_queue_is_empty(ring_queue_head[i])){ + if(g_care_pid == CONSUMER_CARE_PID_SPEC){ + if(!producer_pid_is_cared(tmp_ovw->producer_pid)){ + MESA_shm_ring_queue_set_empty(ring_queue_head[i]); + goto next; + } + } + if(g_output_mode == CONSUMER_OUTPUT_MODE_FILE){ + consumer_ring_queue_to_file(ring_queue_head[i]); + }else{ + consumer_ring_queue_to_terminal(ring_queue_head[i], tmp_ovw->producer_pid); + } } } + next: usleep(5000); } return 0; diff --git a/src/MESA_shm_ring_queue.c b/src/MESA_shm_ring_queue.c index 69c747f..f6012fb 100644 --- a/src/MESA_shm_ring_queue.c +++ b/src/MESA_shm_ring_queue.c @@ -181,6 +181,12 @@ int MESA_shm_ring_queue_is_full(struct MESA_shm_queue_head *head) return 0; } } +void MESA_shm_ring_queue_set_empty(struct MESA_shm_queue_head *head) +{ + head->rd_idx = head->wr_idx; + return; +} + int MESA_shm_copy_buf_to_ring_queue(char *buf, int buflen, struct MESA_shm_queue_head *head, char *log_file, int log_file_len) { int len = buflen < (MESA_SHM_RING_QUEUE_BLOCK_BUFLEN -1)?buflen:(MESA_SHM_RING_QUEUE_BLOCK_BUFLEN - 1); diff --git a/src/version.map b/src/version.map index c453ef4..a4c52a9 100644 --- a/src/version.map +++ b/src/version.map @@ -1,4 +1,4 @@ { - global: MESA*runtime_log*;GIT_VERSION_*;MESA_shm_alloc_overview;MESA_handle_fmt_rule_register;MESA_shm_ring_queue_is_empty;MESA_shm_ring_queue_is_full; + global: MESA*runtime_log*;GIT_VERSION_*;MESA_shm_alloc_overview;MESA_handle_fmt_rule_register;MESA_shm_ring_queue_is_empty;MESA_shm_ring_queue_is_full;MESA_shm_ring_queue_set_empty; local: *; };