优化共享内存解析格式,减少内存浪费。优化生产者进程的效率。
This commit is contained in:
@@ -5,8 +5,7 @@
|
|||||||
#define MESA_SHM_LOG_PATH_LEN 1024
|
#define MESA_SHM_LOG_PATH_LEN 1024
|
||||||
#define MESA_SHM_RING_QUEUE_NUM 128
|
#define MESA_SHM_RING_QUEUE_NUM 128
|
||||||
#define MESA_SHM_RING_QUEUE_BLOCK_NUM 8192
|
#define MESA_SHM_RING_QUEUE_BLOCK_NUM 8192
|
||||||
#define MESA_SHM_RING_QUEUE_BLOCK_BUFLEN 4096
|
#define MESA_SHM_RING_QUEUE_BLOCK_SIZE 4096
|
||||||
#define MESA_SHM_RING_QUEUE_BLOCK_SIZE (MESA_SHM_LOG_PATH_LEN + sizeof(int) + MESA_SHM_RING_QUEUE_BLOCK_BUFLEN) /*log file + (int)payload len + payload*/
|
|
||||||
#define MESA_SHM_KEY_OVERVIEW 35720
|
#define MESA_SHM_KEY_OVERVIEW 35720
|
||||||
#define MESA_SHM_KEY_MIN (MESA_SHM_KEY_OVERVIEW + 1)
|
#define MESA_SHM_KEY_MIN (MESA_SHM_KEY_OVERVIEW + 1)
|
||||||
#define MESA_SHM_KEY_MAX (MESA_SHM_KEY_MIN + MESA_SHM_RING_QUEUE_NUM -1)
|
#define MESA_SHM_KEY_MAX (MESA_SHM_KEY_MIN + MESA_SHM_RING_QUEUE_NUM -1)
|
||||||
|
|||||||
@@ -233,19 +233,27 @@ int check_reopen_log_file(struct log_file_list *node)
|
|||||||
}
|
}
|
||||||
return CONSUMER_SUCCESS;
|
return CONSUMER_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
(int)file_len + (str)file + '\0' + (int)payload_len + (str)payload + '\0'
|
||||||
|
*/
|
||||||
void consumer_ring_queue_to_file(struct MESA_shm_queue_head *head)
|
void consumer_ring_queue_to_file(struct MESA_shm_queue_head *head)
|
||||||
{
|
{
|
||||||
|
int *p_file_len = NULL;
|
||||||
|
int file_len = 0;
|
||||||
char *p_file = NULL;
|
char *p_file = NULL;
|
||||||
int *p_payload_len = NULL;
|
int *p_payload_len = NULL;
|
||||||
char *payload = NULL;
|
|
||||||
int payload_len = 0;
|
int payload_len = 0;
|
||||||
char buf[MESA_SHM_LOG_BUF_PREFIX_LEN + MESA_SHM_RING_QUEUE_BLOCK_BUFLEN] = {0};
|
char *payload = NULL;
|
||||||
|
char buf[MESA_SHM_LOG_BUF_PREFIX_LEN + MESA_SHM_RING_QUEUE_BLOCK_SIZE] = {0};
|
||||||
int n = 0;
|
int n = 0;
|
||||||
char strtime[DEFAUT_BUF_SIZE] = {0};
|
char strtime[DEFAUT_BUF_SIZE] = {0};
|
||||||
struct log_file_list *node = NULL;
|
struct log_file_list *node = NULL;
|
||||||
get_cur_strftime(strtime, sizeof(strtime));
|
get_cur_strftime(strtime, sizeof(strtime));
|
||||||
while(!MESA_shm_ring_queue_is_empty(head)){
|
while(!MESA_shm_ring_queue_is_empty(head)){
|
||||||
p_file = (char *)(head + 1) + (head->blksize * head->rd_idx);
|
p_file_len = (int *)((char *)(head + 1) + (head->blksize * head->rd_idx));
|
||||||
|
file_len = *p_file_len;
|
||||||
|
p_file = (char *)(p_file_len + 1);
|
||||||
node = get_log_file_node(p_file);
|
node = get_log_file_node(p_file);
|
||||||
if(node == NULL){
|
if(node == NULL){
|
||||||
node = create_log_file_node(p_file);
|
node = create_log_file_node(p_file);
|
||||||
@@ -259,7 +267,7 @@ void consumer_ring_queue_to_file(struct MESA_shm_queue_head *head)
|
|||||||
continue ;
|
continue ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p_payload_len = (int *)(p_file + MESA_SHM_LOG_PATH_LEN);
|
p_payload_len = (int *)(p_file + file_len + 1);
|
||||||
payload_len = *p_payload_len;
|
payload_len = *p_payload_len;
|
||||||
payload = (char *)(p_payload_len + 1);
|
payload = (char *)(p_payload_len + 1);
|
||||||
n = snprintf(buf, sizeof(buf), "%s, %s\n", strtime, payload);
|
n = snprintf(buf, sizeof(buf), "%s, %s\n", strtime, payload);
|
||||||
@@ -270,14 +278,19 @@ void consumer_ring_queue_to_file(struct MESA_shm_queue_head *head)
|
|||||||
}
|
}
|
||||||
void consumer_ring_queue_to_terminal(struct MESA_shm_queue_head *head, int producer_pid)
|
void consumer_ring_queue_to_terminal(struct MESA_shm_queue_head *head, int producer_pid)
|
||||||
{
|
{
|
||||||
|
int *p_file_len = NULL;
|
||||||
|
int file_len = 0;
|
||||||
char *p_file = NULL;
|
char *p_file = NULL;
|
||||||
int *p_payload_len = NULL;
|
int *p_payload_len = NULL;
|
||||||
char *payload = NULL;
|
|
||||||
int payload_len = 0;
|
int payload_len = 0;
|
||||||
char buf[MESA_SHM_LOG_BUF_PREFIX_LEN + MESA_SHM_RING_QUEUE_BLOCK_BUFLEN] = {0};
|
char *payload = NULL;
|
||||||
|
char buf[MESA_SHM_LOG_BUF_PREFIX_LEN + MESA_SHM_RING_QUEUE_BLOCK_SIZE] = {0};
|
||||||
int n = 0;
|
int n = 0;
|
||||||
while(!MESA_shm_ring_queue_is_empty(head)){
|
while(!MESA_shm_ring_queue_is_empty(head)){
|
||||||
p_payload_len = (int *)((char *)(head + 1) + (head->blksize * head->rd_idx) + MESA_SHM_LOG_PATH_LEN);
|
p_file_len = (int *)((char *)(head + 1) + (head->blksize * head->rd_idx));
|
||||||
|
file_len = *p_file_len;
|
||||||
|
p_file = (char *)(p_file_len + 1);
|
||||||
|
p_payload_len = (int *)(p_file + file_len + 1);
|
||||||
payload_len = *p_payload_len;
|
payload_len = *p_payload_len;
|
||||||
payload = (char *)(p_payload_len + 1);
|
payload = (char *)(p_payload_len + 1);
|
||||||
n = snprintf(buf, sizeof(buf), "pid:%d, %s\n", producer_pid, payload);
|
n = snprintf(buf, sizeof(buf), "pid:%d, %s\n", producer_pid, payload);
|
||||||
@@ -523,7 +536,8 @@ int main(int argc, char **argv)
|
|||||||
if(!MESA_shm_ring_queue_is_empty(ring_queue_head[i])){
|
if(!MESA_shm_ring_queue_is_empty(ring_queue_head[i])){
|
||||||
if(g_care_pid == CONSUMER_CARE_PID_SPEC){
|
if(g_care_pid == CONSUMER_CARE_PID_SPEC){
|
||||||
if(!producer_pid_is_cared(tmp_ovw->producer_pid)){
|
if(!producer_pid_is_cared(tmp_ovw->producer_pid)){
|
||||||
MESA_shm_ring_queue_set_empty(ring_queue_head[i]);
|
/*do not set empty, in order to improve performance for producer process*/
|
||||||
|
/*MESA_shm_ring_queue_set_empty(ring_queue_head[i]);*/
|
||||||
continue ;
|
continue ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -216,7 +216,7 @@ struct MESA_pthread_private *MESA_create_pthread_private(void *handle)
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
pri->fmt_buf_len = p_handle->fmt_buf_len;
|
pri->fmt_buf_len = p_handle->fmt_buf_len;
|
||||||
pri->cache_buf_len = MESA_SHM_RING_QUEUE_BLOCK_BUFLEN;
|
pri->cache_buf_len = MESA_SHM_RING_QUEUE_BLOCK_SIZE;
|
||||||
pri->cache_buf = malloc(pri->cache_buf_len);
|
pri->cache_buf = malloc(pri->cache_buf_len);
|
||||||
if(pri->cache_buf == NULL){
|
if(pri->cache_buf == NULL){
|
||||||
goto error;
|
goto error;
|
||||||
|
|||||||
@@ -187,29 +187,44 @@ void MESA_shm_ring_queue_set_empty(struct MESA_shm_queue_head *head)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
(int)file_len + (str)p_file + '\0' + (int)payload_len + (str)payload + '\0'
|
||||||
|
*/
|
||||||
int MESA_shm_copy_buf_to_ring_queue(char *buf, int buflen, struct MESA_shm_queue_head *head, char *log_file, int log_file_len)
|
int MESA_shm_copy_buf_to_ring_queue(char *buf, int buflen, struct MESA_shm_queue_head *head, char *log_file, int log_file_len)
|
||||||
{
|
{
|
||||||
int len = buflen < (MESA_SHM_RING_QUEUE_BLOCK_BUFLEN -1)?buflen:(MESA_SHM_RING_QUEUE_BLOCK_BUFLEN - 1);
|
int available_len = 0;
|
||||||
|
int *p_file_len = 0;
|
||||||
char *p_file = NULL;
|
char *p_file = NULL;
|
||||||
int *p_buf_len = NULL;
|
int *p_payload_len = 0;
|
||||||
char *p_buf = NULL;
|
int payload_len = 0;
|
||||||
|
char *p_payload = NULL;
|
||||||
if(head == NULL){
|
if(head == NULL){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
if(log_file_len == 0){
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if(MESA_shm_ring_queue_is_full(head)){
|
if(MESA_shm_ring_queue_is_full(head)){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
p_file = (char *)(head + 1) + (head->blksize * head->wr_idx);
|
if(log_file_len <= 0 || log_file_len >= MESA_SHM_LOG_PATH_LEN){
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
available_len = MESA_SHM_RING_QUEUE_BLOCK_SIZE - log_file_len -(2 * sizeof(int)) - 2;
|
||||||
|
if(available_len <= 0){
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
payload_len = buflen < available_len ? buflen : available_len;
|
||||||
|
p_file_len = (int *)((char *)(head + 1) + (head->blksize * head->wr_idx));
|
||||||
|
*p_file_len = log_file_len;
|
||||||
|
p_file = (char *)(p_file_len + 1);
|
||||||
memcpy(p_file, log_file, log_file_len);
|
memcpy(p_file, log_file, log_file_len);
|
||||||
p_file[log_file_len] = '\0';
|
p_file[log_file_len] = '\0';
|
||||||
p_buf_len = (int *)(p_file + MESA_SHM_LOG_PATH_LEN);
|
p_payload_len = (int *)(p_file + log_file_len + 1);
|
||||||
*p_buf_len = len;
|
*p_payload_len = payload_len;
|
||||||
p_buf = (char *)(p_buf_len + 1);
|
p_payload = (char *)(p_payload_len + 1);
|
||||||
memcpy(p_buf, buf, len);
|
memcpy(p_payload, buf, payload_len);
|
||||||
p_buf[len] = '\0';
|
p_payload[payload_len] = '\0';
|
||||||
head->wr_idx = (head->wr_idx + 1) % head->blknum;
|
head->wr_idx = (head->wr_idx + 1) % head->blknum;
|
||||||
return len;
|
return payload_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user