diff --git a/src/Makefile b/src/Makefile index e902554..5b67971 100644 --- a/src/Makefile +++ b/src/Makefile @@ -35,7 +35,11 @@ OBJS := \ dir := ./components/syslogd include $(dir)/syslog.mk OBJS += $(OBJS_$(dir)) - + +dir := ./components/redis +include $(dir)/redis.mk +OBJS += $(OBJS_$(dir)) + dir := ./rt include $(dir)/rt.mk OBJS += $(OBJS_$(dir)) diff --git a/src/cert_init.h b/src/cert_init.h index 869c8b6..d44b300 100644 --- a/src/cert_init.h +++ b/src/cert_init.h @@ -15,6 +15,8 @@ #include #include +#include "rd_lock.h" + struct request_t{ #define DATALEN 64 char host[DATALEN]; @@ -25,6 +27,8 @@ struct request_t{ int valid; + struct rd_lock_scb mtx; + struct evhttp_request *evh_req; }; diff --git a/src/cert_session.c b/src/cert_session.c index 5b96c5b..e7d7a65 100644 --- a/src/cert_session.c +++ b/src/cert_session.c @@ -25,6 +25,7 @@ #include "rt_common.h" #include "rt_stdlib.h" #include "rt_file.h" +#include "rt_time.h" #include "cert_init.h" #include "async.h" #include "read.h" @@ -47,7 +48,7 @@ static libevent_thread *threads; struct fs_stats_t{ - int line_ids[3]; + int line_ids[4]; screen_stat_handle_t handle; }; @@ -58,6 +59,11 @@ static struct fs_stats_t SGstats = { #define sizeof_seconds(x) (x * 24 * 60 * 60) +rt_mutex entries_mtx = PTHREAD_MUTEX_INITIALIZER; + +uint64_t startTime = 0; +uint64_t endTime = 0; + void connectCallback(const struct redisAsyncContext *c, int status) { if (status != REDIS_OK) { mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Redis connect error : %s\n", c->errstr); @@ -562,7 +568,19 @@ X509 *x509_create_cert(char *host, int days) return x509; } -int cert_redis_init(struct event_base *base, struct redisAsyncContext **cl_ctx) +int redis_sync_int(struct redisContext **c) +{ + struct config_bucket_t *redis = cert_default_config(); + + struct timeval timeout = { 1, 500000 }; // 1.5 seconds + + *c = redisConnectWithTimeout(redis->r_ip, redis->r_port, timeout); + + return 0; +} + + +int redis_rsync_init(struct event_base *base, struct redisAsyncContext **cl_ctx) { int xret = -1; struct config_bucket_t *redis = cert_default_config(); @@ -587,10 +605,18 @@ rd_set_callback(redisAsyncContext __attribute__((__unused__))*c, void *r, { redisReply *reply = (redisReply*)r; - char *host = (char *)privdata; + struct request_t *request = (struct request_t *)privdata; + +#ifdef RD_MUTEX_LOCK + libevent_thread *thread = threads + request->thread_id; + rd_mutex_unlock(&request->mtx, thread->sync); +#endif + if(reply->type == REDIS_REPLY_ERROR){ - mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Writing data(key = %s) to redis failed", host); + mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Writing data(key = %s) to redis failed", request->host); } + + kfree(request); return; } @@ -657,16 +683,42 @@ finish: return xret; } +static int fs_internal_operate(int id, int id2, int column_id, int column_id2, long long diffTime) +{ + int ret = -1, value = -1; + screen_stat_handle_t handle = SGstats.handle; + + FS_internal_operate(handle, id, column_id, FS_OP_ADD, 1); + + if (id2 < 0) + goto finish; + + FS_internal_operate(handle, id2, 0, FS_OP_ADD, 1); + + if (column_id2 < 0) + goto finish; + + value = FS_internal_operate(handle, id, column_id, FS_OP_GET, 0); + if (value < 0) + goto finish; + + ret = FS_internal_operate(handle, id, column_id2, FS_OP_SET, diffTime/value); +finish: + return ret; +} + static int -rd_decode_sendbuf(struct request_t *request, redisAsyncContext *c, char *sendbuf) +rd_encode_sendbuf(struct request_t *request, redisAsyncContext *c, char *sendbuf) { int xret = -1; + uint64_t startTime = 0, endTime = 0; libevent_thread *thread = threads + request->thread_id; struct config_bucket_t *rte = cert_default_config(); char cert[SG_DATA_SIZE] = {0}, pubkey[SG_DATA_SIZE] = {0}; + startTime = rt_time_ms(); x509_online_append(request->host, thread->key, thread->root, cert, pubkey); if (cert[0] == '\0' && pubkey[0] == '\0'){ mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Failed to issue certificate"); @@ -674,36 +726,44 @@ rd_decode_sendbuf(struct request_t *request, redisAsyncContext *c, char *sendbuf goto finish; } - FS_internal_operate(SGstats.handle,thread->column_ids,thread->field_ids,SGstats.line_ids[2], FS_OP_ADD, 1); + endTime = rt_time_ms(); + thread->diffTime += (endTime - startTime); + //printf("%lu - %lu = %lu(%lu)\n", startTime, endTime, endTime - startTime, thread->diffTime); + fs_internal_operate(thread->column_ids, thread->field_ids, SGstats.line_ids[2], SGstats.line_ids[3], thread->diffTime); snprintf(sendbuf, SG_DATA_SIZE * 2, "%s%s", pubkey, cert); - xret = redisAsyncCommand(c, rd_set_callback, request->host, "SETEX %s %d %s", + xret = redisAsyncCommand(c, rd_set_callback, request, "SETEX %s %d %s", request->host, sizeof_seconds(rte->days), sendbuf); if (xret < 0){ mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Failed to set information to redis server"); goto finish; } - xret = 0; finish: return xret; } static int -rd_encode_sendbuf(struct request_t *request, redisReply *reply, char *sendbuf) +rd_decode_sendbuf(struct request_t *request, redisReply *reply, char *sendbuf) { int xret = -1; libevent_thread *thread = threads + request->thread_id; +#ifdef RD_MUTEX_LOCK + rd_mutex_unlock(&request->mtx, thread->sync); +#endif if (reply && reply->str){ - FS_internal_operate(SGstats.handle,thread->column_ids,thread->field_ids,SGstats.line_ids[1],FS_OP_ADD,1); + fs_internal_operate(thread->column_ids,thread->field_ids, SGstats.line_ids[1], -1, 0); + snprintf(sendbuf, SG_DATA_SIZE * 2, "%s", reply->str); xret = 0; } else{ evhttp_send_error(request->evh_req, HTTP_BADREQUEST, 0); } + + kfree(request); return xret; } @@ -720,14 +780,14 @@ void rd_get_callback(redisAsyncContext *c, void *r, void *privdata) case REDIS_REPLY_STRING: mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "Sends the certificate information to the requestor"); - xret = rd_encode_sendbuf(request, reply, sendbuf); + xret = rd_decode_sendbuf(request, reply, sendbuf); break; case REDIS_REPLY_NIL: /* Certificate information modification and Strategy to judge**/ mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "Generating certificate information"); - xret = rd_decode_sendbuf(request, c, sendbuf); + xret = rd_encode_sendbuf(request, c, sendbuf); break; default: break; @@ -737,7 +797,7 @@ void rd_get_callback(redisAsyncContext *c, void *r, void *privdata) evhttp_socket_send(request->evh_req, sendbuf); finish: - kfree(request); + //kfree(request); return; } @@ -893,7 +953,7 @@ pthread_work_proc(struct evhttp_request *evh_req, void *arg) case EVHTTP_REQ_PATCH: cmdtype = "PATCH"; break; default: cmdtype = "unknown"; break; } - FS_internal_operate(SGstats.handle,thread_info->column_ids,-1,SGstats.line_ids[0], FS_OP_ADD, 1); + fs_internal_operate(thread_info->column_ids, -1, SGstats.line_ids[0], -1, 0); rt_decode_uri(uri, request->host, &request->flag, &request->valid); mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "[Thread %d]Received a %s request for %s, host:%s, flag:%d, valid:%d\nHeaders:", @@ -901,7 +961,9 @@ pthread_work_proc(struct evhttp_request *evh_req, void *arg) request->flag, request->valid); if (request->host[0] != '\0' && request->evh_req != NULL){ - +#ifdef RD_MUTEX_LOCK + rd_mutex_lock("key", 30, &request->mtx, thread_info->sync); +#endif xret = redisAsyncCommand(thread_info->cl_ctx, rd_get_callback, request, "GET %s", request->host); if (xret < 0) mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Failed to get information from redis server"); @@ -927,11 +989,14 @@ cert_trapper_task_int(struct event_base *base, libevent_thread *me) int xret = -1; /* Initialize the redis connection*/ - xret = cert_redis_init(base, &me->cl_ctx); + xret = redis_rsync_init(base, &me->cl_ctx); if (xret < 0 || !me->cl_ctx){ mesa_runtime_log(RLOG_LV_FATAL, MODULE_NAME, "Initialize the redis connection is failure\n"); goto finish; } + + xret = redis_sync_int(&me->sync); + /* Initialize the X509 CA*/ xret = x509_privatekey_init(&me->key, &me->root); if (xret < 0 || !me->key || !me->root){ @@ -1064,6 +1129,9 @@ libevent_socket_init() goto finish; } + /*mutex init **/ + rd_lock_init(); + threads = calloc(thread_nu, sizeof(libevent_thread)); if (! threads) { mesa_runtime_log(RLOG_LV_INFO, MODULE_NAME, "Can't allocate thread descriptors"); @@ -1148,6 +1216,9 @@ fs_screen_init() snprintf(buff,sizeof(buff),"%s", "sign"); SGstats.line_ids[2] = FS_internal_register(SGstats.handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + snprintf(buff,sizeof(buff),"%s", "ssl(ms)"); + SGstats.line_ids[3] = FS_internal_register(SGstats.handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + FS_internal_start(SGstats.handle); return 0; diff --git a/src/cert_session.h b/src/cert_session.h index 2512707..848c0b4 100644 --- a/src/cert_session.h +++ b/src/cert_session.h @@ -12,22 +12,6 @@ #include "MESA_list_queue.h" #include "rt_sync.h" -enum conn_states { - conn_listening, /**< the socket which listens for connections */ - conn_new_cmd, /**< Prepare connection for next command */ - conn_waiting, /**< waiting for a readable socket */ - conn_read, /**< reading in a command line */ - conn_parse_cmd, /**< try to parse a command from the input buffer */ - conn_write, /**< writing out a simple response */ - conn_nread, /**< reading in a fixed number of bytes */ - conn_swallow, /**< swallowing unnecessary bytes w/o storing */ - conn_closing, /**< closing this connection */ - conn_mwrite, /**< writing out many items sequentially */ - conn_closed, /**< connection is closed */ - conn_watch, /**< held by the logger thread as a watcher */ - conn_max_state /**< Max state value (used for assertion) */ -}; - typedef struct { int id; @@ -43,11 +27,16 @@ typedef struct { struct redisAsyncContext *cl_ctx; + struct redisContext *sync; + void * (*routine)(void *); /** Executive entry */ int field_ids; /* dispaly */ int column_ids; + + uint64_t diffTime; + } libevent_thread; extern int cert_session_init(); diff --git a/src/components/redis/rd_lock.c b/src/components/redis/rd_lock.c new file mode 100644 index 0000000..68587b2 --- /dev/null +++ b/src/components/redis/rd_lock.c @@ -0,0 +1,206 @@ +/************************************************************************* + > File Name: rd_lock.c + > Author: + > Mail: + > Created Time: 2018Äê07ÔÂ05ÈÕ ÐÇÆÚËÄ 11ʱ01·Ö39Ãë + ************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rd_lock.h" +#include "rt_string.h" + +struct rd_RedLock{ + float m_clockDriftFactor; + sds m_unlockScript; + int m_retryCount; + int m_retryDelay; + char *m_continueLockScript; +}; + +static struct rd_RedLock redlock = { + .m_clockDriftFactor = 0.01, + .m_unlockScript = NULL, + .m_retryCount = 0, + .m_retryDelay = 0, + .m_continueLockScript = NULL, +}; + +struct rd_RedLock *mutx_redlock() +{ + return &redlock; +} + +static char * +get_unique_lockid() +{ + int i = 0; + char *s = NULL; + char value[10] = "0123456789"; + unsigned char buffer[20]; + + struct timeval t1; + gettimeofday(&t1, NULL); + srand(t1.tv_usec * t1.tv_sec); + + for (int i = 0; i < 20; ++i) { + buffer[i] = value[rand() % 10]; + } + //»ñÈ¡20byteµÄËæ»úÊý¾Ý + s = sdsempty(); + for (i = 0; i < 20; i++) { + s = sdscatprintf(s, "%02X", buffer[i]); + } + + return s; +} + +static int +rd_lock_instance(redisContext *c, const char *resource, + const char *val, const int ttl) +{ + int xret = 0; + redisReply *reply; + + reply = (redisReply *)redisCommand(c, "set %s %s px %d nx", resource, val, ttl); + if (NULL == reply) + goto finish; + + //printf("Set return: %s [null == fail, OK == success]\n", reply->str); + + if (reply->str && STRCMP(reply->str, "OK") == 0) { + xret = 1; + } + freeReplyObject(reply); + +finish: + return xret; +} + +static char **convertToSds(int count, char** args) +{ + int j; + char **sds = (char**)malloc(sizeof(char*)*count); + for(j = 0; j < count; j++) + sds[j] = sdsnew(args[j]); + return sds; +} + +redisReply *rd_command_argv(redisContext *c, int argc, char **inargv) +{ + redisReply *reply = NULL; + + char **argv; + argv = convertToSds(argc, inargv); + + size_t *argvlen; + argvlen = (size_t *)malloc(argc * sizeof(size_t)); + + for (int j = 0; j < argc; j++) + argvlen[j] = sdslen(argv[j]); + + reply = (redisReply *)redisCommandArgv(c, argc, (const char **)argv, argvlen); + if (reply) { + //printf("RedisCommandArgv return: %lld\n", reply->integer); + } + free(argvlen); + sdsfreesplitres(argv, argc); + return reply; +} + +int rd_mutex_unlock(struct rd_lock_scb *mtx, struct redisContext *c) +{ + int argc = 5; + struct rd_RedLock *redlock = mutx_redlock(); + + char *unlockScriptArgv[] = {(char*)"EVAL", + redlock->m_unlockScript, + (char*)"1", + (char*)mtx->m_resource, + (char*)mtx->m_val}; + + redisReply *reply = rd_command_argv(c, argc, unlockScriptArgv); + if (reply) { + freeReplyObject(reply); + } + + if (reply->integer){ + sdsfree(mtx->m_resource); + sdsfree(mtx->m_val); + } + return 0; +} + +/* redis lock*/ +int rd_mutex_lock(const char *resource, const int ttl, + struct rd_lock_scb *mtx, struct redisContext *c) +{ + struct rd_RedLock *redlock = mutx_redlock(); + + char *val = NULL; + int retryCount =0, xret = 0; + + val = get_unique_lockid(); + if (!val) { + return xret; + } + mtx->m_resource = sdsnew(resource); + mtx->m_val = val; + retryCount = redlock->m_retryCount; + + do { + int n = 0; + int startTime = (int)time(NULL) * 1000; + + if (c == NULL || c->err) { + goto finish; + } + + if (rd_lock_instance(c, resource, val, ttl)) { + n++; + } + + int drift = (ttl * redlock->m_clockDriftFactor) + 2; + int validityTime = ttl - ((int)time(NULL) * 1000 - startTime) - drift; + //printf("The resource validty time is %d, n is %d\n", + // validityTime, n); + + if (n > 0 && validityTime > 0) { + mtx->m_validityTime = validityTime; + xret = 1; + goto finish; + } else { + printf("The resource validty time is %d, n is %d\n", + validityTime, n); + } + // Wait a random delay before to retry + int delay = rand() % redlock->m_retryDelay + floor(redlock->m_retryDelay / 2); + printf("[Test] delay = %d\n", delay); + usleep(delay * 1000); + retryCount--; + } while (retryCount > 0); + +finish: + return xret; +} + +void rd_lock_init() +{ + struct rd_RedLock *rdlock = mutx_redlock(); + + rdlock->m_continueLockScript = sdsnew("if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return redis.call('set', KEYS[1], ARGV[2], 'px', ARGV[3], 'nx')"); + rdlock->m_unlockScript = sdsnew("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"); + rdlock->m_retryCount = 8; + rdlock->m_retryDelay = 10; + rdlock->m_clockDriftFactor = 0.01; + + return; +} + diff --git a/src/components/redis/rd_lock.h b/src/components/redis/rd_lock.h new file mode 100644 index 0000000..a6a9b74 --- /dev/null +++ b/src/components/redis/rd_lock.h @@ -0,0 +1,26 @@ +/************************************************************************* + > File Name: rd_lock.h + > Author: + > Mail: + > Created Time: 2018å¹´07月05æ—¥ 星期四 11æ—¶02分03ç§’ + ************************************************************************/ + +#ifndef _RD_LOCK_H +#define _RD_LOCK_H + +#include "hiredis.h" + +struct rd_lock_scb{ + int m_validityTime; + sds m_resource; + sds m_val; +}; + +void rd_lock_init(); + +int rd_mutex_lock(const char *resource, const int ttl, + struct rd_lock_scb *mtx, struct redisContext *c); + +int rd_mutex_unlock(struct rd_lock_scb *mtx, struct redisContext *c); + +#endif diff --git a/src/components/redis/redis.mk b/src/components/redis/redis.mk new file mode 100644 index 0000000..5baf45d --- /dev/null +++ b/src/components/redis/redis.mk @@ -0,0 +1,40 @@ + + +# standard component Makefile header +sp := $(sp).x +dirstack_$(sp) := $(d) +d := $(dir) + +# component specification + +OBJS_$(d) :=\ + $(OBJ_DIR)/rd_lock.o\ + +CFLAGS_LOCAL += -I$(d) +$(OBJS_$(d)): CFLAGS_LOCAL := -std=gnu99 -W -Wall -Wunused-parameter -g -O3 \ + -I$(d)\ + -I$(d)/../../rt\ + -I$(d)/../../inc\ + + +# standard component Makefile rules + +DEPS_$(d) := $(OBJS_$(d):.o=.d) + +#LIBS_LIST := $(LIBS_LIST) $(LIBRARY) +LIBS_LIST := $(LIBS_LIST) + +CLEAN_LIST := $(CLEAN_LIST) $(OBJS_$(d)) $(DEPS_$(d)) + +-include $(DEPS_$(d)) + +#$(LIBRARY): $(OBJS) +# $(MYARCHIVE) + +$(OBJ_DIR)/%.o: $(d)/%.c + $(COMPILE) + +# standard component Makefile footer + +d := $(dirstack_$(sp)) +sp := $(basename $(sp)) diff --git a/src/inc/field_stat2.h b/src/inc/field_stat2.h index 6b7ca4a..d27c309 100644 --- a/src/inc/field_stat2.h +++ b/src/inc/field_stat2.h @@ -21,7 +21,8 @@ enum field_calc_algo enum field_op { FS_OP_ADD=1, - FS_OP_SET + FS_OP_SET, + FS_OP_GET, }; diff --git a/src/inc/moodycamel_field_stat2.cpp b/src/inc/moodycamel_field_stat2.cpp index d872083..a618b26 100644 --- a/src/inc/moodycamel_field_stat2.cpp +++ b/src/inc/moodycamel_field_stat2.cpp @@ -15,7 +15,7 @@ extern "C" int FS_internal_set_para(screen_stat_handle_t handle, enum FS_option extern "C" void FS_internal_start(screen_stat_handle_t handle); extern "C" int FS_internal_register(screen_stat_handle_t handle,enum field_dsp_style_t style, enum field_calc_algo calc_type,const char* name); -extern "C" int FS_internal_operate(screen_stat_handle_t handle,int id,int id2,int column_id,enum field_op op,long long value); +extern "C" int FS_internal_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op,long long value); screen_stat_handle_t FS_internal_create_handle(void) { @@ -38,19 +38,8 @@ int FS_internal_register(screen_stat_handle_t handle,enum field_dsp_style_t styl return FS_register(handle, style, calc_type, name); } -int FS_internal_operate(screen_stat_handle_t handle,int id,int id2,int column_id,enum field_op op,long long value) +int FS_internal_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op,long long value) { - int ret = -1; - - ret = FS_operate(handle, id, column_id, op, value); - if (ret < 0) - goto finish; - - if (id2 < 0) - goto finish; - - ret = FS_operate(handle, id2, 0, op, value); -finish: - return ret; + return FS_operate(handle, id, column_id, op, value); } diff --git a/src/inc/moodycamel_field_stat2.h b/src/inc/moodycamel_field_stat2.h index 31b4a10..64d8a52 100644 --- a/src/inc/moodycamel_field_stat2.h +++ b/src/inc/moodycamel_field_stat2.h @@ -23,7 +23,8 @@ enum field_calc_algo enum field_op { FS_OP_ADD=1, - FS_OP_SET + FS_OP_SET, + FS_OP_GET, }; typedef void* screen_stat_handle_t; @@ -53,6 +54,6 @@ void FS_internal_start(screen_stat_handle_t handle); int FS_internal_register(screen_stat_handle_t handle,enum field_dsp_style_t style, enum field_calc_algo calc_type,const char* name); -int FS_internal_operate(screen_stat_handle_t handle,int id,int id2,int column_id,enum field_op op,long long value); +int FS_internal_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op,long long value); #endif diff --git a/src/lib/libMESA_field_stat2.a b/src/lib/libMESA_field_stat2.a index bc1b2c2..7ba8281 100644 Binary files a/src/lib/libMESA_field_stat2.a and b/src/lib/libMESA_field_stat2.a differ