1.添加Redis分布式锁接口,代码宏控制启动(目前非启用)

(存在问题1.由于redis异步,锁存在内容较高,加锁后影响性能较严重
          2.不是每次锁都能成功)
2.添加显示接口中,openssl生成证书时间信息输出(生成证书总时间/生成证书次数)
This commit is contained in:
fengweihao
2018-07-09 14:32:41 +08:00
parent d02f57e5ee
commit 6a98d2a041
11 changed files with 381 additions and 50 deletions

View File

@@ -35,7 +35,11 @@ OBJS := \
dir := ./components/syslogd
include $(dir)/syslog.mk
OBJS += $(OBJS_$(dir))
dir := ./components/redis
include $(dir)/redis.mk
OBJS += $(OBJS_$(dir))
dir := ./rt
include $(dir)/rt.mk
OBJS += $(OBJS_$(dir))

View File

@@ -15,6 +15,8 @@
#include <x509.h>
#include <evp.h>
#include "rd_lock.h"
struct request_t{
#define DATALEN 64
char host[DATALEN];
@@ -25,6 +27,8 @@ struct request_t{
int valid;
struct rd_lock_scb mtx;
struct evhttp_request *evh_req;
};

View File

@@ -25,6 +25,7 @@
#include "rt_common.h"
#include "rt_stdlib.h"
#include "rt_file.h"
#include "rt_time.h"
#include "cert_init.h"
#include "async.h"
#include "read.h"
@@ -47,7 +48,7 @@
static libevent_thread *threads;
struct fs_stats_t{
int line_ids[3];
int line_ids[4];
screen_stat_handle_t handle;
};
@@ -58,6 +59,11 @@ static struct fs_stats_t SGstats = {
#define sizeof_seconds(x) (x * 24 * 60 * 60)
rt_mutex entries_mtx = PTHREAD_MUTEX_INITIALIZER;
uint64_t startTime = 0;
uint64_t endTime = 0;
void connectCallback(const struct redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Redis connect error : %s\n", c->errstr);
@@ -562,7 +568,19 @@ X509 *x509_create_cert(char *host, int days)
return x509;
}
int cert_redis_init(struct event_base *base, struct redisAsyncContext **cl_ctx)
int redis_sync_int(struct redisContext **c)
{
struct config_bucket_t *redis = cert_default_config();
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
*c = redisConnectWithTimeout(redis->r_ip, redis->r_port, timeout);
return 0;
}
int redis_rsync_init(struct event_base *base, struct redisAsyncContext **cl_ctx)
{
int xret = -1;
struct config_bucket_t *redis = cert_default_config();
@@ -587,10 +605,18 @@ rd_set_callback(redisAsyncContext __attribute__((__unused__))*c, void *r,
{
redisReply *reply = (redisReply*)r;
char *host = (char *)privdata;
struct request_t *request = (struct request_t *)privdata;
#ifdef RD_MUTEX_LOCK
libevent_thread *thread = threads + request->thread_id;
rd_mutex_unlock(&request->mtx, thread->sync);
#endif
if(reply->type == REDIS_REPLY_ERROR){
mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Writing data(key = %s) to redis failed", host);
mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Writing data(key = %s) to redis failed", request->host);
}
kfree(request);
return;
}
@@ -657,16 +683,42 @@ finish:
return xret;
}
static int fs_internal_operate(int id, int id2, int column_id, int column_id2, long long diffTime)
{
int ret = -1, value = -1;
screen_stat_handle_t handle = SGstats.handle;
FS_internal_operate(handle, id, column_id, FS_OP_ADD, 1);
if (id2 < 0)
goto finish;
FS_internal_operate(handle, id2, 0, FS_OP_ADD, 1);
if (column_id2 < 0)
goto finish;
value = FS_internal_operate(handle, id, column_id, FS_OP_GET, 0);
if (value < 0)
goto finish;
ret = FS_internal_operate(handle, id, column_id2, FS_OP_SET, diffTime/value);
finish:
return ret;
}
static int
rd_decode_sendbuf(struct request_t *request, redisAsyncContext *c, char *sendbuf)
rd_encode_sendbuf(struct request_t *request, redisAsyncContext *c, char *sendbuf)
{
int xret = -1;
uint64_t startTime = 0, endTime = 0;
libevent_thread *thread = threads + request->thread_id;
struct config_bucket_t *rte = cert_default_config();
char cert[SG_DATA_SIZE] = {0}, pubkey[SG_DATA_SIZE] = {0};
startTime = rt_time_ms();
x509_online_append(request->host, thread->key, thread->root, cert, pubkey);
if (cert[0] == '\0' && pubkey[0] == '\0'){
mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Failed to issue certificate");
@@ -674,36 +726,44 @@ rd_decode_sendbuf(struct request_t *request, redisAsyncContext *c, char *sendbuf
goto finish;
}
FS_internal_operate(SGstats.handle,thread->column_ids,thread->field_ids,SGstats.line_ids[2], FS_OP_ADD, 1);
endTime = rt_time_ms();
thread->diffTime += (endTime - startTime);
//printf("%lu - %lu = %lu(%lu)\n", startTime, endTime, endTime - startTime, thread->diffTime);
fs_internal_operate(thread->column_ids, thread->field_ids, SGstats.line_ids[2], SGstats.line_ids[3], thread->diffTime);
snprintf(sendbuf, SG_DATA_SIZE * 2, "%s%s", pubkey, cert);
xret = redisAsyncCommand(c, rd_set_callback, request->host, "SETEX %s %d %s",
xret = redisAsyncCommand(c, rd_set_callback, request, "SETEX %s %d %s",
request->host, sizeof_seconds(rte->days), sendbuf);
if (xret < 0){
mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Failed to set information to redis server");
goto finish;
}
xret = 0;
finish:
return xret;
}
static int
rd_encode_sendbuf(struct request_t *request, redisReply *reply, char *sendbuf)
rd_decode_sendbuf(struct request_t *request, redisReply *reply, char *sendbuf)
{
int xret = -1;
libevent_thread *thread = threads + request->thread_id;
#ifdef RD_MUTEX_LOCK
rd_mutex_unlock(&request->mtx, thread->sync);
#endif
if (reply && reply->str){
FS_internal_operate(SGstats.handle,thread->column_ids,thread->field_ids,SGstats.line_ids[1],FS_OP_ADD,1);
fs_internal_operate(thread->column_ids,thread->field_ids, SGstats.line_ids[1], -1, 0);
snprintf(sendbuf, SG_DATA_SIZE * 2, "%s", reply->str);
xret = 0;
}
else{
evhttp_send_error(request->evh_req, HTTP_BADREQUEST, 0);
}
kfree(request);
return xret;
}
@@ -720,14 +780,14 @@ void rd_get_callback(redisAsyncContext *c, void *r, void *privdata)
case REDIS_REPLY_STRING:
mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "Sends the certificate information to the requestor");
xret = rd_encode_sendbuf(request, reply, sendbuf);
xret = rd_decode_sendbuf(request, reply, sendbuf);
break;
case REDIS_REPLY_NIL:
/* Certificate information modification and Strategy to judge**/
mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "Generating certificate information");
xret = rd_decode_sendbuf(request, c, sendbuf);
xret = rd_encode_sendbuf(request, c, sendbuf);
break;
default:
break;
@@ -737,7 +797,7 @@ void rd_get_callback(redisAsyncContext *c, void *r, void *privdata)
evhttp_socket_send(request->evh_req, sendbuf);
finish:
kfree(request);
//kfree(request);
return;
}
@@ -893,7 +953,7 @@ pthread_work_proc(struct evhttp_request *evh_req, void *arg)
case EVHTTP_REQ_PATCH: cmdtype = "PATCH"; break;
default: cmdtype = "unknown"; break;
}
FS_internal_operate(SGstats.handle,thread_info->column_ids,-1,SGstats.line_ids[0], FS_OP_ADD, 1);
fs_internal_operate(thread_info->column_ids, -1, SGstats.line_ids[0], -1, 0);
rt_decode_uri(uri, request->host, &request->flag, &request->valid);
mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "[Thread %d]Received a %s request for %s, host:%s, flag:%d, valid:%d\nHeaders:",
@@ -901,7 +961,9 @@ pthread_work_proc(struct evhttp_request *evh_req, void *arg)
request->flag, request->valid);
if (request->host[0] != '\0' && request->evh_req != NULL){
#ifdef RD_MUTEX_LOCK
rd_mutex_lock("key", 30, &request->mtx, thread_info->sync);
#endif
xret = redisAsyncCommand(thread_info->cl_ctx, rd_get_callback, request, "GET %s", request->host);
if (xret < 0)
mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Failed to get information from redis server");
@@ -927,11 +989,14 @@ cert_trapper_task_int(struct event_base *base, libevent_thread *me)
int xret = -1;
/* Initialize the redis connection*/
xret = cert_redis_init(base, &me->cl_ctx);
xret = redis_rsync_init(base, &me->cl_ctx);
if (xret < 0 || !me->cl_ctx){
mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Initialize the redis connection is failure\n");
goto finish;
}
xret = redis_sync_int(&me->sync);
/* Initialize the X509 CA*/
xret = x509_privatekey_init(&me->key, &me->root);
if (xret < 0 || !me->key || !me->root){
@@ -1064,6 +1129,9 @@ libevent_socket_init()
goto finish;
}
/*mutex init **/
rd_lock_init();
threads = calloc(thread_nu, sizeof(libevent_thread));
if (! threads) {
mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "Can't allocate thread descriptors");
@@ -1148,6 +1216,9 @@ fs_screen_init()
snprintf(buff,sizeof(buff),"%s", "sign");
SGstats.line_ids[2] = FS_internal_register(SGstats.handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff);
snprintf(buff,sizeof(buff),"%s", "ssl(ms)");
SGstats.line_ids[3] = FS_internal_register(SGstats.handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff);
FS_internal_start(SGstats.handle);
return 0;

View File

@@ -12,22 +12,6 @@
#include "MESA_list_queue.h"
#include "rt_sync.h"
enum conn_states {
conn_listening, /**< the socket which listens for connections */
conn_new_cmd, /**< Prepare connection for next command */
conn_waiting, /**< waiting for a readable socket */
conn_read, /**< reading in a command line */
conn_parse_cmd, /**< try to parse a command from the input buffer */
conn_write, /**< writing out a simple response */
conn_nread, /**< reading in a fixed number of bytes */
conn_swallow, /**< swallowing unnecessary bytes w/o storing */
conn_closing, /**< closing this connection */
conn_mwrite, /**< writing out many items sequentially */
conn_closed, /**< connection is closed */
conn_watch, /**< held by the logger thread as a watcher */
conn_max_state /**< Max state value (used for assertion) */
};
typedef struct {
int id;
@@ -43,11 +27,16 @@ typedef struct {
struct redisAsyncContext *cl_ctx;
struct redisContext *sync;
void * (*routine)(void *); /** Executive entry */
int field_ids; /* dispaly */
int column_ids;
uint64_t diffTime;
} libevent_thread;
extern int cert_session_init();

View File

@@ -0,0 +1,206 @@
/*************************************************************************
> File Name: rd_lock.c
> Author:
> Mail:
> Created Time: 2018<31><38>07<30><37>05<30><35> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> 11ʱ01<30><31>39<33><39>
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <time.h>
#include <math.h>
#include "rd_lock.h"
#include "rt_string.h"
struct rd_RedLock{
float m_clockDriftFactor;
sds m_unlockScript;
int m_retryCount;
int m_retryDelay;
char *m_continueLockScript;
};
static struct rd_RedLock redlock = {
.m_clockDriftFactor = 0.01,
.m_unlockScript = NULL,
.m_retryCount = 0,
.m_retryDelay = 0,
.m_continueLockScript = NULL,
};
struct rd_RedLock *mutx_redlock()
{
return &redlock;
}
static char *
get_unique_lockid()
{
int i = 0;
char *s = NULL;
char value[10] = "0123456789";
unsigned char buffer[20];
struct timeval t1;
gettimeofday(&t1, NULL);
srand(t1.tv_usec * t1.tv_sec);
for (int i = 0; i < 20; ++i) {
buffer[i] = value[rand() % 10];
}
//<2F><>ȡ20byte<74><65><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
s = sdsempty();
for (i = 0; i < 20; i++) {
s = sdscatprintf(s, "%02X", buffer[i]);
}
return s;
}
static int
rd_lock_instance(redisContext *c, const char *resource,
const char *val, const int ttl)
{
int xret = 0;
redisReply *reply;
reply = (redisReply *)redisCommand(c, "set %s %s px %d nx", resource, val, ttl);
if (NULL == reply)
goto finish;
//printf("Set return: %s [null == fail, OK == success]\n", reply->str);
if (reply->str && STRCMP(reply->str, "OK") == 0) {
xret = 1;
}
freeReplyObject(reply);
finish:
return xret;
}
static char **convertToSds(int count, char** args)
{
int j;
char **sds = (char**)malloc(sizeof(char*)*count);
for(j = 0; j < count; j++)
sds[j] = sdsnew(args[j]);
return sds;
}
redisReply *rd_command_argv(redisContext *c, int argc, char **inargv)
{
redisReply *reply = NULL;
char **argv;
argv = convertToSds(argc, inargv);
size_t *argvlen;
argvlen = (size_t *)malloc(argc * sizeof(size_t));
for (int j = 0; j < argc; j++)
argvlen[j] = sdslen(argv[j]);
reply = (redisReply *)redisCommandArgv(c, argc, (const char **)argv, argvlen);
if (reply) {
//printf("RedisCommandArgv return: %lld\n", reply->integer);
}
free(argvlen);
sdsfreesplitres(argv, argc);
return reply;
}
int rd_mutex_unlock(struct rd_lock_scb *mtx, struct redisContext *c)
{
int argc = 5;
struct rd_RedLock *redlock = mutx_redlock();
char *unlockScriptArgv[] = {(char*)"EVAL",
redlock->m_unlockScript,
(char*)"1",
(char*)mtx->m_resource,
(char*)mtx->m_val};
redisReply *reply = rd_command_argv(c, argc, unlockScriptArgv);
if (reply) {
freeReplyObject(reply);
}
if (reply->integer){
sdsfree(mtx->m_resource);
sdsfree(mtx->m_val);
}
return 0;
}
/* redis lock*/
int rd_mutex_lock(const char *resource, const int ttl,
struct rd_lock_scb *mtx, struct redisContext *c)
{
struct rd_RedLock *redlock = mutx_redlock();
char *val = NULL;
int retryCount =0, xret = 0;
val = get_unique_lockid();
if (!val) {
return xret;
}
mtx->m_resource = sdsnew(resource);
mtx->m_val = val;
retryCount = redlock->m_retryCount;
do {
int n = 0;
int startTime = (int)time(NULL) * 1000;
if (c == NULL || c->err) {
goto finish;
}
if (rd_lock_instance(c, resource, val, ttl)) {
n++;
}
int drift = (ttl * redlock->m_clockDriftFactor) + 2;
int validityTime = ttl - ((int)time(NULL) * 1000 - startTime) - drift;
//printf("The resource validty time is %d, n is %d\n",
// validityTime, n);
if (n > 0 && validityTime > 0) {
mtx->m_validityTime = validityTime;
xret = 1;
goto finish;
} else {
printf("The resource validty time is %d, n is %d\n",
validityTime, n);
}
// Wait a random delay before to retry
int delay = rand() % redlock->m_retryDelay + floor(redlock->m_retryDelay / 2);
printf("[Test] delay = %d\n", delay);
usleep(delay * 1000);
retryCount--;
} while (retryCount > 0);
finish:
return xret;
}
void rd_lock_init()
{
struct rd_RedLock *rdlock = mutx_redlock();
rdlock->m_continueLockScript = sdsnew("if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return redis.call('set', KEYS[1], ARGV[2], 'px', ARGV[3], 'nx')");
rdlock->m_unlockScript = sdsnew("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end");
rdlock->m_retryCount = 8;
rdlock->m_retryDelay = 10;
rdlock->m_clockDriftFactor = 0.01;
return;
}

View File

@@ -0,0 +1,26 @@
/*************************************************************************
> File Name: rd_lock.h
> Author:
> Mail:
> Created Time: 2018年07月05日 星期四 11时02分03秒
************************************************************************/
#ifndef _RD_LOCK_H
#define _RD_LOCK_H
#include "hiredis.h"
struct rd_lock_scb{
int m_validityTime;
sds m_resource;
sds m_val;
};
void rd_lock_init();
int rd_mutex_lock(const char *resource, const int ttl,
struct rd_lock_scb *mtx, struct redisContext *c);
int rd_mutex_unlock(struct rd_lock_scb *mtx, struct redisContext *c);
#endif

View File

@@ -0,0 +1,40 @@
# standard component Makefile header
sp := $(sp).x
dirstack_$(sp) := $(d)
d := $(dir)
# component specification
OBJS_$(d) :=\
$(OBJ_DIR)/rd_lock.o\
CFLAGS_LOCAL += -I$(d)
$(OBJS_$(d)): CFLAGS_LOCAL := -std=gnu99 -W -Wall -Wunused-parameter -g -O3 \
-I$(d)\
-I$(d)/../../rt\
-I$(d)/../../inc\
# standard component Makefile rules
DEPS_$(d) := $(OBJS_$(d):.o=.d)
#LIBS_LIST := $(LIBS_LIST) $(LIBRARY)
LIBS_LIST := $(LIBS_LIST)
CLEAN_LIST := $(CLEAN_LIST) $(OBJS_$(d)) $(DEPS_$(d))
-include $(DEPS_$(d))
#$(LIBRARY): $(OBJS)
# $(MYARCHIVE)
$(OBJ_DIR)/%.o: $(d)/%.c
$(COMPILE)
# standard component Makefile footer
d := $(dirstack_$(sp))
sp := $(basename $(sp))

View File

@@ -21,7 +21,8 @@ enum field_calc_algo
enum field_op
{
FS_OP_ADD=1,
FS_OP_SET
FS_OP_SET,
FS_OP_GET,
};

View File

@@ -15,7 +15,7 @@ extern "C" int FS_internal_set_para(screen_stat_handle_t handle, enum FS_option
extern "C" void FS_internal_start(screen_stat_handle_t handle);
extern "C" int FS_internal_register(screen_stat_handle_t handle,enum field_dsp_style_t style,
enum field_calc_algo calc_type,const char* name);
extern "C" int FS_internal_operate(screen_stat_handle_t handle,int id,int id2,int column_id,enum field_op op,long long value);
extern "C" int FS_internal_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op,long long value);
screen_stat_handle_t FS_internal_create_handle(void)
{
@@ -38,19 +38,8 @@ int FS_internal_register(screen_stat_handle_t handle,enum field_dsp_style_t styl
return FS_register(handle, style, calc_type, name);
}
int FS_internal_operate(screen_stat_handle_t handle,int id,int id2,int column_id,enum field_op op,long long value)
int FS_internal_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op,long long value)
{
int ret = -1;
ret = FS_operate(handle, id, column_id, op, value);
if (ret < 0)
goto finish;
if (id2 < 0)
goto finish;
ret = FS_operate(handle, id2, 0, op, value);
finish:
return ret;
return FS_operate(handle, id, column_id, op, value);
}

View File

@@ -23,7 +23,8 @@ enum field_calc_algo
enum field_op
{
FS_OP_ADD=1,
FS_OP_SET
FS_OP_SET,
FS_OP_GET,
};
typedef void* screen_stat_handle_t;
@@ -53,6 +54,6 @@ void FS_internal_start(screen_stat_handle_t handle);
int FS_internal_register(screen_stat_handle_t handle,enum field_dsp_style_t style,
enum field_calc_algo calc_type,const char* name);
int FS_internal_operate(screen_stat_handle_t handle,int id,int id2,int column_id,enum field_op op,long long value);
int FS_internal_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op,long long value);
#endif

Binary file not shown.