1、修复redis不可用后,没有重连的bug,每隔5秒重连一次;2、修复变量未使用的警告;2、增加redis能连上,但不可用时的错误处理。
This commit is contained in:
@@ -13,7 +13,7 @@
|
|||||||
|
|
||||||
#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR"))
|
#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR"))
|
||||||
#define maat_command (module_name_str("MAAT_COMMAND"))
|
#define maat_command (module_name_str("MAAT_COMMAND"))
|
||||||
|
const time_t MAAT_REDIS_RECONNECT_INTERVAL=5;
|
||||||
const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
|
const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
|
||||||
const char* rm_status_sset="MAAT_UPDATE_STATUS";
|
const char* rm_status_sset="MAAT_UPDATE_STATUS";
|
||||||
const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
|
const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
|
||||||
@@ -44,8 +44,8 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
|
|||||||
{
|
{
|
||||||
va_list ap;
|
va_list ap;
|
||||||
void *reply = NULL;
|
void *reply = NULL;
|
||||||
int ret=0,retry=0;
|
int ret=REDIS_ERR, retry=0;
|
||||||
while(reply==NULL&&retry<2)
|
while(reply==NULL&&retry<2&&ret!=REDIS_OK)
|
||||||
{
|
{
|
||||||
va_start(ap,format);
|
va_start(ap,format);
|
||||||
reply = redisvCommand(c,format,ap);
|
reply = redisvCommand(c,format,ap);
|
||||||
@@ -54,10 +54,6 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
|
|||||||
{
|
{
|
||||||
ret=redisReconnect(c);
|
ret=redisReconnect(c);
|
||||||
retry++;
|
retry++;
|
||||||
if(ret==REDIS_OK)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (redisReply *)reply;
|
return (redisReply *)reply;
|
||||||
@@ -102,7 +98,7 @@ long long read_redis_integer(const redisReply* reply)
|
|||||||
return atoll(reply->str);
|
return atoll(reply->str);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
assert(0);
|
return -1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
@@ -220,7 +216,7 @@ int invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
|
|||||||
}
|
}
|
||||||
void serialize_region(const struct Maat_region_t* p,int group_id, char* buff,int size)
|
void serialize_region(const struct Maat_region_t* p,int group_id, char* buff,int size)
|
||||||
{
|
{
|
||||||
int ret=0;
|
UNUSED int ret=0;
|
||||||
switch(p->region_type)
|
switch(p->region_type)
|
||||||
{
|
{
|
||||||
case REGION_IP:
|
case REGION_IP:
|
||||||
@@ -343,7 +339,8 @@ int get_inc_key_list(long long instance_version, long long target_version, redis
|
|||||||
{
|
{
|
||||||
redisReply* reply=NULL,*tmp_reply=NULL;
|
redisReply* reply=NULL,*tmp_reply=NULL;
|
||||||
char err_buff[256], op_str[4];
|
char err_buff[256], op_str[4];
|
||||||
int rule_num=0,ret=0;
|
int rule_num=0;
|
||||||
|
UNUSED int ret=0;
|
||||||
unsigned int i=0;
|
unsigned int i=0;
|
||||||
long long nearest_rule_version;
|
long long nearest_rule_version;
|
||||||
struct serial_rule_t *s_rule=NULL;
|
struct serial_rule_t *s_rule=NULL;
|
||||||
@@ -354,7 +351,7 @@ int get_inc_key_list(long long instance_version, long long target_version, redis
|
|||||||
|
|
||||||
if(reply==NULL)
|
if(reply==NULL)
|
||||||
{
|
{
|
||||||
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
|
__redis_strerror_r(errno,err_buff,sizeof(err_buff)-1);
|
||||||
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
|
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
|
||||||
"GET %s failed %s.",rm_status_sset,err_buff);
|
"GET %s failed %s.",rm_status_sset,err_buff);
|
||||||
return -1;
|
return -1;
|
||||||
@@ -512,7 +509,7 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
memset(err_buff,0,sizeof(err_buff));
|
memset(err_buff,0,sizeof(err_buff));
|
||||||
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
|
__redis_strerror_r(errno,err_buff,sizeof(err_buff)-1);
|
||||||
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
|
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
|
||||||
"GET MAAT_VERSION failed %s. Reconnecting...",err_buff);
|
"GET MAAT_VERSION failed %s. Reconnecting...",err_buff);
|
||||||
_wrap_redisReconnect(c,logger);
|
_wrap_redisReconnect(c,logger);
|
||||||
@@ -647,7 +644,8 @@ FULL_UPDATE:
|
|||||||
|
|
||||||
int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
|
int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
|
||||||
{
|
{
|
||||||
int i=0,ret=0,failed_cnt=0,idx=0;
|
int i=0,failed_cnt=0,idx=0;
|
||||||
|
UNUSED int ret=0;
|
||||||
int error_happened=0;
|
int error_happened=0;
|
||||||
int *retry_ids=(int*)malloc(sizeof(int)*rule_num);
|
int *retry_ids=(int*)malloc(sizeof(int)*rule_num);
|
||||||
char redis_cmd[256];
|
char redis_cmd[256];
|
||||||
@@ -983,7 +981,7 @@ struct expected_reply_t
|
|||||||
|
|
||||||
long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *maat_redis_version)
|
long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *maat_redis_version)
|
||||||
{
|
{
|
||||||
int ret=0;
|
int ret=-1;
|
||||||
redisReply* data_reply=NULL;
|
redisReply* data_reply=NULL;
|
||||||
if(renew_rule_num>0)
|
if(renew_rule_num>0)
|
||||||
{
|
{
|
||||||
@@ -998,12 +996,16 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule
|
|||||||
data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
|
data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
|
||||||
*maat_redis_version=read_redis_integer(data_reply);
|
*maat_redis_version=read_redis_integer(data_reply);
|
||||||
freeReplyObject(data_reply);
|
freeReplyObject(data_reply);
|
||||||
|
if(*maat_redis_version<0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if(*renew_allowed==1||rule_num>renew_rule_num)
|
if(*renew_allowed==1||rule_num>renew_rule_num)
|
||||||
{
|
{
|
||||||
data_reply=_wrap_redisCommand(ctx,"MULTI");
|
data_reply=_wrap_redisCommand(ctx,"MULTI");
|
||||||
freeReplyObject(data_reply);
|
freeReplyObject(data_reply);
|
||||||
ret=1;
|
ret=0;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@@ -1180,8 +1182,9 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r
|
|||||||
}
|
}
|
||||||
|
|
||||||
ret=_exec_serial_rule_begin(ctx,serial_rule_num,renew_num, &renew_allowed, &new_version);
|
ret=_exec_serial_rule_begin(ctx,serial_rule_num,renew_num, &renew_allowed, &new_version);
|
||||||
if(ret!=1)//Preconditions of transaction is not qualified.
|
if(ret!=0)//Preconditions of transaction is not qualified.
|
||||||
{
|
{
|
||||||
|
success_cnt=-1;
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
while(success_cnt<serial_rule_num)
|
while(success_cnt<serial_rule_num)
|
||||||
@@ -1296,7 +1299,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
|
|||||||
void check_maat_expiration(redisContext *ctx, void *logger)
|
void check_maat_expiration(redisContext *ctx, void *logger)
|
||||||
{
|
{
|
||||||
unsigned int i=0,s_rule_num=0;
|
unsigned int i=0,s_rule_num=0;
|
||||||
int ret=0;
|
UNUSED int ret=0;
|
||||||
int success_cnt=0;
|
int success_cnt=0;
|
||||||
redisReply* data_reply=NULL;
|
redisReply* data_reply=NULL;
|
||||||
struct serial_rule_t* s_rule=NULL;
|
struct serial_rule_t* s_rule=NULL;
|
||||||
@@ -1357,14 +1360,19 @@ void cleanup_update_status(redisContext *ctx, void *logger)
|
|||||||
reply=NULL;
|
reply=NULL;
|
||||||
}
|
}
|
||||||
reply=_wrap_redisCommand(ctx,"EXEC");
|
reply=_wrap_redisCommand(ctx,"EXEC");
|
||||||
assert(reply->type==REDIS_REPLY_ARRAY);
|
if(reply->type!=REDIS_REPLY_ARRAY)
|
||||||
|
{
|
||||||
|
goto error_out;
|
||||||
|
}
|
||||||
sub_reply=reply->element[0];
|
sub_reply=reply->element[0];
|
||||||
assert(sub_reply->type==REDIS_REPLY_ARRAY);
|
if(sub_reply->type!=REDIS_REPLY_ARRAY)
|
||||||
|
{
|
||||||
|
goto error_out;
|
||||||
|
}
|
||||||
version_num=sub_reply->elements;
|
version_num=sub_reply->elements;
|
||||||
if(version_num==0)
|
if(version_num==0)
|
||||||
{
|
{
|
||||||
freeReplyObject(reply);
|
goto error_out;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
version_lower_bound=read_redis_integer(sub_reply->element[0]);
|
version_lower_bound=read_redis_integer(sub_reply->element[0]);
|
||||||
version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]);
|
version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]);
|
||||||
@@ -1381,7 +1389,9 @@ void cleanup_update_status(redisContext *ctx, void *logger)
|
|||||||
,version_upper_bound
|
,version_upper_bound
|
||||||
,version_num
|
,version_num
|
||||||
,entry_num);
|
,entry_num);
|
||||||
|
error_out:
|
||||||
|
freeReplyObject(reply);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
const char* find_Nth_column(const char* line, int Nth, int* column_len)
|
const char* find_Nth_column(const char* line, int Nth, int* column_len)
|
||||||
{
|
{
|
||||||
@@ -1566,7 +1576,8 @@ struct foreign_conts_track
|
|||||||
};
|
};
|
||||||
void _get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger)
|
void _get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger)
|
||||||
{
|
{
|
||||||
int i=0, j=0, ret=0;
|
int i=0, j=0;
|
||||||
|
UNUSED int ret=0;
|
||||||
int key_num=0;
|
int key_num=0;
|
||||||
struct foreign_conts_track* track=ALLOC(struct foreign_conts_track, rule_num*MAX_FOREIGN_CLMN_NUM);
|
struct foreign_conts_track* track=ALLOC(struct foreign_conts_track, rule_num*MAX_FOREIGN_CLMN_NUM);
|
||||||
char redis_cmd[256];
|
char redis_cmd[256];
|
||||||
@@ -1669,10 +1680,19 @@ void redis_monitor_traverse(long long version,redisContext *c,
|
|||||||
redlock_unlock(feather->redis_read_ctx,rm_expire_lock);
|
redlock_unlock(feather->redis_read_ctx,rm_expire_lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(c==NULL||c->err)
|
if(c==NULL) return;
|
||||||
|
if(c->err)
|
||||||
|
{
|
||||||
|
if(time(NULL)-feather->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
feather->last_reconnect_time=time(NULL);
|
||||||
|
if(0!=_wrap_redisReconnect(c, logger))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rule_num=get_rm_key_list(c, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off);
|
rule_num=get_rm_key_list(c, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off);
|
||||||
feather->load_version_from=0;//only valid for one time.
|
feather->load_version_from=0;//only valid for one time.
|
||||||
@@ -1999,6 +2019,11 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
|
|||||||
}
|
}
|
||||||
set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time);
|
set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time);
|
||||||
}
|
}
|
||||||
|
success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger);
|
||||||
|
if(success_cnt<0)//error
|
||||||
|
{
|
||||||
|
ret=-1;
|
||||||
|
goto error_out;
|
||||||
}
|
}
|
||||||
assert(success_cnt==line_num);
|
assert(success_cnt==line_num);
|
||||||
ret=success_cnt;
|
ret=success_cnt;
|
||||||
@@ -2176,7 +2201,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
|
|||||||
|
|
||||||
int ret=0,i=0;
|
int ret=0,i=0;
|
||||||
int new_region_num=0,new_group_num=0;
|
int new_region_num=0,new_group_num=0;
|
||||||
int serial_rule_num=0,serial_rule_idx=0;
|
int serial_rule_num=0,serial_rule_idx=0;
|
||||||
UNUSED int transection_success=1;
|
UNUSED int transection_success=1;
|
||||||
struct _Maat_cmd_inner_t* p=NULL,*n=NULL;
|
struct _Maat_cmd_inner_t* p=NULL,*n=NULL;
|
||||||
|
|
||||||
@@ -2211,12 +2236,22 @@ int Maat_cmd_commit(Maat_feather_t feather)
|
|||||||
|
|
||||||
if(_feather->AUTO_NUMBERING_ON==1)
|
if(_feather->AUTO_NUMBERING_ON==1)
|
||||||
{
|
{
|
||||||
data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num);
|
data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num);
|
||||||
|
if(data_reply->type!=REDIS_REPLY_INTEGER)
|
||||||
|
{
|
||||||
|
freeReplyObject(data_reply);
|
||||||
|
ret=-1;
|
||||||
|
goto error_out;
|
||||||
}
|
}
|
||||||
_feather->base_rgn_seq=data_reply->integer-new_region_num;
|
_feather->base_rgn_seq=data_reply->integer-new_region_num;
|
||||||
freeReplyObject(data_reply);
|
freeReplyObject(data_reply);
|
||||||
|
|
||||||
data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num);
|
data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num);
|
||||||
|
if(data_reply->type!=REDIS_REPLY_INTEGER)
|
||||||
|
{
|
||||||
|
freeReplyObject(data_reply);
|
||||||
|
ret=-1;
|
||||||
|
goto error_out;
|
||||||
}
|
}
|
||||||
_feather->base_grp_seq=data_reply->integer-new_group_num;
|
_feather->base_grp_seq=data_reply->integer-new_group_num;
|
||||||
freeReplyObject(data_reply);
|
freeReplyObject(data_reply);
|
||||||
@@ -2244,7 +2279,7 @@ error_out:
|
|||||||
}
|
}
|
||||||
_feather->cmd_qhead=_feather->cmd_qtail=NULL;
|
_feather->cmd_qhead=_feather->cmd_qtail=NULL;
|
||||||
_feather->cmd_q_cnt=0;
|
_feather->cmd_q_cnt=0;
|
||||||
|
|
||||||
for(i=0;i<serial_rule_num && s_rule;i++)
|
for(i=0;i<serial_rule_num && s_rule;i++)
|
||||||
{
|
{
|
||||||
empty_serial_rules(s_rule+i);
|
empty_serial_rules(s_rule+i);
|
||||||
@@ -2266,8 +2301,14 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"INCRBY %s %d", key, increment);
|
data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"INCRBY %s %d", key, increment);
|
||||||
|
if(data_reply->type==REDIS_REPLY_INTEGER)
|
||||||
{
|
{
|
||||||
|
result=data_reply->integer;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
result=-1;
|
||||||
}
|
}
|
||||||
freeReplyObject(data_reply);
|
freeReplyObject(data_reply);
|
||||||
return result;
|
return result;
|
||||||
|
|||||||
@@ -482,6 +482,7 @@ error_out:
|
|||||||
int lqueue_destroy_cb(void *data, long data_len, void *arg)
|
int lqueue_destroy_cb(void *data, long data_len, void *arg)
|
||||||
{
|
{
|
||||||
assert(0);
|
assert(0);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
void * HASH_fetch_by_id(MESA_htable_handle hash,int id)
|
void * HASH_fetch_by_id(MESA_htable_handle hash,int id)
|
||||||
{
|
{
|
||||||
@@ -901,7 +902,7 @@ void* create_bool_matcher(MESA_htable_handle compile_hash,int thread_num,void* l
|
|||||||
MESA_lqueue_head update_q=MESA_lqueue_create(0,0);;
|
MESA_lqueue_head update_q=MESA_lqueue_create(0,0);;
|
||||||
long data_size=0;
|
long data_size=0;
|
||||||
unsigned int mem_size=0;
|
unsigned int mem_size=0;
|
||||||
MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
||||||
data_size=sizeof(void*);
|
data_size=sizeof(void*);
|
||||||
universal_bool_expr_t* one_set=NULL;
|
universal_bool_expr_t* one_set=NULL;
|
||||||
universal_bool_expr_t* set_array=NULL;
|
universal_bool_expr_t* set_array=NULL;
|
||||||
@@ -982,7 +983,7 @@ void force_destroy_compile_rule(struct _Maat_compile_inner_t * p)
|
|||||||
void destroy_compile_rule(struct _Maat_compile_inner_t * p)
|
void destroy_compile_rule(struct _Maat_compile_inner_t * p)
|
||||||
{
|
{
|
||||||
int i=0;
|
int i=0;
|
||||||
struct _Maat_compile_inner_t* p_group=NULL;
|
UNUSED struct _Maat_compile_inner_t* p_group=NULL;
|
||||||
assert(p->group_cnt==0);
|
assert(p->group_cnt==0);
|
||||||
for(i=0;i<p->group_boundary;i++)
|
for(i=0;i<p->group_boundary;i++)
|
||||||
{
|
{
|
||||||
@@ -1155,7 +1156,7 @@ struct _Maat_scanner_t* create_maat_scanner(unsigned int version,_Maat_feather_t
|
|||||||
|
|
||||||
int i=0,j=0;
|
int i=0,j=0;
|
||||||
unsigned int sub_type=0;
|
unsigned int sub_type=0;
|
||||||
int ret=0;
|
UNUSED int ret=0;
|
||||||
|
|
||||||
MESA_htable_create_args_t hargs;
|
MESA_htable_create_args_t hargs;
|
||||||
memset(&hargs,0,sizeof(hargs));
|
memset(&hargs,0,sizeof(hargs));
|
||||||
@@ -1255,7 +1256,8 @@ struct _Maat_scanner_t* create_maat_scanner(unsigned int version,_Maat_feather_t
|
|||||||
void destroy_maat_scanner(struct _Maat_scanner_t*scanner)
|
void destroy_maat_scanner(struct _Maat_scanner_t*scanner)
|
||||||
{
|
{
|
||||||
long q_cnt=0,data_size=0;
|
long q_cnt=0,data_size=0;
|
||||||
int i=0,j=0,q_ret=0;
|
int i=0,j=0;
|
||||||
|
UNUSED int q_ret=0;
|
||||||
struct op_expr_t* op_expr=NULL;
|
struct op_expr_t* op_expr=NULL;
|
||||||
GIE_digest_t* digest_rule=NULL;
|
GIE_digest_t* digest_rule=NULL;
|
||||||
if(scanner==NULL)
|
if(scanner==NULL)
|
||||||
@@ -1386,7 +1388,7 @@ void rulescan_batch_update(rule_scanner_t rs_handle,MESA_lqueue_head expr_queue,
|
|||||||
int j=0,ret=0;
|
int j=0,ret=0;
|
||||||
unsigned int failed_ids[MAX_FAILED_NUM];
|
unsigned int failed_ids[MAX_FAILED_NUM];
|
||||||
char failed_info[512],*p=NULL;
|
char failed_info[512],*p=NULL;
|
||||||
MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
||||||
memset(failed_ids,0,sizeof(failed_ids));
|
memset(failed_ids,0,sizeof(failed_ids));
|
||||||
memset(failed_info,0,sizeof(failed_info));
|
memset(failed_info,0,sizeof(failed_info));
|
||||||
const long q_cnt=MESA_lqueue_get_count(expr_queue);
|
const long q_cnt=MESA_lqueue_get_count(expr_queue);
|
||||||
@@ -1475,7 +1477,7 @@ void digest_batch_update(GIE_handle_t* handle,MESA_lqueue_head update_q,void*log
|
|||||||
int ret=0;
|
int ret=0;
|
||||||
GIE_digest_t* digest_rule=NULL;
|
GIE_digest_t* digest_rule=NULL;
|
||||||
GIE_digest_t** update_array=NULL;
|
GIE_digest_t** update_array=NULL;
|
||||||
MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
||||||
const long q_cnt=MESA_lqueue_get_count(update_q);
|
const long q_cnt=MESA_lqueue_get_count(update_q);
|
||||||
if(q_cnt==0)
|
if(q_cnt==0)
|
||||||
{
|
{
|
||||||
@@ -2945,7 +2947,7 @@ void garbage_bagging(enum maat_garbage_type type,void *p,MESA_lqueue_head garbag
|
|||||||
}
|
}
|
||||||
void garbage_bury(MESA_lqueue_head garbage_q,int timeout,void *logger)
|
void garbage_bury(MESA_lqueue_head garbage_q,int timeout,void *logger)
|
||||||
{
|
{
|
||||||
MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
UNUSED MESA_queue_errno_t q_ret=MESA_QUEUE_RET_OK;
|
||||||
_maat_garbage_t* bag=NULL;
|
_maat_garbage_t* bag=NULL;
|
||||||
long data_size=0;
|
long data_size=0;
|
||||||
const long q_cnt=MESA_lqueue_get_count(garbage_q);
|
const long q_cnt=MESA_lqueue_get_count(garbage_q);
|
||||||
@@ -3390,7 +3392,8 @@ void *thread_rule_monitor(void *arg)
|
|||||||
const char* inc_cfg_dir=(const char*)feather->inc_dir;
|
const char* inc_cfg_dir=(const char*)feather->inc_dir;
|
||||||
struct _Maat_scanner_t* old_scanner=NULL;
|
struct _Maat_scanner_t* old_scanner=NULL;
|
||||||
long expr_wait_q_cnt=0;
|
long expr_wait_q_cnt=0;
|
||||||
int scan_dir_cnt=0,ret=0;
|
int scan_dir_cnt=0;
|
||||||
|
UNUSED int ret=0;
|
||||||
char maat_name[16];//Defined by prctl: The name can be up to 16 bytes long,and should
|
char maat_name[16];//Defined by prctl: The name can be up to 16 bytes long,and should
|
||||||
// be null terminated if it contains fewer bytes.
|
// be null terminated if it contains fewer bytes.
|
||||||
if(strlen(feather->instance_name)>0)
|
if(strlen(feather->instance_name)>0)
|
||||||
|
|||||||
@@ -402,6 +402,7 @@ struct _Maat_feather_t
|
|||||||
int AUTO_NUMBERING_ON;
|
int AUTO_NUMBERING_ON;
|
||||||
struct timeval connect_timeout;
|
struct timeval connect_timeout;
|
||||||
redisContext *redis_read_ctx;
|
redisContext *redis_read_ctx;
|
||||||
|
time_t last_reconnect_time;
|
||||||
redisContext *redis_write_ctx; // not thread safe.
|
redisContext *redis_write_ctx; // not thread safe.
|
||||||
int on_redis_writing;
|
int on_redis_writing;
|
||||||
int cmd_q_cnt;
|
int cmd_q_cnt;
|
||||||
|
|||||||
@@ -21,6 +21,7 @@
|
|||||||
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
|
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define UNUSED __attribute__((unused))
|
||||||
const char* module_name_str(const char*name);
|
const char* module_name_str(const char*name);
|
||||||
#define maat_module (module_name_str("MAAT_Frame"))
|
#define maat_module (module_name_str("MAAT_Frame"))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user