允许在运行过程加载新的json文件。

This commit is contained in:
zhengchao
2018-12-02 22:50:20 +08:00
parent fbdc331b23
commit 03edeb90b7
12 changed files with 436 additions and 137 deletions

View File

@@ -499,7 +499,8 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void*
}
int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const void* value,int size)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
struct stat attrib;
int intval=0,ret=-1;
switch(type)
{
@@ -557,36 +558,45 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
}
_feather->scan_interval_ms=intval;
break;
case MAAT_OPT_FULL_CFG_DIR:
if(size>(int)sizeof(_feather->full_dir))
case MAAT_OPT_FULL_CFG_DIR:
assert(_feather->input_mode==SOURCE_NONE);
if(size>(int)sizeof(_feather->iris_ctx.full_dir))
{
return -1;
}
memcpy(_feather->full_dir,(const char*)value,size);
memcpy(_feather->iris_ctx.full_dir,(const char*)value,size);
_feather->input_mode=SOURCE_IRIS_FILE;
break;
case MAAT_OPT_INC_CFG_DIR:
if(size>(int)sizeof(_feather->inc_dir))
if(size>(int)sizeof(_feather->iris_ctx.inc_dir))
{
return -1;
}
memcpy(_feather->inc_dir,(const char*)value,size);
memcpy(_feather->iris_ctx.inc_dir,(const char*)value,size);
break;
case MAAT_OPT_JSON_FILE_PATH:
assert(_feather->input_mode==SOURCE_NONE);
ret=json2iris((const char*)value
,_feather->compile_tn,_feather->group_tn
,NULL
,_feather->full_dir
,sizeof(_feather->full_dir)
,_feather->json_ctx.iris_file
,sizeof(_feather->json_ctx.iris_file)
,_feather->logger);
if(ret<0)
{
return -1;
}
memcpy(_feather->inc_dir,_feather->full_dir,sizeof(_feather->inc_dir));
memcpy(_feather->json_ctx.json_file, value, size);
stat(_feather->json_ctx.json_file, &attrib);
_feather->json_ctx.last_md5_time=attrib.st_ctime;
md5_file(_feather->json_ctx.json_file, _feather->json_ctx.effective_json_md5);
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
"Maat initial with JSON file %s,generate index file %s OK."
,(const char*)value
,_feather->full_dir);
"Maat initial with JSON file %s md5: %s,generate index file %s OK.",
_feather->json_ctx.json_file,
_feather->json_ctx.effective_json_md5,
_feather->json_ctx.iris_file);
_feather->input_mode=SOURCE_JSON_FILE;
break;
case MAAT_OPT_STAT_ON:
_feather->stat_on=1;
@@ -623,11 +633,13 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
memcpy(_feather->decrypt_key,value,size);
break;
case MAAT_OPT_REDIS_IP:
assert(_feather->input_mode==SOURCE_NONE);
if((size_t)size>sizeof(_feather->mr_ctx.redis_ip))
{
return -1;
}
memcpy(_feather->mr_ctx.redis_ip,value,size);
_feather->input_mode=SOURCE_REDIS;
break;
case MAAT_OPT_REDIS_PORT:
if((size_t)size==sizeof(unsigned short))
@@ -696,61 +708,70 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
}
void maat_read_full_config(_Maat_feather_t* _feather)
{
struct maat_redis_ctx* mr_ctx=&(_feather->mr_ctx);
if(strlen(mr_ctx->redis_ip)>0&&mr_ctx->redis_port!=0)
struct source_redis_ctx* mr_ctx=NULL;
switch(_feather->input_mode)
{
_feather->REDIS_MODE_ON=1;
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
"Maat initiate from Redis %s:%hu db%d."
,mr_ctx->redis_ip
,mr_ctx->redis_port
,mr_ctx->redis_db);
mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, _feather->logger);
if(mr_ctx->read_ctx != NULL)
{
redis_monitor_traverse(_feather->maat_version
,mr_ctx
,maat_start_cb
,maat_update_cb
,maat_finish_cb
, _feather
,_feather->decrypt_key //Not used.
,_feather);
}
}
else
{
if(strlen(_feather->full_dir)==0)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,
"At initiation: NO FULL_CFG_DIR or JSON_FILE_PATH. ");
return;
}
config_monitor_traverse(_feather->maat_version,
_feather->full_dir,
maat_start_cb,
maat_update_cb,
maat_finish_cb,
_feather,
_feather->decrypt_key,
_feather->logger);
}
if(_feather->update_tmp_scanner==NULL)
{
if(_feather->REDIS_MODE_ON==1)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,
"At initiation: no avilable rule in redis in %s:%hu"
,mr_ctx->redis_ip
,mr_ctx->redis_port);
}
else
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,
"At initiation: no valid index file in %s",_feather->full_dir);
}
case SOURCE_REDIS:
mr_ctx=&(_feather->mr_ctx);
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
"Maat initiate from Redis %s:%hu db%d."
,mr_ctx->redis_ip
,mr_ctx->redis_port
,mr_ctx->redis_db);
mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, _feather->logger);
if(mr_ctx->read_ctx != NULL)
{
redis_monitor_traverse(_feather->maat_version
,mr_ctx
,maat_start_cb
,maat_update_cb
,maat_finish_cb
, _feather
,_feather->decrypt_key //Not used.
,_feather);
}
if(_feather->update_tmp_scanner)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,
"At initiation: no avilable rule in redis in %s:%hu"
,mr_ctx->redis_ip
,mr_ctx->redis_port);
}
break;
case SOURCE_IRIS_FILE:
config_monitor_traverse(_feather->maat_version,
_feather->iris_ctx.full_dir,
maat_start_cb,
maat_update_cb,
maat_finish_cb,
_feather,
_feather->decrypt_key,
_feather->logger);
if(!_feather->update_tmp_scanner)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,
"At initiation: NO effective rule in %s.",
_feather->iris_ctx.full_dir);
}
break;
case SOURCE_JSON_FILE:
config_monitor_traverse(_feather->maat_version,
_feather->json_ctx.iris_file,
maat_start_cb,
maat_update_cb,
maat_finish_cb,
_feather,
_feather->decrypt_key,
_feather->logger);
if(!_feather->update_tmp_scanner)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,
"At initiation: NO efffective rule in JSON generate %s.",
_feather->json_ctx.iris_file);
}
break;
default:
break;
}
_feather->scanner=_feather->update_tmp_scanner;
_feather->update_tmp_scanner=NULL;

View File

@@ -92,7 +92,7 @@ redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db,
}
int connect_redis_for_write(struct maat_redis_ctx* mr_ctx, void* logger)
int connect_redis_for_write(struct source_redis_ctx* mr_ctx, void* logger)
{
assert(mr_ctx->write_ctx==NULL);
mr_ctx->write_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, logger);
@@ -1728,7 +1728,7 @@ void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int r
return;
}
void redis_monitor_traverse(long long version, struct maat_redis_ctx* mr_ctx,
void redis_monitor_traverse(long long version, struct source_redis_ctx* mr_ctx,
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
void (*finish)(void*),//u_para
@@ -2287,7 +2287,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
redisContext* write_ctx=NULL;
redisReply* data_reply=NULL;
struct serial_rule_t* s_rule=NULL;
struct serial_rule_t* s_rule=NULL;
if(_feather->input_mode!=SOURCE_REDIS)
{
return -1;

View File

@@ -32,7 +32,7 @@
#include "stream_fuzzy_hash.h"
#include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_4_20181129=1;
int MAAT_FRAME_VERSION_2_4_20181202=1;
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
@@ -3439,11 +3439,14 @@ int maat_update_cb(const char* table_name,const char* line,void *u_para)
void *thread_rule_monitor(void *arg)
{
struct _Maat_feather_t *feather=(struct _Maat_feather_t *)arg;
const char* inc_cfg_dir=(const char*)feather->inc_dir;
struct _Maat_scanner_t* old_scanner=NULL;
long expr_wait_q_cnt=0;
int scan_dir_cnt=0;
UNUSED int ret=0;
int ret=0;
char md5_tmp[MD5_DIGEST_LENGTH*2+1]={0};
char tmp_dir[MAX_TABLE_NAME_LEN]={0};
struct stat attrib;
char maat_name[16];//Defined by prctl: The name can be up to 16 bytes long,and should
// be null terminated if it contains fewer bytes.
if(strlen(feather->instance_name)>0)
@@ -3470,29 +3473,78 @@ void *thread_rule_monitor(void *arg)
scan_dir_cnt++;
if(0==pthread_mutex_trylock(&(feather->backgroud_update_mutex)))
{
if(feather->REDIS_MODE_ON==1)
switch(feather->input_mode)
{
redis_monitor_traverse(feather->maat_version
,&(feather->mr_ctx)
,maat_start_cb
,maat_update_cb
,maat_finish_cb
,feather
,feather->decrypt_key //Not used.
,feather);
case SOURCE_REDIS:
redis_monitor_traverse(feather->maat_version,
&(feather->mr_ctx),
maat_start_cb,
maat_update_cb,
maat_finish_cb,
feather,
feather->decrypt_key, //Not used.
feather);
break;
case SOURCE_IRIS_FILE:
config_monitor_traverse(feather->maat_version,
feather->iris_ctx.inc_dir,
maat_start_cb,
maat_update_cb,
maat_finish_cb,
feather,
feather->decrypt_key,
feather->logger);
break;
case SOURCE_JSON_FILE:
memset(md5_tmp, 0, sizeof(md5_tmp));
memset(tmp_dir, 0, sizeof(tmp_dir));
stat(feather->json_ctx.json_file, &attrib);
if(attrib.st_ctime!=feather->json_ctx.last_md5_time)
{
feather->json_ctx.last_md5_time=attrib.st_ctime;
md5_file(feather->json_ctx.json_file, md5_tmp);
if(0!=strcmp(md5_tmp,feather->json_ctx.effective_json_md5))
{
ret=json2iris(feather->json_ctx.json_file,
feather->compile_tn, feather->group_tn,
NULL,
tmp_dir,
sizeof(tmp_dir),
feather->logger);
if(ret<0)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module ,
"Maat re-initiate with JSON file %s failed, md5: %s",
feather->json_ctx.json_file,
md5_tmp);
}
else
{
strcpy(feather->json_ctx.effective_json_md5, md5_tmp);
strcpy(feather->json_ctx.iris_file, tmp_dir);
config_monitor_traverse(0,
feather->json_ctx.iris_file,
maat_start_cb,
maat_update_cb,
maat_finish_cb,
feather,
feather->decrypt_key,
feather->logger);
MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module ,
"Maat re-initiate with JSON file %s success, md5: %s",
feather->json_ctx.json_file,
md5_tmp);
}
}
}
break;
default:
assert(0);
break;
}
else
{
config_monitor_traverse(feather->maat_version,
inc_cfg_dir,
maat_start_cb,
maat_update_cb,
maat_finish_cb,
feather,
feather->decrypt_key,
feather->logger);
}
if(feather->update_tmp_scanner!=NULL)
{
old_scanner=feather->scanner;
@@ -3567,7 +3619,7 @@ void *thread_rule_monitor(void *arg)
alignment_int64_array_free(feather->hit_cnt);
alignment_int64_array_free(feather->orphan_group_saving);
alignment_int64_array_free(feather->last_region_saving);
if(feather->REDIS_MODE_ON==1)
if(feather->input_mode==SOURCE_REDIS)
{
if(feather->mr_ctx.read_ctx)
{

View File

@@ -2,6 +2,7 @@
#include <stdio.h>
#include <ctype.h>
#include <stdlib.h>
#include <openssl/md5.h>
#include "Maat_utils.h"
pid_t gettid()
@@ -177,10 +178,43 @@ int system_cmd_mv(const char* src_file,const char*dst_file)
snprintf(cmd,sizeof(cmd), "mv %s %s", src_file, dst_file);
return system(cmd);
}
int system_cmd_cp(const char* src_file,const char*dst_file)
{
char cmd[MAX_SYSTEM_CMD_LEN] = { 0 };
snprintf(cmd,sizeof(cmd), "cp -f %s %s", src_file, dst_file);
return system(cmd);
}
int system_cmd_rm(const char* src_file)
{
char cmd[MAX_SYSTEM_CMD_LEN] = { 0 };
snprintf(cmd,sizeof(cmd), "rm %s -f", src_file);
return system(cmd);
}
char* md5_file(const char* filename, char* md5string)
{
FILE* fp=NULL;
int i=0;
unsigned char md5[MD5_DIGEST_LENGTH];
struct stat file_info;
stat(filename, &file_info);
size_t file_size=file_info.st_size;
fp=fopen(filename,"r");
if(fp==NULL)
{
return NULL;
}
char* file_buff=(char*)malloc(file_size);
fread(file_buff,1,file_size,fp);
fclose(fp);
MD5((const unsigned char *)(file_buff), (unsigned long)(file_size), md5);
for(i = 0; i < MD5_DIGEST_LENGTH; ++i)
{
sprintf(&md5string[i*2], "%02x", (unsigned int)md5[i]);
}
free(file_buff);
return md5string;
}

View File

@@ -17,6 +17,7 @@
#include "alignment_int64.h"
#include <pthread.h>
#include <iconv.h>
#include <openssl/md5.h>
extern const char *maat_module;
@@ -359,16 +360,6 @@ struct rule_tag
char* tag_name;
char* tag_val;
};
struct maat_redis_ctx
{
redisContext *read_ctx;
redisContext *write_ctx;
char redis_ip[64];
int redis_port;
int redis_db;
time_t last_reconnect_time;
};
struct _Maat_scanner_t
{
long long version;
@@ -394,6 +385,34 @@ struct _Maat_scanner_t
int max_thread_num;
iconv_t iconv_handle[MAX_CHARSET_NUM][MAX_CHARSET_NUM];//iconv_handle[to][from]
};
enum data_source
{
SOURCE_NONE=0,
SOURCE_REDIS,
SOURCE_IRIS_FILE,
SOURCE_JSON_FILE
};
struct source_iris_ctx
{
char inc_dir[MAX_TABLE_NAME_LEN];
char full_dir[MAX_TABLE_NAME_LEN];
};
struct source_json_ctx
{
char json_file[MAX_TABLE_NAME_LEN];
char iris_file[MAX_TABLE_NAME_LEN];
char effective_json_md5[MD5_DIGEST_LENGTH*2+1];
time_t last_md5_time;
};
struct source_redis_ctx
{
redisContext *read_ctx;
redisContext *write_ctx;
char redis_ip[64];
int redis_port;
int redis_db;
time_t last_reconnect_time;
};
struct _Maat_feather_t
{
struct _Maat_scanner_t *scanner;
@@ -403,6 +422,13 @@ struct _Maat_feather_t
int DEFERRED_LOAD_ON;
int GROUP_MODE_ON;
int REDIS_MODE_ON;
enum data_source input_mode;
union
{
struct source_iris_ctx iris_ctx;
struct source_json_ctx json_ctx;
struct source_redis_ctx mr_ctx;
};
int still_working;
int scan_interval_ms;
int effect_interval_ms;
@@ -416,8 +442,7 @@ struct _Maat_feather_t
long long last_full_version;
int scan_thread_num;
int rule_scan_type;
char inc_dir[MAX_TABLE_NAME_LEN];
char full_dir[MAX_TABLE_NAME_LEN];
char stat_file[MAX_TABLE_NAME_LEN];
char instance_name[MAX_TABLE_NAME_LEN];
char table_info_fn[MAX_TABLE_NAME_LEN];
@@ -426,8 +451,7 @@ struct _Maat_feather_t
pthread_mutex_t backgroud_update_mutex;
unsigned char decrypt_key[MAX_TABLE_NAME_LEN];
pthread_t cfg_mon_t;
struct maat_redis_ctx mr_ctx;
int AUTO_NUMBERING_ON;
// redisContext *redis_write_ctx; // not thread safe.
@@ -542,8 +566,9 @@ void empty_serial_rules(struct serial_rule_t* rule);
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger);
long long redis_server_time(redisContext* ctx);
redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger);
char* md5_file(const char* filename, char* md5string);
void redis_monitor_traverse(long long version, struct maat_redis_ctx* mr_ctx,
void redis_monitor_traverse(long long version, struct source_redis_ctx* mr_ctx,
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
void (*finish)(void*),//u_para

View File

@@ -44,4 +44,7 @@ char* str_unescape(char* s);
pid_t gettid(void);
int system_cmd_mkdir(const char* path);
int system_cmd_rm(const char* src_file);
int system_cmd_mv(const char* src_file,const char*dst_file);
int system_cmd_cp(const char* src_file,const char*dst_file);
char* md5_file(const char* filename, char* md5string);

View File

@@ -6,6 +6,8 @@ global:
*GIE_*;
#for test
*my_scandir*;
*md5_file*;
*system_cmd_*;
};
local: *;
};