#include #include #include #include #include #include #include #include #include #include #include #include #include "list.h" #include "MESA_shm_ring_queue.h" #define DEFAUT_BUF_SIZE 256 #define DEFAULT_COMMAND_LEN 1024 #define CONSUMER_SUCCESS 0 #define CONSUMER_ERROR -1 #define CONSUMER_OUTPUT_MODE_FILE 0 #define CONSUMER_OUTPUT_MODE_TERMINAL 1 #define CONSUMER_CARE_PID_ALL 0 #define CONSUMER_CARE_PID_SPEC 1 #define CONSUMER_CARE_PID_MAX_LEN 128 struct log_file_list{ char log_file_pre[MESA_SHM_LOG_PATH_LEN]; char real_log_file[MESA_SHM_LOG_PATH_LEN]; struct tm create_date; int fd; dev_t dev; ino_t ino; struct list_head list; }; struct care_pid_list{ int pid; struct list_head list; }; struct log_file_list g_log_file_list; struct care_pid_list g_care_pid_list; int g_output_mode = CONSUMER_OUTPUT_MODE_FILE; int g_care_pid = CONSUMER_CARE_PID_ALL; int *g_status = NULL; int g_cur_tty_fd = -1; struct MESA_shm_overview *g_shm_overview = NULL; int g_shm_overview_id = -1; void init_ring_queue_head_arrray(struct MESA_shm_overview *ovw, struct MESA_shm_queue_head **ring_queue_head) { int i = 0; struct MESA_shm_overview *tmp_ovw = NULL; for(i = 0; i< MESA_SHM_RING_QUEUE_NUM; i++){ tmp_ovw = ovw + i; if(tmp_ovw->shmid == -1){ break; } ring_queue_head[i] = shmat(tmp_ovw->shmid, NULL, 0); if(ring_queue_head[i] == (struct MESA_shm_queue_head *)-1){ ring_queue_head[i] = NULL; } } return ; } void consumer_daemo() { int fd = -1; switch (fork()) { case -1: printf("fork error\n"); return ; case 0: break; default: exit(0); } if (setsid() == -1) { return ; } umask(0); fd = open("/dev/null", O_RDWR); if (fd == -1) { return ; } if (dup2(fd, STDIN_FILENO) == -1) { return ; } if (dup2(fd, STDOUT_FILENO) == -1) { return ; } if (fd > STDERR_FILENO) { if (close(fd) == -1) { return ; } } return ; } int consumer_is_running(char *process_name) { FILE *fp = NULL; char buf[DEFAUT_BUF_SIZE] = {0}; int count = 0; char command[DEFAUT_BUF_SIZE] = {0}; if(process_name == NULL){ return 0; } snprintf(command, sizeof(command), "ps -ef | grep %s | grep -v grep | wc -l", process_name); fp = popen(command, "r"); if(fp == NULL){ return 0; } if((fgets(buf, sizeof(buf), fp)) != NULL){ count = atoi(buf); } pclose(fp); if(count > 1){ return 1; }else{ return 0; } return 0; } char *get_exe_name(char *argv) { char * p = NULL; p = rindex(argv, '/'); if(p == NULL){ p = argv; }else{ p = p + 1; } return p; } struct log_file_list *get_log_file_node(char *log_file) { struct log_file_list *tmp; struct log_file_list *n; list_for_each_entry_safe(tmp, n, &g_log_file_list.list, list){ if(strcmp(tmp->log_file_pre, log_file) == 0){ return tmp; } } return NULL; } struct log_file_list * create_log_file_node(char *log_file) { struct log_file_list *node; struct tm date; time_t curtime = 0; struct stat buf; int n = 0; node = (struct log_file_list *)malloc(sizeof(struct log_file_list)); if(node == NULL){ return NULL; } memset(node, 0 ,sizeof(struct log_file_list)); memcpy(node->log_file_pre, log_file, strlen(log_file)); curtime = time(NULL); localtime_r(&curtime, &date); n = snprintf(node->real_log_file, sizeof(node->real_log_file), "%s.%d-%d-%d", node->log_file_pre, date.tm_year + 1900, date.tm_mon + 1, date.tm_mday); if(n >= sizeof(node->real_log_file)){ goto error; } node->fd = open(node->real_log_file, O_RDWR | O_CREAT | O_APPEND, 0666); if(node->fd < 0){ goto error; } if(stat(node->real_log_file, &buf) < 0){ goto error; } node->dev = buf.st_dev; node->ino = buf.st_ino; node->create_date = date; list_add(&node->list, &g_log_file_list.list); return node; error: if(node != NULL){ free(node); } return NULL; } void get_cur_date(struct tm *date) { time_t curtime = 0; curtime = time(NULL); localtime_r(&curtime, date); return ; } void get_cur_strftime(char *buf, int maxlen) { struct tm local; time_t curtime = time(NULL); localtime_r(&curtime, &local); strftime(buf, maxlen, "%c", &local); return ; } int reopen_log_file(struct log_file_list *node) { struct stat buf; close(node->fd); node->fd = open(node->real_log_file, O_RDWR | O_CREAT | O_APPEND, 0666); if(node->fd < 0){ return CONSUMER_ERROR; } if(stat(node->real_log_file, &buf) < 0){ return CONSUMER_ERROR; } node->dev = buf.st_dev; node->ino = buf.st_ino; return CONSUMER_SUCCESS; } int check_reopen_log_file(struct log_file_list *node) { struct stat buf; struct tm date; int n = 0; get_cur_date(&date); if(date.tm_year != node->create_date.tm_year || date.tm_mon != node->create_date.tm_mon || date.tm_mday != node->create_date.tm_mday){ node->create_date = date; n = snprintf(node->real_log_file, sizeof(node->real_log_file), "%s.%d-%d-%d", node->log_file_pre, node->create_date.tm_year + 1900, node->create_date.tm_mon + 1, node->create_date.tm_mday); if(n >= sizeof(node->real_log_file)){ return CONSUMER_ERROR; } return reopen_log_file(node); } if(stat(node->real_log_file, &buf) < 0){ return reopen_log_file(node); /* we'll have to restat the newly created file to get the inode info*/ } if(buf.st_dev != node->dev || buf.st_ino != node->ino){ return reopen_log_file(node); } return CONSUMER_SUCCESS; } /* (int)file_len + (str)file + '\0' + (int)payload_len + (str)payload + '\0' */ void consumer_ring_queue_to_file(struct MESA_shm_queue_head *head) { int *p_file_len = NULL; int file_len = 0; char *p_file = NULL; int *p_payload_len = NULL; int payload_len = 0; char *payload = NULL; char buf[MESA_SHM_LOG_BUF_PREFIX_LEN + MESA_SHM_RING_QUEUE_BLOCK_SIZE] = {0}; int n = 0; char strtime[DEFAUT_BUF_SIZE] = {0}; struct log_file_list *node = NULL; get_cur_strftime(strtime, sizeof(strtime)); while(!MESA_shm_ring_queue_is_empty(head)){ p_file_len = (int *)((char *)(head + 1) + (head->blksize * head->rd_idx)); file_len = *p_file_len; p_file = (char *)(p_file_len + 1); node = get_log_file_node(p_file); if(node == NULL){ node = create_log_file_node(p_file); if(node == NULL){ head->rd_idx = (head->rd_idx + 1) % head->blknum; continue ; } }else{ if(check_reopen_log_file(node) != CONSUMER_SUCCESS){ head->rd_idx = (head->rd_idx + 1) % head->blknum; continue ; } } p_payload_len = (int *)(p_file + file_len + 1); payload_len = *p_payload_len; payload = (char *)(p_payload_len + 1); n = snprintf(buf, sizeof(buf), "%s, %s\n", strtime, payload); write(node->fd, buf, n); head->rd_idx = (head->rd_idx + 1) % head->blknum; } return ; } void consumer_ring_queue_to_terminal(struct MESA_shm_queue_head *head, int producer_pid) { int *p_file_len = NULL; int file_len = 0; char *p_file = NULL; int *p_payload_len = NULL; int payload_len = 0; char *payload = NULL; char buf[MESA_SHM_LOG_BUF_PREFIX_LEN + MESA_SHM_RING_QUEUE_BLOCK_SIZE] = {0}; int n = 0; while(!MESA_shm_ring_queue_is_empty(head)){ p_file_len = (int *)((char *)(head + 1) + (head->blksize * head->rd_idx)); file_len = *p_file_len; p_file = (char *)(p_file_len + 1); p_payload_len = (int *)(p_file + file_len + 1); payload_len = *p_payload_len; payload = (char *)(p_payload_len + 1); n = snprintf(buf, sizeof(buf), "pid:%d, %s\n", producer_pid, payload); if(g_cur_tty_fd > 0){ write(g_cur_tty_fd, buf, n); } head->rd_idx = (head->rd_idx + 1) % head->blknum; } return ; } int producer_pid_is_cared(int producer_pid) { struct care_pid_list *tmp; struct care_pid_list *n; list_for_each_entry_safe(tmp, n, &g_care_pid_list.list, list){ if(tmp->pid == producer_pid){ return 1; } } return 0; } struct care_pid_list *create_care_pid_node(int pid) { struct care_pid_list *node; node = (struct care_pid_list *)malloc(sizeof(struct care_pid_list)); if(node == NULL){ printf("malloc pid node error\n"); return NULL; } node->pid = pid; return node; } int add_care_pid_node(struct care_pid_list *node, int pid) { if(!producer_pid_is_cared(pid)){ list_add(&node->list, &g_care_pid_list.list); }else{ free(node); printf("pid repeat\n"); return CONSUMER_ERROR; } if(g_care_pid == CONSUMER_CARE_PID_ALL){ g_care_pid = CONSUMER_CARE_PID_SPEC; } return CONSUMER_SUCCESS; } int parse_care_pid(char *p_pid) { char *cur; char *p; char tmp_pid_str[CONSUMER_CARE_PID_MAX_LEN] = {0}; int tmp_pid; int argcc = 0; struct care_pid_list *node = NULL;; cur = p_pid; p = p_pid; while(*p){ if(*p == '\0'){ break; } if((*p < '0' || *p > '9') && !isspace(*p)){ printf("invalid pid\n"); return CONSUMER_ERROR; } p++; } while(isspace(*cur)){ cur++; } while(1){ if(*cur == '\0'){ break ; } if(!isspace(*cur)){ tmp_pid_str[argcc] = *cur; argcc++; cur++; }else{ while(isspace(*cur)){ cur++; } tmp_pid = atoi(tmp_pid_str); node = create_care_pid_node(tmp_pid); if(node == NULL){ return CONSUMER_ERROR; } if(add_care_pid_node(node,tmp_pid) == CONSUMER_ERROR){ printf("add pid error\n"); return CONSUMER_ERROR; } memset(tmp_pid_str, 0 ,sizeof(tmp_pid_str)); argcc = 0; } if(argcc >= CONSUMER_CARE_PID_MAX_LEN){ printf("invalid pid\n"); return CONSUMER_ERROR; } if(*cur == 0 && tmp_pid_str[0] != '\0'){ tmp_pid = atoi(tmp_pid_str); node = create_care_pid_node(tmp_pid); if(node == NULL){ return CONSUMER_ERROR; } if(add_care_pid_node(node,tmp_pid) == CONSUMER_ERROR){ printf("add pid error\n"); return CONSUMER_ERROR; } } } return CONSUMER_SUCCESS; } int get_options(int argc, char **argv) { char *p; int i; for(i = 1; i < argc; i++){ p = argv[i]; if(*p++ != '-'){ return CONSUMER_ERROR; } if(strcmp(p, "pid") == 0){ if(argv[++i]){ if(strcmp(argv[i], "all") == 0){ g_care_pid = CONSUMER_CARE_PID_ALL; }else{ if(parse_care_pid(argv[i]) == CONSUMER_ERROR){ printf("parse pid error\n"); return CONSUMER_ERROR; } } }else{ printf("miss parameter\n"); return CONSUMER_ERROR; } }else if(strcmp(p, "mode") == 0){ if(argv[++i]){ if(strcmp(argv[i], "file") == 0){ g_output_mode = CONSUMER_OUTPUT_MODE_FILE; }else if(strcmp(argv[i], "terminal") == 0){ g_output_mode = CONSUMER_OUTPUT_MODE_TERMINAL; }else{ printf("invalid option:%s\n",argv[i]); return CONSUMER_ERROR; } }else{ printf("miss parameter\n"); return CONSUMER_ERROR; } }else{ printf("invalid option:%s\n",p); return CONSUMER_ERROR; } } return CONSUMER_SUCCESS; } void kill_old_process(char *process_name) { FILE *fp = NULL; char buf[DEFAUT_BUF_SIZE] = {0}; char ps_cmd[DEFAULT_COMMAND_LEN] = {0}; char kill_cmd[DEFAULT_COMMAND_LEN] = {0}; int pid = getpid(); int tmp_pid = 0; snprintf(ps_cmd, sizeof(ps_cmd), "ps -ef | grep %s | grep -v grep | awk '{print $2}'", process_name); fp = popen(ps_cmd, "r"); if(fp == NULL){ return ; } while((fgets(buf, sizeof(buf), fp)) != NULL){ tmp_pid = atoi(buf); if(tmp_pid != pid){ snprintf(kill_cmd, sizeof(kill_cmd), "kill %d", tmp_pid); system(kill_cmd); } } pclose(fp); return ; } int get_cur_tty_fd() { FILE *fp = NULL; char cur_tty_name[DEFAUT_BUF_SIZE] = {0}; char *p = NULL; int cur_tty_fd = -1; fp = popen("tty", "r"); if(fp == NULL){ return -1; } if((fgets(cur_tty_name, sizeof(cur_tty_name), fp)) != NULL){ if(strncmp(cur_tty_name, "/dev/pts/", strlen("/dev/pts/")) != 0){ goto error; } p = strchr(cur_tty_name, '\n'); if(p == NULL){ goto error; } *p = '\0'; cur_tty_fd = open(cur_tty_name, O_WRONLY); if(cur_tty_fd < 0){ goto error; } }else{ goto error; } pclose(fp); return cur_tty_fd; error: pclose(fp); return -1; } void signal_handler_exit(int signum) { struct shmid_ds buf; if(g_status != NULL){ *g_status = MESA_CONSUMER_NOT_RUNNING; } if(g_shm_overview == NULL || g_shm_overview_id == -1){ goto out; } if(shmctl(g_shm_overview_id, IPC_STAT, &buf) == -1){ goto out; } if(buf.shm_nattch <= 1){ /* This is the last process to use this shared memory, we need to unlink the shared memory before the process exit */ MESA_shm_unlink(g_shm_overview, g_shm_overview_id); } out: exit(0); } /* if we use shmctl function and IPC_RMID parameter, the shared memory will actually be destroyed only after the last process detaches it, but no more attaches for the shared memory identified by the shmid parameter are allowed, producer process and consumer process start at an indeterminate time, so we have to use IPC_RMID parameter to unlink the shared memory when all processes exit */ void register_sginal_handler() { signal(SIGKILL, signal_handler_exit); signal(SIGTRAP, signal_handler_exit); signal(SIGABRT, signal_handler_exit); signal(SIGBUS, signal_handler_exit); signal(SIGFPE, signal_handler_exit); signal(SIGSEGV, signal_handler_exit); signal(SIGTERM, signal_handler_exit); signal(SIGINT, signal_handler_exit); signal(SIGQUIT, signal_handler_exit); return ; } void print_help(char *exe_name) { printf("-pid default all, optional parameter\n"); printf("-mode default file, optional parameter\n"); printf("./%s -pid [ '$pid1 $pid2 $pid3...' | all ] -mode [ file | terminal ]\n",exe_name); return ; } int main(int argc, char **argv) { char *exe_name = get_exe_name(argv[0]); if(argc == 2 && argv[1] != NULL && strstr(argv[1], "help") != NULL){ print_help(exe_name); return 0; } if(argc == 2 && argv[1] != NULL && (strcmp(argv[1], "stop") == 0)){ kill_old_process(exe_name); return 0; } INIT_LIST_HEAD(&g_care_pid_list.list); if(get_options(argc, argv) < 0){ printf("parse options error\n"); return 0; } kill_old_process(exe_name); if(g_output_mode == CONSUMER_OUTPUT_MODE_FILE){ consumer_daemo(); }else{ g_cur_tty_fd = get_cur_tty_fd(); } int i = 0; int ret = 0; struct MESA_shm_overview *tmp_ovw = NULL; struct MESA_shm_queue_head *ring_queue_head[MESA_SHM_RING_QUEUE_NUM] = {NULL}; INIT_LIST_HEAD(&g_log_file_list.list); ret = MESA_shm_alloc_overview(&g_shm_overview, &g_shm_overview_id, &g_status); if(ret < 0){ return 0; } *g_status = MESA_CONSUMER_RUNNING; register_sginal_handler(); init_ring_queue_head_arrray(g_shm_overview, ring_queue_head); while(1){ for(i = 0; i< MESA_SHM_RING_QUEUE_NUM; i++){ tmp_ovw = g_shm_overview + i; if(tmp_ovw->shmid == -1){ break; } if(ring_queue_head[i] == NULL){ ring_queue_head[i] = shmat(tmp_ovw->shmid, NULL, 0); if(ring_queue_head[i] == (struct MESA_shm_queue_head *)-1){ ring_queue_head[i] = NULL; break ; } } if(!MESA_shm_ring_queue_is_empty(ring_queue_head[i])){ if(g_care_pid == CONSUMER_CARE_PID_SPEC){ if(!producer_pid_is_cared(tmp_ovw->producer_pid)){ /*do not set empty, in order to improve performance for producer process*/ /*MESA_shm_ring_queue_set_empty(ring_queue_head[i]);*/ continue ; } } if(g_output_mode == CONSUMER_OUTPUT_MODE_FILE){ consumer_ring_queue_to_file(ring_queue_head[i]); }else{ consumer_ring_queue_to_terminal(ring_queue_head[i], tmp_ovw->producer_pid); } } } usleep(5000); } return 0; }