支持内容外键,即某一列指向redis中的一个key,将其变成文件路径。

This commit is contained in:
zhengchao
2018-09-24 18:49:18 +08:00
parent 16ff0886c9
commit e2f4a583ad
11 changed files with 656 additions and 208 deletions

View File

@@ -1,6 +1,7 @@
#include "Maat_command.h"
#include "Maat_rule.h"
#include "Maat_rule_internal.h"
#include "Maat_utils.h"
#include "config_monitor.h"
#include "map_str2int.h"
#include "hiredis.h"
@@ -292,7 +293,15 @@ void empty_serial_rules(struct serial_rule_t* rule)
if(rule->table_line!=NULL)
{
free(rule->table_line);
rule->table_line=NULL;
}
if(rule->n_foreign>0)
{
for(int i=0; i<rule->n_foreign; i++)
{
free(rule->f_keys[i].filename);
free(rule->f_keys[i].key);
}
free(rule->f_keys);
}
memset(rule,0,sizeof(struct serial_rule_t));
return;
@@ -1145,7 +1154,6 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
return;
}
#define MAX_REDIS_OP_PER_SRULE 8
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
{
int max_redis_batch=1*1024,batch_cnt=0;
@@ -1154,6 +1162,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r
unsigned int i=0;
unsigned int multi_cmd_cnt=0;
const int MAX_REDIS_OP_PER_SRULE=8;
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
struct expected_reply_t *expected_reply=(struct expected_reply_t*)calloc(sizeof(struct expected_reply_t), max_multi_cmd_num);
@@ -1371,6 +1380,208 @@ void cleanup_update_status(redisContext *ctx, void *logger)
,entry_num);
}
const char* find_Nth_column(const char* line, int Nth, int* column_len)
{
int i=0, j=0;
int start=0, end=0;
for(i=0;i<(int)strlen(line);i++)
{
if(line[i]==' '||line[i]=='\t')
{
j++;
}
if(j==Nth-1)
{
start=i+1;
}
if(j==Nth)
{
end=i+1;
break;
}
}
if(start==0)
{
return NULL;
}
if(end==0)
{
end=i;
}
*column_len=end-start;
return line+start;
}
char* get_foreign_cont_filename(const char* table_name, int rule_id, const char* foreign_key, const char* dir)
{
char* filename=NULL;
char buffer[512];
snprintf(buffer, sizeof(buffer),"%s/%s/%d%s",dir, table_name, rule_id, foreign_key);
filename=(char*)calloc(sizeof(char), strlen(buffer)+1);
memcpy(filename, buffer, strlen(buffer));
return filename;
}
void rewrite_table_line_with_foreign(struct serial_rule_t*p)
{
int origin_column_size=0;
const char* origin_column=NULL, *pos_origin_line=NULL;
char* pos_rewrite_line=NULL;
char* rewrite_line=NULL;
size_t fn_size=0;
int i=0;
for(i=0; i<p->n_foreign; i++)
{
fn_size+=strlen(p->f_keys[i].filename);
}
rewrite_line=(char*)calloc(sizeof(char), strlen(p->table_line)+fn_size);
pos_origin_line=p->table_line;
pos_rewrite_line=rewrite_line;
for(i=0; i<p->n_foreign; i++)
{
origin_column=find_Nth_column(p->table_line, p->f_keys[i].column, &origin_column_size);
strncat(pos_rewrite_line, pos_origin_line, origin_column-pos_origin_line);
pos_rewrite_line+=origin_column-pos_origin_line;
pos_origin_line+=origin_column_size;
strncat(pos_rewrite_line, p->f_keys[i].filename, strlen(p->f_keys[i].filename));
pos_rewrite_line+=strlen(p->f_keys[i].filename);
}
strncat(pos_rewrite_line, pos_origin_line, strlen(p->table_line)-(pos_origin_line-p->table_line));
pos_rewrite_line+=strlen(p->f_keys[i].filename);
free(p->table_line);
p->table_line=rewrite_line;
return;
}
int get_foreign_keys(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, _Maat_feather_t* feather, const char* dir,void *logger)
{
int ret=0, table_id=0, i=0, j=0;
int foregin_key_size=0;
int rule_with_foreign_key=0;
const char* foreign_source_prefix="redis://";
const char* foreign_key_prefix="__FILE_";
const char* p_foregin=NULL;
struct _Maat_table_info_t* p_table=NULL;
for(i=0; i<rule_num; i++)
{
if(rule_list[i].table_line==NULL)
{
continue;
}
ret=map_str2int(feather->map_tablename2id, rule_list[i].table_name, &table_id);
if(ret<0)
{
continue;
}
p_table=feather->p_table_info[table_id];
if(p_table->table_type!=TABLE_TYPE_PLUGIN||p_table->n_foreign==0)
{
continue;
}
rule_list[i].n_foreign=p_table->n_foreign;
rule_list[i].f_keys=(struct foreign_key*)calloc(sizeof(struct foreign_key), rule_list[i].n_foreign);
for(j=0;j<p_table->n_foreign;j++)
{
p_foregin=find_Nth_column(rule_list[i].table_line, p_table->foreign_columns[j], &foregin_key_size);
if(p_foregin==NULL)
{
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor
, "Get %s,%d foreign keys failed: No %dth column."
, rule_list[i].table_name, rule_list[i].rule_id, p_table->foreign_columns[j]);
continue;
}
if(0!=strncasecmp(p_foregin, foreign_source_prefix, strlen(foreign_source_prefix)))
{
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor
,"Get %s,%d foreign key failed: Invalid source prefix %s."
, rule_list[i].table_name, rule_list[i].rule_id, p_foregin);
continue;
}
rule_list[i].f_keys[j].column=p_table->foreign_columns[j];
foregin_key_size=foregin_key_size+1-strlen(foreign_source_prefix);
p_foregin+=strlen(foreign_source_prefix);
if(0!=strncasecmp(p_foregin, foreign_key_prefix, strlen(foreign_key_prefix)))
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"%s,%d foreign key prefix %s is discouraged."
, rule_list[i].table_name, rule_list[i].rule_id, p_foregin);
}
rule_list[i].f_keys[j].key=(char*)calloc(sizeof(char),foregin_key_size);
memcpy(rule_list[i].f_keys[j].key, p_foregin, foregin_key_size);
rule_list[i].f_keys[j].filename=get_foreign_cont_filename(rule_list[i].table_name, rule_list[i].rule_id, p_foregin, dir);
}
rule_with_foreign_key++;
}
return rule_with_foreign_key;
}
void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, void *logger)
{
int i=0, j=0, ret=0;
int key_num=0;
char redis_cmd[256];
redisReply* reply=NULL;
struct serial_rule_t*p=NULL;
char** filenames[MAX_FOREIGN_CLMN_NUM];
FILE* fp=NULL;
for(i=0;i<rule_num;i++)
{
p=rule_list+i;
if(p->op==MAAT_OP_DEL||p->n_foreign==0)
{
continue;
}
for(j=0; j<p->n_foreign; j++)
{
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s", p->f_keys[j].key);
ret=redisAppendCommand(ctx, redis_cmd);
key_num++;
assert(ret==REDIS_OK);
}
}
for(i=0;i<rule_num;i++)
{
p=rule_list+i;
if(p->op==MAAT_OP_DEL||p->n_foreign==0)
{
continue;
}
for(j=0; j<p->n_foreign; j++)
{
ret=_wrap_redisGetReply(ctx,&reply);
if(reply->type!=REDIS_REPLY_STRING)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
,"Get %s,%d foreign key %s content failed."
,rule_list[i].table_name
,rule_list[i].rule_id
,p->f_keys[j].key);
continue;
}
else
{
fp=fopen(p->f_keys[j].filename, "w");
if(fp==NULL)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
, "Write foreign content failed: fopen %s error."
, filenames[j]);
}
else
{
fwrite(reply->str, 1, reply->len, fp);
fclose(fp);
fp=NULL;
}
}
freeReplyObject(reply);
}
}
return;
}
void redis_monitor_traverse(long long version,redisContext *c,
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
@@ -1380,7 +1591,7 @@ void redis_monitor_traverse(long long version,redisContext *c,
const unsigned char* dec_key,
_Maat_feather_t* feather)
{
int table_id=0,i=0,rule_num=0,empty_value_num=0;
int table_id=0,i=0, j=0, rule_num=0,empty_value_num=0;
int ret=0;
struct serial_rule_t* rule_list=NULL;
int update_type=CM_UPDATE_TYPE_INC;
@@ -1432,6 +1643,11 @@ void redis_monitor_traverse(long long version,redisContext *c,
{
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num);
}
ret=get_foreign_keys(c, rule_list, rule_num, feather, feather->foreign_cont_dir, logger);
if(ret>0)
{
get_foreign_conts(c, rule_list, rule_num, logger);
}
}
start(new_version,update_type,u_para);
@@ -1458,11 +1674,41 @@ void redis_monitor_traverse(long long version,redisContext *c,
,rule_list[i].table_line);
continue;
}
}
if(rule_list[i].n_foreign>0)
{
rewrite_table_line_with_foreign(rule_list+i);
}
update(rule_list[i].table_name,rule_list[i].table_line,u_para);
update(rule_list[i].table_name,rule_list[i].table_line,u_para);
if(rule_list[i].n_foreign&&rule_list[i].op==MAAT_OP_DEL)
{
for(j=0; j<rule_list[i].n_foreign; j++)
{
if(feather->foreign_cont_linger==0)
{
ret=system_cmd_rm(rule_list[i].f_keys[j].filename);
if(ret==-1)
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module,
"Foreign content file %s remove failed.",
rule_list[i].f_keys[j].filename);
}
}
else if(feather->foreign_cont_linger>0)
{
garbage_bagging_with_timeout(GARBAGE_FOREIGN_FILE, rule_list[i].f_keys[j].filename, feather->foreign_cont_linger, feather->garbage_q);
rule_list[i].f_keys[j].filename=NULL;//transfer owner to garbage collection.
}
else
{
//Less than 0, don't delete.
}
}
}
}
finish(u_para);
finish(u_para);
clean_up:
for(i=0;i<rule_num;i++)