From 93c07a240b9ac81d1c79306bc800c4de912366e3 Mon Sep 17 00:00:00 2001 From: guo_peixu Date: Fri, 17 Jun 2022 18:14:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E5=85=B1=E4=BA=AB=E9=94=81?= =?UTF-8?q?=E6=9D=A5=E6=A0=87=E8=AE=B0=E7=8E=AF=E5=BD=A2=E7=BC=93=E5=86=B2?= =?UTF-8?q?=E5=8C=BA=E6=98=AF=E5=90=A6=E6=AD=A3=E5=9C=A8=E8=A2=AB=E4=BD=BF?= =?UTF-8?q?=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inc/MESA_shm_ring_queue.h | 4 +- shm_consumer/MESA_shm_consumer.c | 7 +- src/MESA_handle_logger.c | 1 - src/MESA_shm_ring_queue.c | 131 ++++++++++++------------------- 4 files changed, 56 insertions(+), 87 deletions(-) diff --git a/inc/MESA_shm_ring_queue.h b/inc/MESA_shm_ring_queue.h index 523005a..724d4f6 100644 --- a/inc/MESA_shm_ring_queue.h +++ b/inc/MESA_shm_ring_queue.h @@ -1,5 +1,7 @@ #ifndef _MESA_SHM_RING_QUEUE_H_ #define _MESA_SHM_RING_QUEUE_H_ + +#include #define MESA_SHM_RING_QUEUE_NUM 128 #define MESA_SHM_RING_QUEUE_BLOCK_NUM 8192 #define MESA_SHM_RING_QUEUE_BLOCK_BUFLEN 4096 @@ -27,7 +29,7 @@ struct MESA_shm_overview{ int shmkey; int shmid; int idx; - volatile char status; + pthread_mutex_t mutex; }; struct MESA_shm_queue_head{ diff --git a/shm_consumer/MESA_shm_consumer.c b/shm_consumer/MESA_shm_consumer.c index a64096e..c37a32d 100644 --- a/shm_consumer/MESA_shm_consumer.c +++ b/shm_consumer/MESA_shm_consumer.c @@ -129,12 +129,7 @@ int main(int argc, char **argv) break ; } } - if(tmp_ovw->status == MESA_SHM_RING_QUEUE_USED || tmp_ovw->status == MESA_SHM_RING_QUEUE_HALF_IDLE){ - MESA_shm_write_ring_queue_to_file(log_file_fd, ring_queue_head[i]); - if(tmp_ovw->status == MESA_SHM_RING_QUEUE_HALF_IDLE){ - tmp_ovw->status = MESA_SHM_RING_QUEUE_IDLE; - } - } + MESA_shm_write_ring_queue_to_file(log_file_fd, ring_queue_head[i]); } usleep(5000); } diff --git a/src/MESA_handle_logger.c b/src/MESA_handle_logger.c index 407a0fc..b8b9712 100644 --- a/src/MESA_handle_logger.c +++ b/src/MESA_handle_logger.c @@ -264,7 +264,6 @@ void MESA_alloc_pthread_key() { pthread_key_create(&MESA_pthread_key, MESA_free_pthread_private); MESA_shm_init_overview(); - MESA_shm_init_mutex(); return ; } diff --git a/src/MESA_shm_ring_queue.c b/src/MESA_shm_ring_queue.c index 4011c88..c5110c8 100644 --- a/src/MESA_shm_ring_queue.c +++ b/src/MESA_shm_ring_queue.c @@ -6,21 +6,23 @@ #include #include #include +#include #include "MESA_handle_logger.h" #include "MESA_shm_ring_queue.h" -#define MESA_SEM_KEY 35719 -/*pthread_mutex_t MESA_shm_mutex;*/ -union semun { - int val; - struct semid_ds *buf; - unsigned short *array; - struct seminfo *__buf; -}; -int MESA_semid = -1; struct MESA_shm_overview *MESA_shm_ovw = NULL; +void MESA_shm_init_ring_queue_mutex(struct MESA_shm_overview *ovw) +{ + pthread_mutexattr_t mutexattr; + pthread_mutexattr_init(&mutexattr); + pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED); + pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST); + pthread_mutex_init(&ovw->mutex, &mutexattr); + return ; +} + struct MESA_shm_overview *MESA_shm_alloc_overview() { int shmsize = sizeof(struct MESA_shm_overview) * MESA_SHM_RING_QUEUE_NUM; @@ -29,7 +31,7 @@ struct MESA_shm_overview *MESA_shm_alloc_overview() int i = 0; int shmid = shmget(MESA_SHM_KEY_OVERVIEW, 0, 0); if(shmid == -1){ - shmid = shmget(MESA_SHM_KEY_OVERVIEW, shmsize, IPC_CREAT | 0666); + shmid = shmget(MESA_SHM_KEY_OVERVIEW, shmsize, (SHM_R|SHM_W|IPC_CREAT)); if(shmid == -1){ return NULL; }else{ @@ -43,6 +45,7 @@ struct MESA_shm_overview *MESA_shm_alloc_overview() tmp_ovw->shmkey = i + MESA_SHM_KEY_MIN; tmp_ovw->shmid = -1; tmp_ovw->idx = i; + MESA_shm_init_ring_queue_mutex(tmp_ovw); } } }else{ @@ -50,48 +53,6 @@ struct MESA_shm_overview *MESA_shm_alloc_overview() } return shm_overview; } -void MESA_shm_sem_p() -{ - if(MESA_semid == -1){ - return ; - } - struct sembuf sem_b; - sem_b.sem_num = 0; - sem_b.sem_op = -1; - sem_b.sem_flg = SEM_UNDO; - semop(MESA_semid, &sem_b, 1); - return ; -} -void MESA_shm_sem_v() -{ - if(MESA_semid == -1){ - return ; - } - struct sembuf sem_b; - sem_b.sem_num = 0; - sem_b.sem_op = 1; - sem_b.sem_flg = SEM_UNDO; - semop(MESA_semid, &sem_b, 1); - return ; -} -void MESA_shm_init_mutex() -{ - /*pthread_mutex_init(&MESA_shm_mutex, NULL);*/ - union semun semopt; - int semid = semget(MESA_SEM_KEY, 0 , 0); - if(semid == -1){ - semid = semget(MESA_SEM_KEY, 1 ,IPC_CREAT | 0666); - if(semid == -1){ - return ; - } - MESA_semid = semid; - semopt.val = 1; - semctl(semid, 0, SETVAL, semopt); - }else{ - MESA_semid = semid; - } - return ; -} void MESA_shm_init_overview() { if(MESA_shm_ovw == NULL){ @@ -106,28 +67,44 @@ void MESA_shm_init_ring_queue(struct MESA_shm_queue_head *head, struct MESA_shm_ head->blknum = MESA_SHM_RING_QUEUE_BLOCK_NUM; head->rd_idx = 0; head->wr_idx = 0; - ovw->status = MESA_SHM_RING_QUEUE_USED; head->ovw_idx = ovw->idx; return ; } + void MESA_shm_recycle_ring_queue(struct MESA_shm_queue_head *ring_queue_head) { if(MESA_shm_ovw == NULL){ return ; } - int ovw_idx = ring_queue_head->ovw_idx; - MESA_shm_ovw[ovw_idx].status = MESA_SHM_RING_QUEUE_HALF_IDLE; + struct MESA_shm_overview *ovw = MESA_shm_ovw + ring_queue_head->ovw_idx; + pthread_mutex_unlock(&ovw->mutex); return ; } - +int MESA_shm_mutex_trylock_success(struct MESA_shm_overview *ovw) +{ + int ret = 0; + ret = pthread_mutex_trylock(&ovw->mutex); + if(ret == 0){ + return 1; + }else if(ret == EOWNERDEAD){ + ret = pthread_mutex_consistent(&ovw->mutex); + if(ret == 0){ + return 1; + } + } + return 0; +} struct MESA_shm_queue_head *MESA_shm_alloc_new_ring_queue(struct MESA_shm_overview *ovw) { struct MESA_shm_queue_head *head = NULL; int shmid = -1; int shmsize = sizeof(struct MESA_shm_queue_head) + (MESA_SHM_RING_QUEUE_BLOCK_NUM * MESA_SHM_RING_QUEUE_BLOCK_SIZE); + if(!MESA_shm_mutex_trylock_success(ovw)){ + return NULL; + } shmid = shmget(ovw->shmkey, 0, 0); if(shmid == -1){ - shmid = shmget(ovw->shmkey, shmsize, IPC_CREAT | 0666); + shmid = shmget(ovw->shmkey, shmsize, (SHM_R|SHM_W|IPC_CREAT)); if(shmid == -1){ return NULL; } @@ -137,55 +114,51 @@ struct MESA_shm_queue_head *MESA_shm_alloc_new_ring_queue(struct MESA_shm_overvi ovw->shmid = shmid; head = (struct MESA_shm_queue_head *)shmat(shmid, NULL, 0); } + if(head != NULL){ + MESA_shm_init_ring_queue(head, ovw); + }else{ + pthread_mutex_unlock(&ovw->mutex); + } return head; } -struct MESA_shm_queue_head *MESA_shm_connect_idle_ring_queue(struct MESA_shm_overview *ovw) +struct MESA_shm_queue_head *MESA_shm_try_ring_queue(struct MESA_shm_overview *ovw) { struct MESA_shm_queue_head *head = NULL; if(ovw->shmid == -1){ return NULL; } + if(!MESA_shm_mutex_trylock_success(ovw)){ + return NULL; + } head = (struct MESA_shm_queue_head *)shmat(ovw->shmid, NULL, 0); - return head; - + if(head == NULL){ + pthread_mutex_unlock(&ovw->mutex); + } + return head; } struct MESA_shm_queue_head *MESA_shm_get_ring_queue() { int i = 0; - int flag = 0; struct MESA_shm_overview *tmp_ovw = NULL; struct MESA_shm_queue_head *head = NULL; if(MESA_shm_ovw == NULL){ return NULL; } - /*pthread_mutex_lock(&MESA_shm_mutex);*/ - MESA_shm_sem_p(); for(i = 0; i < MESA_SHM_RING_QUEUE_NUM; i++){ tmp_ovw = MESA_shm_ovw + i; - if(tmp_ovw->status == MESA_SHM_RING_QUEUE_IDLE){ - flag = 1; - break; - } if(tmp_ovw->shmid == -1){ - flag = 2; + head = MESA_shm_alloc_new_ring_queue(tmp_ovw); break; } - } - if(flag == 1){ - head = MESA_shm_connect_idle_ring_queue(tmp_ovw); - }else if(flag == 2){ - head = MESA_shm_alloc_new_ring_queue(tmp_ovw); - }else{ - head = NULL; - goto out; + head = MESA_shm_try_ring_queue(tmp_ovw); + if(head != NULL){ + break; + } } if(head != NULL){ MESA_shm_init_ring_queue(head, tmp_ovw); } -out: - //pthread_mutex_unlock(&MESA_shm_mutex);*/ - MESA_shm_sem_v(); return head; } int MESA_shm_ring_queue_is_empty(struct MESA_shm_queue_head *head)