修复_exec_serial_rule_begin中maat_redis_version多加1的bug。该bug影响主版本。
支持运行中暂停后台配置更新,通过MAAT_OPT_DISABLE_UPDATE选项设置。
This commit is contained in:
@@ -484,7 +484,8 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void*
|
||||
feather->AUTO_NUMBERING_ON=1;
|
||||
feather->connect_timeout.tv_sec=0;
|
||||
feather->connect_timeout.tv_usec=100*1000; // 100 ms
|
||||
pthread_mutex_init(&(feather->plugin_table_reg_mutex),NULL);
|
||||
feather->backgroud_update_enabled=1;
|
||||
pthread_mutex_init(&(feather->backgroud_update_mutex),NULL);
|
||||
pthread_mutex_init(&(feather->redis_write_lock),NULL);
|
||||
snprintf(feather->table_info_fn,sizeof(feather->table_info_fn),"%s",table_info_path);
|
||||
return feather;
|
||||
@@ -493,7 +494,41 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
|
||||
{
|
||||
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
||||
int intval=0,ret=-1;
|
||||
if(_feather->still_working==1)// not allowed set after Maat_initiate_feather;
|
||||
switch(type)
|
||||
{
|
||||
case MAAT_OPT_DISABLE_UPDATE:
|
||||
intval=*((int*)value);
|
||||
if(_feather->backgroud_update_enabled!=intval)
|
||||
{
|
||||
if(intval==0)
|
||||
{
|
||||
pthread_mutex_lock(&(_feather->backgroud_update_mutex));
|
||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
|
||||
"Background update is disabled, current version %lld."
|
||||
,_feather->maat_version);
|
||||
}
|
||||
else
|
||||
{
|
||||
pthread_mutex_unlock(&(_feather->backgroud_update_mutex));
|
||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
|
||||
"Background update is enabled, current version %lld."
|
||||
,_feather->maat_version);
|
||||
}
|
||||
_feather->backgroud_update_enabled=intval;
|
||||
}
|
||||
else
|
||||
{
|
||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
|
||||
"Duplicated operation, background update is ALREADY %s, current version %lld."
|
||||
,_feather->backgroud_update_enabled?"enabled":"disabled"
|
||||
,_feather->maat_version);
|
||||
|
||||
}
|
||||
return 0;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if(_feather->still_working==1)//The following options are not allowed to set after initiation;
|
||||
{
|
||||
return -2;
|
||||
}
|
||||
@@ -501,7 +536,7 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
|
||||
{
|
||||
case MAAT_OPT_EFFECT_INVERVAL_MS:
|
||||
intval=*(const int*)value;
|
||||
if(size!=sizeof(int)||intval<=0)
|
||||
if(size!=sizeof(int)||intval<0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
@@ -623,7 +658,12 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
|
||||
_feather->cumulative_update_off=1;
|
||||
break;
|
||||
case MAAT_OPT_LOAD_VERSION_FROM:
|
||||
_feather->load_from_specific_version=*((long long*)value);
|
||||
_feather->load_version_from=*((long long*)value);
|
||||
_feather->backgroud_update_enabled=1;
|
||||
pthread_mutex_lock((&_feather->backgroud_update_mutex));
|
||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
|
||||
"Maat load version from %lld, stops backgroud update."
|
||||
,_feather->load_version_from);
|
||||
break;
|
||||
default:
|
||||
return -1;
|
||||
@@ -853,11 +893,12 @@ int Maat_table_callback_register(Maat_feather_t feather,short table_id,
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
pthread_mutex_lock(&(_feather->plugin_table_reg_mutex));
|
||||
//plugin table register blocks background update.
|
||||
pthread_mutex_lock(&(_feather->backgroud_update_mutex));
|
||||
idx=p_table->cb_info->cb_plug_cnt;
|
||||
if(idx==MAX_PLUGING_NUM)
|
||||
{
|
||||
pthread_mutex_unlock(&(_feather->plugin_table_reg_mutex));
|
||||
pthread_mutex_unlock(&(_feather->backgroud_update_mutex));
|
||||
return -1;
|
||||
}
|
||||
p_table->cb_info->cb_plug_cnt++;
|
||||
@@ -885,7 +926,7 @@ int Maat_table_callback_register(Maat_feather_t feather,short table_id,
|
||||
finish(u_para);
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&(_feather->plugin_table_reg_mutex));
|
||||
pthread_mutex_unlock(&(_feather->backgroud_update_mutex));
|
||||
return 1;
|
||||
}
|
||||
int Maat_full_scan_string_detail(Maat_feather_t feather,int table_id
|
||||
|
||||
@@ -552,7 +552,7 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir
|
||||
goto FULL_UPDATE;
|
||||
}
|
||||
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
|
||||
"Inc Update form instance_version %lld to %lld (%lld entries).",instance_version,target_version,rule_num);
|
||||
"Inc Update form instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num);
|
||||
*list=s_rule_array;
|
||||
*update_type=CM_UPDATE_TYPE_INC;
|
||||
*new_version=target_version;
|
||||
@@ -935,7 +935,6 @@ long long _exec_serial_rule_begin(redisContext* ctx)
|
||||
redisReply* data_reply=NULL;
|
||||
data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
|
||||
maat_redis_version=read_redis_integer(data_reply);
|
||||
maat_redis_version++;
|
||||
freeReplyObject(data_reply);
|
||||
data_reply=_wrap_redisCommand(ctx,"MULTI");
|
||||
return maat_redis_version;
|
||||
@@ -1285,8 +1284,8 @@ void redis_monitor_traverse(long long version,redisContext *c,
|
||||
return;
|
||||
}
|
||||
|
||||
rule_num=get_rm_key_list(c, version, feather->load_from_specific_version, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off);
|
||||
feather->load_from_specific_version=0;//only valid for one time.
|
||||
rule_num=get_rm_key_list(c, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off);
|
||||
feather->load_version_from=0;//only valid for one time.
|
||||
if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed
|
||||
{
|
||||
return;
|
||||
@@ -1317,6 +1316,8 @@ void redis_monitor_traverse(long long version,redisContext *c,
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num);
|
||||
}
|
||||
}
|
||||
start(new_version,update_type,u_para);
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld->%lld (%d entries)\n",
|
||||
update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num);
|
||||
for(i=0;i<rule_num;i++)
|
||||
{
|
||||
@@ -1340,8 +1341,10 @@ void redis_monitor_traverse(long long version,redisContext *c,
|
||||
continue;
|
||||
}
|
||||
}
|
||||
update(rule_list[i].table_name,rule_list[i].table_line,u_para);
|
||||
//printf("%s %s,%d\n", rule_list[i].op==MAAT_OP_DEL?"DEL":"ADD", rule_list[i].table_name, rule_list[i].rule_id);
|
||||
}
|
||||
finish(u_para);
|
||||
//printf("Finish %s update: %lld->%lld\n",update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version);
|
||||
clean_up:
|
||||
for(i=0;i<rule_num;i++)
|
||||
|
||||
@@ -30,7 +30,7 @@
|
||||
#include "stream_fuzzy_hash.h"
|
||||
#include "gram_index_engine.h"
|
||||
|
||||
int MAAT_FRAME_VERSION_2_2_20180607=1;
|
||||
int MAAT_FRAME_VERSION_2_2_20180617=1;
|
||||
|
||||
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
|
||||
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
|
||||
@@ -802,6 +802,12 @@ void* create_bool_matcher(MESA_htable_handle compile_hash,int thread_num,void* l
|
||||
MESA_htable_iterate(compile_hash, walk_compile_hash, update_q);
|
||||
|
||||
const long q_cnt=MESA_lqueue_get_count(update_q);
|
||||
if(q_cnt==0)
|
||||
{
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module,
|
||||
"No compile rule to build a bool matcher.");
|
||||
return NULL;
|
||||
}
|
||||
set_array=(universal_bool_expr_t*)malloc(sizeof(universal_bool_expr_t)*q_cnt);
|
||||
for(i=0;i<q_cnt;i++)
|
||||
{
|
||||
@@ -2087,7 +2093,7 @@ int add_group_rule(struct _Maat_table_info_t* table,struct db_group_rule_t* db_g
|
||||
if(ret<0)
|
||||
{
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_module,
|
||||
"update error,add %s group %d to compile %d error,compile rule is full or duplicate group."
|
||||
"update error,add group: %s %d to compile rule %d error, compile rule is full or duplicate group."
|
||||
,table->table_name[table->updating_name]
|
||||
,db_group_rule->group_id
|
||||
,db_group_rule->compile_id);
|
||||
@@ -2163,9 +2169,12 @@ int del_compile_rule(struct _Maat_table_info_t* table,struct db_compile_rule_t*
|
||||
return -1;
|
||||
}
|
||||
pthread_rwlock_wrlock(&(compile_rule->rwlock));
|
||||
free(compile_rule->db_c_rule->service_defined);
|
||||
free(compile_rule->db_c_rule);
|
||||
compile_rule->db_c_rule=NULL;
|
||||
if(compile_rule->db_c_rule!=NULL)
|
||||
{
|
||||
free(compile_rule->db_c_rule->service_defined);
|
||||
free(compile_rule->db_c_rule);
|
||||
compile_rule->db_c_rule=NULL;
|
||||
}
|
||||
pthread_rwlock_unlock(&(compile_rule->rwlock));
|
||||
|
||||
if(compile_rule->group_cnt==0)
|
||||
@@ -2198,7 +2207,7 @@ void update_group_rule(struct _Maat_table_info_t* table,const char* table_line,s
|
||||
if(ret<0)
|
||||
{
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module ,
|
||||
"duplicate config of group table %s group_id %d compile_id %d.",table->table_name[table->conj_cnt]
|
||||
"duplicate config of group table %s group_id %d compile_id %d.",table->table_name[0]
|
||||
,db_group_rule.group_id
|
||||
,db_group_rule.compile_id);
|
||||
|
||||
@@ -3084,7 +3093,7 @@ void maat_finish_cb(void* u_para)
|
||||
feather->scanner->version=feather->maat_version;
|
||||
expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q);
|
||||
feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_total_q_size;
|
||||
if(time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000)
|
||||
if(time(NULL)-feather->scanner->last_update_time>=feather->effect_interval_ms/1000)
|
||||
{
|
||||
do_scanner_update(feather->scanner
|
||||
,feather->garbage_q
|
||||
@@ -3204,77 +3213,79 @@ void *thread_rule_monitor(void *arg)
|
||||
{
|
||||
usleep(feather->scan_interval_ms*1000);
|
||||
scan_dir_cnt++;
|
||||
//plugin table register is not allowed during update;
|
||||
pthread_mutex_lock(&(feather->plugin_table_reg_mutex));
|
||||
if(feather->REDIS_MODE_ON==1)
|
||||
if(0==pthread_mutex_trylock(&(feather->backgroud_update_mutex)))
|
||||
{
|
||||
redis_monitor_traverse(feather->maat_version
|
||||
,feather->redis_read_ctx
|
||||
,maat_start_cb
|
||||
,maat_update_cb
|
||||
,maat_finish_cb
|
||||
,feather
|
||||
,feather->decrypt_key //Not used.
|
||||
,feather);
|
||||
}
|
||||
else
|
||||
{
|
||||
config_monitor_traverse(feather->maat_version,
|
||||
inc_cfg_dir,
|
||||
maat_start_cb,
|
||||
maat_update_cb,
|
||||
maat_finish_cb,
|
||||
feather,
|
||||
feather->decrypt_key,
|
||||
feather->logger);
|
||||
}
|
||||
pthread_mutex_unlock(&(feather->plugin_table_reg_mutex));
|
||||
if(feather->update_tmp_scanner!=NULL)
|
||||
{
|
||||
old_scanner=feather->scanner;
|
||||
//Some OS doesn't have __sync_lock_test_and_set.
|
||||
//feather->scanner=__sync_lock_test_and_set(&(feather->scanner),feather->update_tmp_scanner);
|
||||
feather->scanner=feather->update_tmp_scanner;
|
||||
if(old_scanner!=NULL)
|
||||
if(feather->REDIS_MODE_ON==1)
|
||||
{
|
||||
if(feather->scanner->version>old_scanner->version)
|
||||
redis_monitor_traverse(feather->maat_version
|
||||
,feather->redis_read_ctx
|
||||
,maat_start_cb
|
||||
,maat_update_cb
|
||||
,maat_finish_cb
|
||||
,feather
|
||||
,feather->decrypt_key //Not used.
|
||||
,feather);
|
||||
}
|
||||
else
|
||||
{
|
||||
config_monitor_traverse(feather->maat_version,
|
||||
inc_cfg_dir,
|
||||
maat_start_cb,
|
||||
maat_update_cb,
|
||||
maat_finish_cb,
|
||||
feather,
|
||||
feather->decrypt_key,
|
||||
feather->logger);
|
||||
}
|
||||
|
||||
if(feather->update_tmp_scanner!=NULL)
|
||||
{
|
||||
old_scanner=feather->scanner;
|
||||
//Some OS doesn't have __sync_lock_test_and_set.
|
||||
//feather->scanner=__sync_lock_test_and_set(&(feather->scanner),feather->update_tmp_scanner);
|
||||
feather->scanner=feather->update_tmp_scanner;
|
||||
if(old_scanner!=NULL)
|
||||
{
|
||||
if(feather->scanner->version>old_scanner->version)
|
||||
{
|
||||
MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module,
|
||||
"Maat version updated %d -> %d.",
|
||||
old_scanner->version, feather->scanner->version);
|
||||
}
|
||||
else
|
||||
{
|
||||
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module,
|
||||
"Maat version roll back %d -> %d.",
|
||||
old_scanner->version, feather->scanner->version);
|
||||
}
|
||||
assert(old_scanner->tomb_ref==feather->garbage_q);
|
||||
feather->zombie_rs_stream+=aligment_int64_array_sum(old_scanner->ref_cnt,old_scanner->max_thread_num);
|
||||
garbage_bagging(GARBAGE_SCANNER, old_scanner, feather->garbage_q);
|
||||
}
|
||||
feather->update_tmp_scanner=NULL;
|
||||
feather->maat_version=feather->scanner->version;
|
||||
feather->last_full_version=feather->scanner->version;
|
||||
}
|
||||
if(feather->scanner!=NULL)
|
||||
{
|
||||
expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q);
|
||||
feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_total_q_size;
|
||||
if(feather->postpone_q_size>0&&time(NULL)-feather->scanner->last_update_time>=feather->effect_interval_ms/1000)
|
||||
{
|
||||
do_scanner_update(feather->scanner
|
||||
,feather->garbage_q
|
||||
,feather->scan_thread_num
|
||||
,feather->logger);
|
||||
feather->postpone_q_size=0;
|
||||
MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module,
|
||||
"Maat version updated %d -> %d.",
|
||||
old_scanner->version, feather->scanner->version);
|
||||
"Actual udpate config version %u, %d entries load to rulescan after postpone.",
|
||||
feather->scanner->version,feather->scanner->cfg_num);
|
||||
}
|
||||
else
|
||||
{
|
||||
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module,
|
||||
"Maat version roll back %d -> %d.",
|
||||
old_scanner->version, feather->scanner->version);
|
||||
}
|
||||
assert(old_scanner->tomb_ref==feather->garbage_q);
|
||||
feather->zombie_rs_stream+=aligment_int64_array_sum(old_scanner->ref_cnt,old_scanner->max_thread_num);
|
||||
garbage_bagging(GARBAGE_SCANNER, old_scanner, feather->garbage_q);
|
||||
}
|
||||
feather->update_tmp_scanner=NULL;
|
||||
feather->maat_version=feather->scanner->version;
|
||||
feather->last_full_version=feather->scanner->version;
|
||||
}
|
||||
if(feather->scanner!=NULL)
|
||||
{
|
||||
expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q);
|
||||
feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_total_q_size;
|
||||
if(feather->postpone_q_size>0&&time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000)
|
||||
{
|
||||
do_scanner_update(feather->scanner
|
||||
,feather->garbage_q
|
||||
,feather->scan_thread_num
|
||||
,feather->logger);
|
||||
feather->postpone_q_size=0;
|
||||
MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module,
|
||||
"Actual udpate config version %u, %d entries load to rulescan after postpone.",
|
||||
feather->scanner->version,feather->scanner->cfg_num);
|
||||
}
|
||||
pthread_mutex_unlock(&(feather->backgroud_update_mutex));
|
||||
}
|
||||
garbage_bury(feather->garbage_q,feather->effect_interval_ms/1000+10,feather->logger);
|
||||
if(feather->stat_on==1&&scan_dir_cnt%2==0)//output every 2 seconds
|
||||
if(feather->stat_on==1&&time(NULL)%2==0)//output every 2 seconds
|
||||
{
|
||||
maat_stat_output(feather);
|
||||
}
|
||||
|
||||
@@ -378,7 +378,7 @@ struct _Maat_feather_t
|
||||
char table_info_fn[MAX_TABLE_NAME_LEN];
|
||||
char compile_tn[MAX_TABLE_NAME_LEN];
|
||||
char group_tn[MAX_TABLE_NAME_LEN];
|
||||
pthread_mutex_t plugin_table_reg_mutex;
|
||||
pthread_mutex_t backgroud_update_mutex;
|
||||
unsigned char decrypt_key[MAX_TABLE_NAME_LEN];
|
||||
|
||||
char redis_ip[MAX_TABLE_NAME_LEN];
|
||||
@@ -393,13 +393,14 @@ struct _Maat_feather_t
|
||||
struct _Maat_cmd_inner_t* cmd_qhead, *cmd_qtail;
|
||||
pthread_mutex_t redis_write_lock; //protect redis_write_ctx
|
||||
long long base_rgn_seq,base_grp_seq,server_time;
|
||||
long long load_from_specific_version;
|
||||
long long load_version_from;
|
||||
//internal states
|
||||
long long new_version;
|
||||
int active_plugin_table_num;
|
||||
int is_last_plugin_table_updating;
|
||||
|
||||
//for stat>>>>
|
||||
int backgroud_update_enabled;
|
||||
screen_stat_handle_t stat_handle;
|
||||
int total_stat_id;
|
||||
int fs_status_id[MAX_MAAT_STAT_NUM];
|
||||
|
||||
Reference in New Issue
Block a user