From d7b64b457732f1c9da1f75e9d04395218eb0761c Mon Sep 17 00:00:00 2001 From: guo_peixu Date: Fri, 10 Jun 2022 16:14:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E7=8E=AF=E5=BD=A2=E7=BC=93?= =?UTF-8?q?=E5=86=B2=E5=8C=BA=E6=B7=BB=E5=8A=A0=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inc/MESA_shm_ring_queue.h | 55 +++++++++ shm_consumer/MESA_shm_consumer.c | 99 +++++++++++++++ shm_consumer/Makefile | 26 ++++ src/MESA_shm_ring_queue.c | 202 +++++++++++++++++++++++++++++++ 4 files changed, 382 insertions(+) create mode 100644 inc/MESA_shm_ring_queue.h create mode 100644 shm_consumer/MESA_shm_consumer.c create mode 100644 shm_consumer/Makefile create mode 100644 src/MESA_shm_ring_queue.c diff --git a/inc/MESA_shm_ring_queue.h b/inc/MESA_shm_ring_queue.h new file mode 100644 index 0000000..523005a --- /dev/null +++ b/inc/MESA_shm_ring_queue.h @@ -0,0 +1,55 @@ +#ifndef _MESA_SHM_RING_QUEUE_H_ +#define _MESA_SHM_RING_QUEUE_H_ +#define MESA_SHM_RING_QUEUE_NUM 128 +#define MESA_SHM_RING_QUEUE_BLOCK_NUM 8192 +#define MESA_SHM_RING_QUEUE_BLOCK_BUFLEN 4096 +#define MESA_SHM_RING_QUEUE_BLOCK_SIZE (MESA_SHM_RING_QUEUE_BLOCK_BUFLEN + sizeof(int) + 1) /*user buf + (int)len flag + '\n'*/ +#define MESA_SHM_KEY_OVERVIEW 35720 +#define MESA_SHM_KEY_MIN (MESA_SHM_KEY_OVERVIEW + 1) +#define MESA_SHM_KEY_MAX (MESA_SHM_KEY_MIN + MESA_SHM_RING_QUEUE_NUM -1) + +#define MESA_SHM_RING_QUEUE_INITIAL 0 +#define MESA_SHM_RING_QUEUE_IDLE 1 +#define MESA_SHM_RING_QUEUE_HALF_IDLE 2 +#define MESA_SHM_RING_QUEUE_USED 3 + +struct MESA_shm_overview *MESA_shm_alloc_overview(); +struct MESA_shm_queue_head *MESA_shm_get_ring_queue(); +void MESA_shm_init_mutex(); +void MESA_shm_init_overview(); +void MESA_shm_recycle_ring_queue(struct MESA_shm_queue_head *ring_queue_head); +int MESA_shm_copy_buf_to_ring_queue(char *buf, int buflen, struct MESA_shm_queue_head *head); +void MESA_shm_write_ring_queue_to_file(int fd, struct MESA_shm_queue_head *head); + + + +struct MESA_shm_overview{ + int shmkey; + int shmid; + int idx; + volatile char status; +}; + +struct MESA_shm_queue_head{ + unsigned int blksize; + unsigned int blknum; + volatile unsigned int rd_idx; + volatile unsigned int wr_idx; + int ovw_idx; +}; + +#endif + + + + + + + + + + + + + + diff --git a/shm_consumer/MESA_shm_consumer.c b/shm_consumer/MESA_shm_consumer.c new file mode 100644 index 0000000..486b7b3 --- /dev/null +++ b/shm_consumer/MESA_shm_consumer.c @@ -0,0 +1,99 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "MESA_shm_ring_queue.h" + +void init_ring_queue_head_arrray(struct MESA_shm_overview *ovw, struct MESA_shm_queue_head **ring_queue_head) +{ + int i = 0; + struct MESA_shm_overview *tmp_ovw = NULL; + for(i = 0; i< MESA_SHM_RING_QUEUE_NUM; i++){ + tmp_ovw = ovw + i; + if(tmp_ovw->shmid == -1){ + break; + } + ring_queue_head[i] = shmat(tmp_ovw->shmid, NULL, 0); + } + return ; +} +void consumer_daemo() +{ + int fd = -1; + switch (fork()) { + case -1: + printf("fork error\n"); + return ; + case 0: + break; + default: + exit(0); + } + if (setsid() == -1) { + return ; + } + umask(0); + fd = open("/dev/null", O_RDWR); + if (fd == -1) { + return ; + } + if (dup2(fd, STDIN_FILENO) == -1) { + return ; + } + + if (dup2(fd, STDOUT_FILENO) == -1) { + return ; + } + if (fd > STDERR_FILENO) { + if (close(fd) == -1) { + return ; + } + } + return ; +} +int main(int argc, char **args) +{ + consumer_daemo(); + int i = 0; + struct MESA_shm_overview *shm_overview = NULL; + struct MESA_shm_overview *tmp_ovw = NULL; + struct MESA_shm_queue_head *ring_queue_head[MESA_SHM_RING_QUEUE_NUM] = {NULL}; + int log_file_fd = -1; + shm_overview = MESA_shm_alloc_overview(); + if(shm_overview == NULL){ + return 0; + } + init_ring_queue_head_arrray(shm_overview, ring_queue_head); + log_file_fd = open("/root/MESA_log", O_RDWR | O_CREAT | O_APPEND, 0666); + if(log_file_fd < 0){ + return 0; + } + while(1){ + for(i = 0; i< MESA_SHM_RING_QUEUE_NUM; i++){ + tmp_ovw = shm_overview + i; + if(tmp_ovw->shmid == -1){ + break; + } + if(ring_queue_head[i] == NULL){ + ring_queue_head[i] = shmat(tmp_ovw->shmid, NULL, 0); + if(ring_queue_head[i] == NULL){ + break ; + } + } + if(tmp_ovw->status == MESA_SHM_RING_QUEUE_USED || tmp_ovw->status == MESA_SHM_RING_QUEUE_HALF_IDLE){ + MESA_shm_write_ring_queue_to_file(log_file_fd, ring_queue_head[i]); + if(tmp_ovw->status == MESA_SHM_RING_QUEUE_HALF_IDLE){ + tmp_ovw->status = MESA_SHM_RING_QUEUE_IDLE; + } + } + } + usleep(5000); + } + return 0; +} diff --git a/shm_consumer/Makefile b/shm_consumer/Makefile new file mode 100644 index 0000000..1940ba2 --- /dev/null +++ b/shm_consumer/Makefile @@ -0,0 +1,26 @@ +vpath %.h ../inc +vpath %.a ../lib + +CC=gcc + +CFLAGS= -g3 -Wall -fPIC -O -Werror +CFLAGS+=-I../inc/ + +#LIB=-L../lib/ +LIB=../lib/libMESA_handle_logger.a ../lib/libMESA_snprintf.a +LIB+=-lpthread + +LIB_FILE=$(wildcard ../lib/*.a) + +SRC=MESA_shm_consumer.c +TARGET=MESA_shm_consumer + +all:$(TARGET) + +$(TARGET):$(SRC) $(LIB_FILE) + $(CC) $(CFLAGS) $(INC) $(LIBPATH) $< $(LIB) -o $@ + +clean : + rm -f $(TARGET) + + diff --git a/src/MESA_shm_ring_queue.c b/src/MESA_shm_ring_queue.c new file mode 100644 index 0000000..8c4ef26 --- /dev/null +++ b/src/MESA_shm_ring_queue.c @@ -0,0 +1,202 @@ +#include +#include +#include +#include +#include +#include +#include +#include "MESA_handle_logger.h" +#include "MESA_shm_ring_queue.h" + +pthread_mutex_t MESA_shm_mutex; +struct MESA_shm_overview *MESA_shm_ovw = NULL; + +struct MESA_shm_overview *MESA_shm_alloc_overview() +{ + int shmsize = sizeof(struct MESA_shm_overview) * MESA_SHM_RING_QUEUE_NUM; + struct MESA_shm_overview *shm_overview = NULL; + struct MESA_shm_overview *tmp_ovw = NULL; + int i = 0; + int shmid = shmget(MESA_SHM_KEY_OVERVIEW, 0, 0); + if(shmid == -1){ + shmid = shmget(MESA_SHM_KEY_OVERVIEW, shmsize, IPC_CREAT | 0666); + if(shmid == -1){ + return NULL; + }else{ + shm_overview = (struct MESA_shm_overview *)shmat(shmid, NULL, 0); + if(shm_overview == NULL){ + return NULL; + } + memset((void *)shm_overview, 0, shmsize); + for(i = 0; i < MESA_SHM_RING_QUEUE_NUM; i++){ + tmp_ovw = shm_overview + i; + tmp_ovw->shmkey = i + MESA_SHM_KEY_MIN; + tmp_ovw->shmid = -1; + tmp_ovw->idx = i; + } + } + }else{ + shm_overview = (struct MESA_shm_overview *)shmat(shmid, NULL, 0); + } + return shm_overview; +} + +void MESA_shm_init_mutex() +{ + pthread_mutex_init(&MESA_shm_mutex, NULL); + return ; +} +void MESA_shm_init_overview() +{ + if(MESA_shm_ovw == NULL){ + MESA_shm_ovw = MESA_shm_alloc_overview(); + } + return ; + +} +void MESA_shm_init_ring_queue(struct MESA_shm_queue_head *head, struct MESA_shm_overview *ovw) +{ + head->blksize = MESA_SHM_RING_QUEUE_BLOCK_SIZE; + head->blknum = MESA_SHM_RING_QUEUE_BLOCK_NUM; + head->rd_idx = 0; + head->wr_idx = 0; + ovw->status = MESA_SHM_RING_QUEUE_USED; + head->ovw_idx = ovw->idx; + return ; +} +void MESA_shm_recycle_ring_queue(struct MESA_shm_queue_head *ring_queue_head) +{ + if(MESA_shm_ovw == NULL){ + return ; + } + int ovw_idx = ring_queue_head->ovw_idx; + MESA_shm_ovw[ovw_idx].status = MESA_SHM_RING_QUEUE_HALF_IDLE; + return ; +} + +struct MESA_shm_queue_head *MESA_shm_alloc_new_ring_queue(struct MESA_shm_overview *ovw) +{ + struct MESA_shm_queue_head *head = NULL; + int shmid = -1; + int shmsize = sizeof(struct MESA_shm_queue_head) + (MESA_SHM_RING_QUEUE_BLOCK_NUM * MESA_SHM_RING_QUEUE_BLOCK_SIZE); + shmid = shmget(ovw->shmkey, 0, 0); + if(shmid == -1){ + shmid = shmget(ovw->shmkey, shmsize, IPC_CREAT | 0666); + if(shmid == -1){ + return NULL; + } + ovw->shmid = shmid; + head = (struct MESA_shm_queue_head *)shmat(shmid, NULL, 0); + }else{ + ovw->shmid = shmid; + head = (struct MESA_shm_queue_head *)shmat(shmid, NULL, 0); + } + return head; +} + +struct MESA_shm_queue_head *MESA_shm_connect_idle_ring_queue(struct MESA_shm_overview *ovw) +{ + struct MESA_shm_queue_head *head = NULL; + if(ovw->shmid == -1){ + return NULL; + } + head = (struct MESA_shm_queue_head *)shmat(ovw->shmid, NULL, 0); + return head; + +} +struct MESA_shm_queue_head *MESA_shm_get_ring_queue() +{ + int i = 0; + int flag = 0; + struct MESA_shm_overview *tmp_ovw = NULL; + struct MESA_shm_queue_head *head = NULL; + if(MESA_shm_ovw == NULL){ + return NULL; + } + pthread_mutex_lock(&MESA_shm_mutex); + for(i = 0; i < MESA_SHM_RING_QUEUE_NUM; i++){ + tmp_ovw = MESA_shm_ovw + i; + if(tmp_ovw->status == MESA_SHM_RING_QUEUE_IDLE){ + flag = 1; + break; + } + if(tmp_ovw->shmid == -1){ + flag = 2; + break; + } + } + if(flag == 1){ + head = MESA_shm_connect_idle_ring_queue(tmp_ovw); + }else if(flag == 2){ + head = MESA_shm_alloc_new_ring_queue(tmp_ovw); + }else{ + head = NULL; + goto out; + } + if(head != NULL){ + MESA_shm_init_ring_queue(head, tmp_ovw); + } +out: + pthread_mutex_unlock(&MESA_shm_mutex); + return head; +} +int MESA_shm_ring_queue_is_empty(struct MESA_shm_queue_head *head) +{ + if(head->rd_idx == head->wr_idx){ + return 1; + }else{ + return 0; + } +} +int MESA_shm_ring_queue_is_full(struct MESA_shm_queue_head *head) +{ + if(((head->wr_idx + 1) % head->blknum) == head->rd_idx){ + return 1; + }else{ + return 0; + } +} +int MESA_shm_copy_buf_to_ring_queue(char *buf, int buflen, struct MESA_shm_queue_head *head) +{ + int len = buflen < MESA_SHM_RING_QUEUE_BLOCK_BUFLEN?buflen:MESA_SHM_RING_QUEUE_BLOCK_BUFLEN; + int *wr_len = NULL; + char *wr_blk = NULL; + char *wr_pos = NULL; + if(head == NULL){ + return 0; + } + if(MESA_shm_ring_queue_is_full(head)){ + return 0; + } + wr_blk = (char *)(head + 1) + (head->blksize * head->wr_idx); + wr_len = (int *)wr_blk; + *wr_len = len + 1; + wr_pos = (char *)(wr_len + 1); + wr_pos[len] = '\n'; + memcpy(wr_pos, buf, len); + head->wr_idx = (head->wr_idx + 1) % head->blknum; + return len; +} +void MESA_shm_write_ring_queue_to_file(int fd, struct MESA_shm_queue_head *head) +{ + int len = 0; + int write_len = 0; + int *rd_len = NULL; + char *rd_blk = NULL; + char *rd_pos = NULL; + while(!MESA_shm_ring_queue_is_empty(head)){ + rd_blk = (char *)(head + 1) + (head->blksize * head->rd_idx); + rd_len = (int *)rd_blk; + len = *rd_len; + rd_pos = (char *)(rd_len + 1); + write_len = write(fd, rd_pos, len); + if(write_len < 0){ + printf("write file error, func=%s, line=%d\n",__FUNCTION__, __LINE__); + } + /*fsync(fd);*/ + head->rd_idx = (head->rd_idx + 1) % head->blknum; + } + return ; + +} +