使用共享锁来标记环形缓冲区是否正在被使用

This commit is contained in:
guo_peixu
2022-06-17 18:14:24 +08:00
parent 0ce7424fa1
commit 93c07a240b
4 changed files with 56 additions and 87 deletions

View File

@@ -1,5 +1,7 @@
#ifndef _MESA_SHM_RING_QUEUE_H_
#define _MESA_SHM_RING_QUEUE_H_
#include <pthread.h>
#define MESA_SHM_RING_QUEUE_NUM 128
#define MESA_SHM_RING_QUEUE_BLOCK_NUM 8192
#define MESA_SHM_RING_QUEUE_BLOCK_BUFLEN 4096
@@ -27,7 +29,7 @@ struct MESA_shm_overview{
int shmkey;
int shmid;
int idx;
volatile char status;
pthread_mutex_t mutex;
};
struct MESA_shm_queue_head{

View File

@@ -129,12 +129,7 @@ int main(int argc, char **argv)
break ;
}
}
if(tmp_ovw->status == MESA_SHM_RING_QUEUE_USED || tmp_ovw->status == MESA_SHM_RING_QUEUE_HALF_IDLE){
MESA_shm_write_ring_queue_to_file(log_file_fd, ring_queue_head[i]);
if(tmp_ovw->status == MESA_SHM_RING_QUEUE_HALF_IDLE){
tmp_ovw->status = MESA_SHM_RING_QUEUE_IDLE;
}
}
}
usleep(5000);
}

View File

@@ -264,7 +264,6 @@ void MESA_alloc_pthread_key()
{
pthread_key_create(&MESA_pthread_key, MESA_free_pthread_private);
MESA_shm_init_overview();
MESA_shm_init_mutex();
return ;
}

View File

@@ -6,21 +6,23 @@
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include "MESA_handle_logger.h"
#include "MESA_shm_ring_queue.h"
#define MESA_SEM_KEY 35719
/*pthread_mutex_t MESA_shm_mutex;*/
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
};
int MESA_semid = -1;
struct MESA_shm_overview *MESA_shm_ovw = NULL;
void MESA_shm_init_ring_queue_mutex(struct MESA_shm_overview *ovw)
{
pthread_mutexattr_t mutexattr;
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST);
pthread_mutex_init(&ovw->mutex, &mutexattr);
return ;
}
struct MESA_shm_overview *MESA_shm_alloc_overview()
{
int shmsize = sizeof(struct MESA_shm_overview) * MESA_SHM_RING_QUEUE_NUM;
@@ -29,7 +31,7 @@ struct MESA_shm_overview *MESA_shm_alloc_overview()
int i = 0;
int shmid = shmget(MESA_SHM_KEY_OVERVIEW, 0, 0);
if(shmid == -1){
shmid = shmget(MESA_SHM_KEY_OVERVIEW, shmsize, IPC_CREAT | 0666);
shmid = shmget(MESA_SHM_KEY_OVERVIEW, shmsize, (SHM_R|SHM_W|IPC_CREAT));
if(shmid == -1){
return NULL;
}else{
@@ -43,6 +45,7 @@ struct MESA_shm_overview *MESA_shm_alloc_overview()
tmp_ovw->shmkey = i + MESA_SHM_KEY_MIN;
tmp_ovw->shmid = -1;
tmp_ovw->idx = i;
MESA_shm_init_ring_queue_mutex(tmp_ovw);
}
}
}else{
@@ -50,48 +53,6 @@ struct MESA_shm_overview *MESA_shm_alloc_overview()
}
return shm_overview;
}
void MESA_shm_sem_p()
{
if(MESA_semid == -1){
return ;
}
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = -1;
sem_b.sem_flg = SEM_UNDO;
semop(MESA_semid, &sem_b, 1);
return ;
}
void MESA_shm_sem_v()
{
if(MESA_semid == -1){
return ;
}
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = 1;
sem_b.sem_flg = SEM_UNDO;
semop(MESA_semid, &sem_b, 1);
return ;
}
void MESA_shm_init_mutex()
{
/*pthread_mutex_init(&MESA_shm_mutex, NULL);*/
union semun semopt;
int semid = semget(MESA_SEM_KEY, 0 , 0);
if(semid == -1){
semid = semget(MESA_SEM_KEY, 1 ,IPC_CREAT | 0666);
if(semid == -1){
return ;
}
MESA_semid = semid;
semopt.val = 1;
semctl(semid, 0, SETVAL, semopt);
}else{
MESA_semid = semid;
}
return ;
}
void MESA_shm_init_overview()
{
if(MESA_shm_ovw == NULL){
@@ -106,28 +67,44 @@ void MESA_shm_init_ring_queue(struct MESA_shm_queue_head *head, struct MESA_shm_
head->blknum = MESA_SHM_RING_QUEUE_BLOCK_NUM;
head->rd_idx = 0;
head->wr_idx = 0;
ovw->status = MESA_SHM_RING_QUEUE_USED;
head->ovw_idx = ovw->idx;
return ;
}
void MESA_shm_recycle_ring_queue(struct MESA_shm_queue_head *ring_queue_head)
{
if(MESA_shm_ovw == NULL){
return ;
}
int ovw_idx = ring_queue_head->ovw_idx;
MESA_shm_ovw[ovw_idx].status = MESA_SHM_RING_QUEUE_HALF_IDLE;
struct MESA_shm_overview *ovw = MESA_shm_ovw + ring_queue_head->ovw_idx;
pthread_mutex_unlock(&ovw->mutex);
return ;
}
int MESA_shm_mutex_trylock_success(struct MESA_shm_overview *ovw)
{
int ret = 0;
ret = pthread_mutex_trylock(&ovw->mutex);
if(ret == 0){
return 1;
}else if(ret == EOWNERDEAD){
ret = pthread_mutex_consistent(&ovw->mutex);
if(ret == 0){
return 1;
}
}
return 0;
}
struct MESA_shm_queue_head *MESA_shm_alloc_new_ring_queue(struct MESA_shm_overview *ovw)
{
struct MESA_shm_queue_head *head = NULL;
int shmid = -1;
int shmsize = sizeof(struct MESA_shm_queue_head) + (MESA_SHM_RING_QUEUE_BLOCK_NUM * MESA_SHM_RING_QUEUE_BLOCK_SIZE);
if(!MESA_shm_mutex_trylock_success(ovw)){
return NULL;
}
shmid = shmget(ovw->shmkey, 0, 0);
if(shmid == -1){
shmid = shmget(ovw->shmkey, shmsize, IPC_CREAT | 0666);
shmid = shmget(ovw->shmkey, shmsize, (SHM_R|SHM_W|IPC_CREAT));
if(shmid == -1){
return NULL;
}
@@ -137,55 +114,51 @@ struct MESA_shm_queue_head *MESA_shm_alloc_new_ring_queue(struct MESA_shm_overvi
ovw->shmid = shmid;
head = (struct MESA_shm_queue_head *)shmat(shmid, NULL, 0);
}
if(head != NULL){
MESA_shm_init_ring_queue(head, ovw);
}else{
pthread_mutex_unlock(&ovw->mutex);
}
return head;
}
struct MESA_shm_queue_head *MESA_shm_connect_idle_ring_queue(struct MESA_shm_overview *ovw)
struct MESA_shm_queue_head *MESA_shm_try_ring_queue(struct MESA_shm_overview *ovw)
{
struct MESA_shm_queue_head *head = NULL;
if(ovw->shmid == -1){
return NULL;
}
if(!MESA_shm_mutex_trylock_success(ovw)){
return NULL;
}
head = (struct MESA_shm_queue_head *)shmat(ovw->shmid, NULL, 0);
if(head == NULL){
pthread_mutex_unlock(&ovw->mutex);
}
return head;
}
struct MESA_shm_queue_head *MESA_shm_get_ring_queue()
{
int i = 0;
int flag = 0;
struct MESA_shm_overview *tmp_ovw = NULL;
struct MESA_shm_queue_head *head = NULL;
if(MESA_shm_ovw == NULL){
return NULL;
}
/*pthread_mutex_lock(&MESA_shm_mutex);*/
MESA_shm_sem_p();
for(i = 0; i < MESA_SHM_RING_QUEUE_NUM; i++){
tmp_ovw = MESA_shm_ovw + i;
if(tmp_ovw->status == MESA_SHM_RING_QUEUE_IDLE){
flag = 1;
break;
}
if(tmp_ovw->shmid == -1){
flag = 2;
head = MESA_shm_alloc_new_ring_queue(tmp_ovw);
break;
}
head = MESA_shm_try_ring_queue(tmp_ovw);
if(head != NULL){
break;
}
if(flag == 1){
head = MESA_shm_connect_idle_ring_queue(tmp_ovw);
}else if(flag == 2){
head = MESA_shm_alloc_new_ring_queue(tmp_ovw);
}else{
head = NULL;
goto out;
}
if(head != NULL){
MESA_shm_init_ring_queue(head, tmp_ovw);
}
out:
//pthread_mutex_unlock(&MESA_shm_mutex);*/
MESA_shm_sem_v();
return head;
}
int MESA_shm_ring_queue_is_empty(struct MESA_shm_queue_head *head)