diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index ac96c4c..ed1719f 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -13,7 +13,7 @@ #define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR")) #define maat_command (module_name_str("MAAT_COMMAND")) - +const time_t MAAT_REDIS_RECONNECT_INTERVAL=5; const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; const char* rm_status_sset="MAAT_UPDATE_STATUS"; const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; @@ -44,8 +44,8 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) { va_list ap; void *reply = NULL; - int ret=0,retry=0; - while(reply==NULL&&retry<2) + int ret=REDIS_ERR, retry=0; + while(reply==NULL&&retry<2&&ret!=REDIS_OK) { va_start(ap,format); reply = redisvCommand(c,format,ap); @@ -54,10 +54,6 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) { ret=redisReconnect(c); retry++; - if(ret==REDIS_OK) - { - break; - } } } return (redisReply *)reply; @@ -102,7 +98,7 @@ long long read_redis_integer(const redisReply* reply) return atoll(reply->str); break; default: - assert(0); + return -1; break; } return 0; @@ -220,7 +216,7 @@ int invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) } void serialize_region(const struct Maat_region_t* p,int group_id, char* buff,int size) { - int ret=0; + UNUSED int ret=0; switch(p->region_type) { case REGION_IP: @@ -343,7 +339,8 @@ int get_inc_key_list(long long instance_version, long long target_version, redis { redisReply* reply=NULL,*tmp_reply=NULL; char err_buff[256], op_str[4]; - int rule_num=0,ret=0; + int rule_num=0; + UNUSED int ret=0; unsigned int i=0; long long nearest_rule_version; struct serial_rule_t *s_rule=NULL; @@ -354,7 +351,7 @@ int get_inc_key_list(long long instance_version, long long target_version, redis if(reply==NULL) { - __redis_strerror_r(errno,err_buff,sizeof(err_buff)); + __redis_strerror_r(errno,err_buff,sizeof(err_buff)-1); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, "GET %s failed %s.",rm_status_sset,err_buff); return -1; @@ -512,7 +509,7 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir else { memset(err_buff,0,sizeof(err_buff)); - __redis_strerror_r(errno,err_buff,sizeof(err_buff)); + __redis_strerror_r(errno,err_buff,sizeof(err_buff)-1); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, "GET MAAT_VERSION failed %s. Reconnecting...",err_buff); _wrap_redisReconnect(c,logger); @@ -647,7 +644,8 @@ FULL_UPDATE: int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger) { - int i=0,ret=0,failed_cnt=0,idx=0; + int i=0,failed_cnt=0,idx=0; + UNUSED int ret=0; int error_happened=0; int *retry_ids=(int*)malloc(sizeof(int)*rule_num); char redis_cmd[256]; @@ -983,7 +981,7 @@ struct expected_reply_t long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *maat_redis_version) { - int ret=0; + int ret=-1; redisReply* data_reply=NULL; if(renew_rule_num>0) { @@ -998,12 +996,16 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1"); *maat_redis_version=read_redis_integer(data_reply); freeReplyObject(data_reply); + if(*maat_redis_version<0) + { + return -1; + } } if(*renew_allowed==1||rule_num>renew_rule_num) { data_reply=_wrap_redisCommand(ctx,"MULTI"); freeReplyObject(data_reply); - ret=1; + ret=0; } return ret; } @@ -1180,8 +1182,9 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r } ret=_exec_serial_rule_begin(ctx,serial_rule_num,renew_num, &renew_allowed, &new_version); - if(ret!=1)//Preconditions of transaction is not qualified. + if(ret!=0)//Preconditions of transaction is not qualified. { + success_cnt=-1; goto error_out; } while(success_cnttype==REDIS_REPLY_ARRAY); + if(reply->type!=REDIS_REPLY_ARRAY) + { + goto error_out; + } sub_reply=reply->element[0]; - assert(sub_reply->type==REDIS_REPLY_ARRAY); + if(sub_reply->type!=REDIS_REPLY_ARRAY) + { + goto error_out; + } version_num=sub_reply->elements; if(version_num==0) { - freeReplyObject(reply); - return; + goto error_out; } version_lower_bound=read_redis_integer(sub_reply->element[0]); version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]); @@ -1381,7 +1389,9 @@ void cleanup_update_status(redisContext *ctx, void *logger) ,version_upper_bound ,version_num ,entry_num); - +error_out: + freeReplyObject(reply); + return; } const char* find_Nth_column(const char* line, int Nth, int* column_len) { @@ -1566,7 +1576,8 @@ struct foreign_conts_track }; void _get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger) { - int i=0, j=0, ret=0; + int i=0, j=0; + UNUSED int ret=0; int key_num=0; struct foreign_conts_track* track=ALLOC(struct foreign_conts_track, rule_num*MAX_FOREIGN_CLMN_NUM); char redis_cmd[256]; @@ -1669,9 +1680,18 @@ void redis_monitor_traverse(long long version,redisContext *c, redlock_unlock(feather->redis_read_ctx,rm_expire_lock); } } - if(c==NULL||c->err) + if(c==NULL) return; + if(c->err) { - return; + if(time(NULL)-feather->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL) + { + return; + } + feather->last_reconnect_time=time(NULL); + if(0!=_wrap_redisReconnect(c, logger)) + { + return; + } } rule_num=get_rm_key_list(c, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off); @@ -1999,6 +2019,11 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time); } success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger); + if(success_cnt<0)//error + { + ret=-1; + goto error_out; + } assert(success_cnt==line_num); ret=success_cnt; _feather->line_cmd_acc_num+=success_cnt; @@ -2176,7 +2201,7 @@ int Maat_cmd_commit(Maat_feather_t feather) int ret=0,i=0; int new_region_num=0,new_group_num=0; int serial_rule_num=0,serial_rule_idx=0; - int transection_success=1; + UNUSED int transection_success=1; struct _Maat_cmd_inner_t* p=NULL,*n=NULL; redisContext* ctx=NULL; @@ -2211,12 +2236,22 @@ int Maat_cmd_commit(Maat_feather_t feather) if(_feather->AUTO_NUMBERING_ON==1) { data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num); - assert(data_reply->type==REDIS_REPLY_INTEGER); + if(data_reply->type!=REDIS_REPLY_INTEGER) + { + freeReplyObject(data_reply); + ret=-1; + goto error_out; + } _feather->base_rgn_seq=data_reply->integer-new_region_num; freeReplyObject(data_reply); data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num); - assert(data_reply->type==REDIS_REPLY_INTEGER); + if(data_reply->type!=REDIS_REPLY_INTEGER) + { + freeReplyObject(data_reply); + ret=-1; + goto error_out; + } _feather->base_grp_seq=data_reply->integer-new_group_num; freeReplyObject(data_reply); } @@ -2244,7 +2279,7 @@ error_out: _feather->cmd_qhead=_feather->cmd_qtail=NULL; _feather->cmd_q_cnt=0; - for(i=0;iredis_write_ctx,"INCRBY %s %d", key, increment); - assert(data_reply->type==REDIS_REPLY_INTEGER); - result=data_reply->integer; + if(data_reply->type==REDIS_REPLY_INTEGER) + { + result=data_reply->integer; + } + else + { + result=-1; + } freeReplyObject(data_reply); return result; } diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index adefe75..53699e0 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -482,6 +482,7 @@ error_out: int lqueue_destroy_cb(void *data, long data_len, void *arg) { assert(0); + return 0; } void * HASH_fetch_by_id(MESA_htable_handle hash,int id) { @@ -901,7 +902,7 @@ void* create_bool_matcher(MESA_htable_handle compile_hash,int thread_num,void* l MESA_lqueue_head update_q=MESA_lqueue_create(0,0);; long data_size=0; unsigned int mem_size=0; - MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; + UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; data_size=sizeof(void*); universal_bool_expr_t* one_set=NULL; universal_bool_expr_t* set_array=NULL; @@ -982,7 +983,7 @@ void force_destroy_compile_rule(struct _Maat_compile_inner_t * p) void destroy_compile_rule(struct _Maat_compile_inner_t * p) { int i=0; - struct _Maat_compile_inner_t* p_group=NULL; + UNUSED struct _Maat_compile_inner_t* p_group=NULL; assert(p->group_cnt==0); for(i=0;igroup_boundary;i++) { @@ -1155,7 +1156,7 @@ struct _Maat_scanner_t* create_maat_scanner(unsigned int version,_Maat_feather_t int i=0,j=0; unsigned int sub_type=0; - int ret=0; + UNUSED int ret=0; MESA_htable_create_args_t hargs; memset(&hargs,0,sizeof(hargs)); @@ -1255,7 +1256,8 @@ struct _Maat_scanner_t* create_maat_scanner(unsigned int version,_Maat_feather_t void destroy_maat_scanner(struct _Maat_scanner_t*scanner) { long q_cnt=0,data_size=0; - int i=0,j=0,q_ret=0; + int i=0,j=0; + UNUSED int q_ret=0; struct op_expr_t* op_expr=NULL; GIE_digest_t* digest_rule=NULL; if(scanner==NULL) @@ -1386,7 +1388,7 @@ void rulescan_batch_update(rule_scanner_t rs_handle,MESA_lqueue_head expr_queue, int j=0,ret=0; unsigned int failed_ids[MAX_FAILED_NUM]; char failed_info[512],*p=NULL; - MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; + UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; memset(failed_ids,0,sizeof(failed_ids)); memset(failed_info,0,sizeof(failed_info)); const long q_cnt=MESA_lqueue_get_count(expr_queue); @@ -1475,7 +1477,7 @@ void digest_batch_update(GIE_handle_t* handle,MESA_lqueue_head update_q,void*log int ret=0; GIE_digest_t* digest_rule=NULL; GIE_digest_t** update_array=NULL; - MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; + UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; const long q_cnt=MESA_lqueue_get_count(update_q); if(q_cnt==0) { @@ -2945,7 +2947,7 @@ void garbage_bagging(enum maat_garbage_type type,void *p,MESA_lqueue_head garbag } void garbage_bury(MESA_lqueue_head garbage_q,int timeout,void *logger) { - MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; + UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK; _maat_garbage_t* bag=NULL; long data_size=0; const long q_cnt=MESA_lqueue_get_count(garbage_q); @@ -3390,7 +3392,8 @@ void *thread_rule_monitor(void *arg) const char* inc_cfg_dir=(const char*)feather->inc_dir; struct _Maat_scanner_t* old_scanner=NULL; long expr_wait_q_cnt=0; - int scan_dir_cnt=0,ret=0; + int scan_dir_cnt=0; + UNUSED int ret=0; char maat_name[16];//Defined by prctl: The name can be up to 16 bytes long,and should // be null terminated if it contains fewer bytes. if(strlen(feather->instance_name)>0) diff --git a/src/inc_internal/Maat_rule_internal.h b/src/inc_internal/Maat_rule_internal.h index 8196ad2..55bb52d 100644 --- a/src/inc_internal/Maat_rule_internal.h +++ b/src/inc_internal/Maat_rule_internal.h @@ -402,6 +402,7 @@ struct _Maat_feather_t int AUTO_NUMBERING_ON; struct timeval connect_timeout; redisContext *redis_read_ctx; + time_t last_reconnect_time; redisContext *redis_write_ctx; // not thread safe. int on_redis_writing; int cmd_q_cnt; diff --git a/src/inc_internal/Maat_utils.h b/src/inc_internal/Maat_utils.h index 8b92bb9..6d23a3b 100644 --- a/src/inc_internal/Maat_utils.h +++ b/src/inc_internal/Maat_utils.h @@ -21,6 +21,7 @@ #define MIN(a, b) (((a) < (b)) ? (a) : (b)) #endif +#define UNUSED __attribute__((unused)) const char* module_name_str(const char*name); #define maat_module (module_name_str("MAAT_Frame"))