@@ -13,20 +13,24 @@ const char* maat_redis_monitor="MAAT_REDIS_MONITOR";
const char * maat_redis_command = " MAAT_REDIS_COMMAND " ;
const char * rm_key_prefix [ 2 ] = { " OBSOLETE_RULE " , " EFFECTIVE_RULE " } ;
const char * rm_status_key = " MAAT_UPDATE_STATUS " ;
const char * rm_status_sset = " MAAT_UPDATE_STATUS " ;
const char * rm_expire_sset = " MAAT_EXPIRE_TIMER " ;
const char * rm_label_sset = " MAAT_LABEL_INDEX " ;
const int MAAT_REDIS_SYNC_TIME = 30 * 60 ;
struct serial_rule_t //rm= Redis Maat
{
enum MAAT_OPERATION op ; //0: delete, 1: add.
int rule_id ;
int label_id ;
long long timeout ; // absolute unix time.
char table_name [ 256 ] ;
char * table_line ;
} ;
struct _Maat_cmd_inner_t
{
struct Maat_comman d_t cmd ;
struct Maat_cm d_t cmd ;
enum MAAT_OPERATION op ;
int ref_cnt ;
int region_size [ MAX_EXPR_ITEM_NUM ] ;
@@ -65,6 +69,16 @@ long long read_redis_integer(const redisReply* reply)
}
return 0 ;
}
long long redis_server_time ( redisContext * ctx )
{
long long server_time = 0 ;
redisReply * data_reply = NULL ;
data_reply = _wrap_redisCommand ( ctx , " TIME " ) ;
assert ( data_reply - > type = = REDIS_REPLY_INTEGER ) ;
server_time = data_reply - > integer ;
freeReplyObject ( data_reply ) ;
return server_time ;
}
enum MAAT_TABLE_TYPE type_region2table ( const struct Maat_region_t * p )
{
enum MAAT_TABLE_TYPE ret = TABLE_TYPE_IP ;
@@ -151,6 +165,43 @@ void invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
line [ i ] = ' 0 ' ;
return ;
}
int del_rule_from_redis ( redisContext * ctx , struct serial_rule_t * s_rule , long long new_version )
{
int append_cmd_cnt = 0 <EFBFBD> <EFBFBD>
redisAppendCommand ( ctx , " RENAME %s:%s,%d %s:%s,%d "
, rm_key_prefix [ MAAT_OP_ADD ]
, s_rule - > table_name
, s_rule - > rule_id
, rm_key_prefix [ MAAT_OP_DEL ]
, s_rule - > table_name
, s_rule - > rule_id
) ;
append_cmd_cnt + + ;
redisAppendCommand ( ctx , " EXPIRE %s:%s,%d %d " , rm_key_prefix [ MAAT_OP_DEL ]
, s_rule - > table_name
, s_rule - > rule_id
, MAAT_REDIS_SYNC_TIME ) ;
append_cmd_cnt + + ;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand ( ctx , " ZADD %s NX %d DEL,%s,%d " , rm_status_sset
, new_version
, s_rule - > table_name
, s_rule - > rule_id ) ;
append_cmd_cnt + + ;
// Try to remove from expiration sorted set, no matter wheather it exists or not.
redisAppendCommand ( ctx , " ZREM %s %s,%d " , rm_expire_sset
, s_rule - > table_name
, s_rule - > rule_id ) ;
append_cmd_cnt + + ;
if ( s_rule - > label_id > 0 )
{
redisAppendCommand ( ctx , " ZREM %s %d " , rm_label_sset
, s_rule - > rule_id ) ;
}
return append_cmd_cnt ;
}
void serialize_region ( const struct Maat_region_t * p , int group_id , char * buff , int size )
{
int ret = 0 ;
@@ -229,10 +280,12 @@ void empty_serial_rules(struct serial_rule_t* rule)
memset ( rule , 0 , sizeof ( struct serial_rule_t ) ) ;
return ;
}
void set_serial_rule ( struct serial_rule_t * rule , enum MAAT_OPERATION op , int rule_id , const char * table_name , const char * line )
void set_serial_rule ( struct serial_rule_t * rule , enum MAAT_OPERATION op , int rule_id , int label_id , const char * table_name , const char * line , long long timeout )
{
rule - > op = op ;
rule - > rule_id = rule_id ;
rule - > label_id = label_id ;
rule - > timeout = timeout ;
assert ( strlen ( table_name ) < sizeof ( rule - > table_name ) ) ;
memcpy ( rule - > table_name , table_name , strlen ( table_name ) ) ;
if ( line ! = NULL )
@@ -293,16 +346,16 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
//Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis.
//The elements are considered to be ordered from low to high scores(version).
reply = ( redisReply * ) redisCommand ( c , " ZRANGEBYSCORE %s (%d %d " , rm_status_key , version , version_in_redis ) ;
reply = ( redisReply * ) redisCommand ( c , " ZRANGEBYSCORE %s (%d %d " , rm_status_sset , version , version_in_redis ) ;
if ( reply = = NULL )
{
__redis_strerror_r ( errno , err_buff , sizeof ( err_buff ) ) ;
MESA_handle_runtime_log ( logger , RLOG_LV_FATAL , maat_redis_monitor ,
" GET %s failed %s. " , rm_status_key , err_buff ) ;
" GET %s failed %s. " , rm_status_sset , err_buff ) ;
return 0 ;
}
assert ( reply - > type = = REDIS_REPLY_ARRAY ) ;
tmp_reply = ( redisReply * ) redisCommand ( c , " ZSCORE %s %s " , rm_status_key , reply - > element [ 0 ] - > str ) ;
tmp_reply = _wrap_ redisCommand( c , " ZSCORE %s %s " , rm_status_sset , reply - > element [ 0 ] - > str ) ;
nearest_rule_version = read_redis_integer ( tmp_reply ) ;
freeReplyObject ( tmp_reply ) ;
tmp_reply = NULL ;
@@ -366,7 +419,7 @@ int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cn
{
int serial_num = 0 ;
int i = 0 ;
struct Maat_comman d_t * cmd = & ( _cmd - > cmd ) ;
struct Maat_cm d_t * cmd = & ( _cmd - > cmd ) ;
serial_num + + ; //compile rule
for ( i = 0 ; i < cmd - > group_num ; i + + )
{
@@ -389,7 +442,7 @@ int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cn
int reconstruct_cmd ( struct _Maat_feather_t * feather , struct _Maat_cmd_inner_t * _cmd )
{
int i = 0 , j = 0 , grp_idx = 0 ;
struct Maat_comman d_t * cmd = & ( _cmd - > cmd ) ;
struct Maat_cm d_t * cmd = & ( _cmd - > cmd ) ;
struct Maat_group_t * group_cmd = NULL ;
struct Maat_region_t * region_cmd = NULL ;
@@ -450,12 +503,17 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
struct Maat_group_t * p_group = NULL ;
struct Maat_region_t * p_region = NULL ;
struct Maat_rule_t * p_m_rule = NULL ;
struct Maat_comman d_t * cmd = & ( _cmd - > cmd ) ;
struct Maat_cm d_t * cmd = & ( _cmd - > cmd ) ;
enum MAAT_OPERATION op = _cmd - > op ;
int rule_num = 0 , i = 0 , j = 0 ;
p_m_rule = & ( cmd - > compile ) ;
char line [ 1024 ] ;
time_t timeout = 0 ;
if ( _cmd - > cmd . expire_after > 0 )
{
timeout = feather - > server_time + _cmd - > cmd . expire_after ;
}
if ( op = = MAAT_OP_ADD )
{
snprintf ( line , sizeof ( line ) , " %d \t %d \t %hhd \t %hhd \t %hhd \t 0 \t %s \t 1 \t %d " , p_m_rule - > config_id
@@ -465,28 +523,31 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
, p_m_rule - > do_log
, p_m_rule - > service_defined
, cmd - > group_num ) ;
set_serial_rule ( list + rule_num , MAAT_OP_ADD , cmd - > compile . config_id , feather - > compile_tn , line ) ;
set_serial_rule ( list + rule_num , MAAT_OP_ADD , cmd - > compile . config_id , cmd - > label_id , feather- > compile_tn , line , timeout );
}
else
{
set_serial_rule ( list + rule_num , MAAT_OP_DEL , cmd - > compile . config_id , feather - > compile_tn , NULL ) ;
set_serial_rule ( list + rule_num , MAAT_OP_DEL , cmd - > compile . config_id , cmd - > label_id , feather- > compile_tn , NULL , timeout );
}
rule_num + + ;
for ( i = 0 ; i < cmd - > group_num ; i + + )
{
p_group = & ( cmd - > groups [ i ] ) ;
if ( op = = MAAT_OP_ADD )
{
if ( feather - > AUTO_NUMBERING_ON = = 1 )
{
p_group - > group_id = feather - > base_grp_seq ;
feather - > base_grp_seq + + ;
}
snprintf ( line , sizeof ( line ) , " %d \t %d \t 1 " , p_group - > group_id
, p_m_rule - > config_id ) ;
set_serial_rule ( list + rule_num , MAAT_OP_ADD , p_group - > group_id , feather - > group_tn , line ) ;
set_serial_rule ( list + rule_num , MAAT_OP_ADD , p_group - > group_id , 0 , feather - > group_tn , line ) ;
}
else
{
set_serial_rule ( list + rule_num , MAAT_OP_DEL , p_group - > group_id , feather - > group_tn , NULL ) ;
set_serial_rule ( list + rule_num , MAAT_OP_DEL , p_group - > group_id , 0 , feather - > group_tn , NULL ) ;
}
rule_num + + ;
if ( p_group - > regions = = NULL ) //group reuse.
@@ -497,17 +558,20 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
{
p_region = & ( p_group - > regions [ j ] ) ;
if ( op = = MAAT_OP_ADD )
{
if ( feather - > AUTO_NUMBERING_ON = = 1 )
{
p_region - > region_id = feather - > base_rgn_seq ;
feather - > base_rgn_seq + + ;
}
serialize_region ( p_region , p_group - > group_id , line , sizeof ( line ) ) ;
set_serial_rule ( list + rule_num , MAAT_OP_ADD
, p_region - > region_id , p_region - > table_name , line ) ;
, p_region - > region_id , 0 , p_region - > table_name , line ) ;
}
else
{
set_serial_rule ( list + rule_num , MAAT_OP_DEL
, p_region - > region_id , p_region - > table_name , NULL ) ;
, p_region - > region_id , 0 , p_region - > table_name , NULL ) ;
}
rule_num + + ;
@@ -527,7 +591,7 @@ int mr_transaction_success(redisReply* data_reply)
return 1 ;
}
}
int fix_table_name ( _Maat_feather_t * feather , struct Maat_comman d_t * cmd )
int fix_table_name ( _Maat_feather_t * feather , struct Maat_cm d_t * cmd )
{
int i = 0 , j = 0 , ret = 0 ;
const char * table_name = NULL ;
@@ -558,7 +622,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd)
if ( ret < 0 )
{
MESA_handle_runtime_log ( feather - > logger , RLOG_LV_FATAL , maat_module
, " Unknown table %s of Maat_comman d_t[%d]->group[%d]->region[%d]. "
, " Unknown table %s of Maat_cm d_t[%d]->group[%d]->region[%d]. "
, table_name , cmd - > compile . config_id , i , j ) ;
return - 1 ;
@@ -567,7 +631,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd)
if ( table_type ! = feather - > p_table_info [ table_id ] - > table_type )
{
MESA_handle_runtime_log ( feather - > logger , RLOG_LV_FATAL , maat_module
, " Table %s not support region type %d of Maat_comman d_t[%d]->group[%d]->region[%d]. "
, " Table %s not support region type %d of Maat_cm d_t[%d]->group[%d]->region[%d]. "
, table_name
, p_region - > region_type
, cmd - > compile . config_id , i , j ) ;
@@ -579,7 +643,46 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd)
}
return 0 ;
}
void check_maat_expiration ( redisContext * ctx , void * logger )
{
unsigned int i = 0 , s_rule_num = 0 ;
int ret = 0 , append_cmd_cnt = 0 ;
int is_success = 0 ;
redisReply * data_reply = NULL ;
struct serial_rule_t * s_rule = NULL ;
long long server_time = 0 , maat_redis_version = 0 ;
data_reply = _wrap_redisCommand ( ctx , " TIME " ) ;
server_time = data_reply - > element [ 0 ] . integer ;
freeReplyObject ( data_reply ) ;
data_reply = _wrap_redisCommand ( ctx , " ZRANGEBYSCORE %s -inf %lld " , rm_expire_sset , server_time ) ;
if ( data_reply - > type ! = REDIS_REPLY_ARRAY )
{
freeReplyObject ( data_reply ) ;
return ;
}
s_rule_num = data_reply - > elements ;
s_rule = ( struct serial_rule_t * ) calloc ( sizeof ( struct serial_rule_t ) * s_rule_num ) ;
for ( i = 0 ; i < s_rule_num ; i + + )
{
s_rule [ i ] . op = MAAT_OP_DEL ;
ret = sscanf ( data_reply - > element [ i ] . str , " %[^,],%d " , s_rule [ i ] . table_name , & ( s_rule . rule_id ) ) ;
assert ( ret = = 2 ) ;
}
is_success = exec_serial_rule ( ctx , s_rule , s_rule_num ) ;
if ( is_success = = 1 )
{
MESA_handle_runtime_log ( logger , RLOG_LV_INFO , maat_module
, " Succesfully expried %d rules in Redis. " , s_rule_num ) ;
}
else
{
MESA_handle_runtime_log ( logger , RLOG_LV_INFO , maat_module
, " Failed to expried %d rules in Redis. " , s_rule_num ) ;
}
free ( s_rule ) ;
return ;
}
void redis_monitor_traverse ( unsigned int version , redisContext * c ,
void ( * start ) ( unsigned int , int , void * ) , //vesion,CM_UPDATE_TYPE_*,u_para
void ( * update ) ( const char * , const char * , void * ) , //table name ,line ,u_para
@@ -598,6 +701,11 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
unsigned int new_version = 0 ;
enum MAAT_TABLE_TYPE table_type ;
void * logger = feather - > logger ;
if ( feather - > redis_write_ctx ! = NULL ) //authorized to write
{
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
check_maat_expiration ( feather - > redis_read_ctx , logger ) ;
}
rule_num = get_rm_key_list ( version , c , & rule_list , logger , & new_version , & update_type ) ;
if ( rule_num = = 0 )
{
@@ -635,8 +743,75 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
rule_list = NULL ;
return ;
}
int exec_serial_rule ( redisContext * ctx , struct serial_rule_t * s_rule , int serial_rule_num )
{
int append_cmd_cnt = 0 , i = 0 ;
long long maat_redis_version = 0 ;
redisReply * data_reply = NULL ;
int redis_transaction_success = 1 ;
data_reply = _wrap_redisCommand ( ctx , " WATCH MAAT_VERSION " ) ;
freeReplyObject ( data_reply ) ;
data_reply = _wrap_redisCommand ( ctx , " GET MAAT_VERSION " ) ;
maat_redis_version = read_redis_integer ( data_reply ) ;
maat_redis_version + + ;
freeReplyObject ( data_reply ) ;
data_reply = _wrap_redisCommand ( ctx , " MULTI " ) ;
freeReplyObject ( data_reply ) ;
append_cmd_cnt = 0 ;
for ( i = 0 ; i < serial_rule_num ; i + + )
{
if ( s_rule [ i ] . op = = MAAT_OP_ADD )
{
redisAppendCommand ( ctx , " SET %s:%s,%d %s " , rm_key_prefix [ MAAT_OP_ADD ]
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id
, s_rule [ i ] . table_line ) ;
append_cmd_cnt + + ;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand ( ctx , " ZADD %s NX %lld ADD,%s,%d " , rm_status_sset
, maat_redis_version
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id ) ;
append_cmd_cnt + + ;
if ( s_rule [ i ] . timeout > 0 )
{
redisAppendCommand ( ctx , " ZADD %s NX %lld %s,%d " , rm_expire_sset
, s_rule [ i ] . timeout
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id ) ;
append_cmd_cnt + + ;
}
if ( s_rule [ i ] . label_id > 0 )
{
redisAppendCommand ( ctx , " ZADD %s NX %d %d " , rm_label_sset
, s_rule [ i ] . label_id
, s_rule [ i ] . rule_id ) ;
append_cmd_cnt + + ;
}
}
else
{
append_cmd_cnt + = del_rule_from_redis ( ctx , s_rule + i , maat_redis_version ) ;
}
void Maat_copy_region ( struct Maat_region_t * dst , const struct Maat_region_t * src )
}
redisAppendCommand ( ctx , " INCRBY MAAT_VERSION 1 " ) ;
append_cmd_cnt + + ;
redisAppendCommand ( ctx , " EXEC " ) ;
append_cmd_cnt + + ;
redis_transaction_success = 1 ;
for ( i = 0 ; i < append_cmd_cnt ; i + + )
{
_wrap_redisGetReply ( ctx , & data_reply ) ;
if ( 0 = = mr_transaction_success ( data_reply ) )
{
redis_transaction_success = 0 ;
}
freeReplyObject ( data_reply ) ;
}
return redis_transaction_success ;
}
void _maat_copy_region ( struct Maat_region_t * dst , const struct Maat_region_t * src )
{
memcpy ( dst , src , sizeof ( struct Maat_region_t ) ) ;
if ( src - > table_name ! = NULL )
@@ -668,7 +843,7 @@ void Maat_copy_region(struct Maat_region_t* dst,const struct Maat_region_t* src)
}
return ;
}
void M aat_empty_region( struct Maat_region_t * p )
void _m aat_empty_region( struct Maat_region_t * p )
{
free ( ( char * ) p - > table_name ) ;
p - > table_name = NULL ;
@@ -699,7 +874,7 @@ void Maat_empty_region(struct Maat_region_t* p)
return ;
}
struct Maat_comman d_t * Maat_create_coman d ( const struct Maat_rule_t * rule , int group_num )
struct Maat_cm d_t * Maat_create_cm d ( const struct Maat_rule_t * rule , int group_num , const char * label )
{
int i = 0 ;
struct _Maat_cmd_inner_t * _cmd = ( struct _Maat_cmd_inner_t * ) calloc ( sizeof ( struct _Maat_cmd_inner_t ) , 1 ) ;
@@ -719,21 +894,65 @@ struct Maat_command_t* Maat_create_comand(const struct Maat_rule_t* rule, int gr
_cmd - > cmd . groups [ i ] . regions = ( struct Maat_region_t * ) calloc ( sizeof ( struct Maat_region_t ) , 1 ) ;
_cmd - > region_size [ i ] = 1 ;
}
return ( struct Maat_comman d_t * ) _cmd ;
return ( struct Maat_cm d_t * ) _cmd ;
}
void Maat_cmd_set_group ( struct Maat_command_t * cmd , int which_ group, const char * group_name )
int Maat_cmd_set_group ( Maat_feather_t feather , int group_id , const struct Maat_region_t * region , enum MAAT_OPERATION op )
{
ass ert( which_group < cmd - > group_num ) ;
if ( cmd - > groups [ which_group ] . group_name ! = NULL )
_Maat_feath er_ t* _feather = ( _Maat_feather_t * ) feather ;
if ( _feather - > AUTO_NUMBERING_ON = = 1 )
{
free ( cmd - > groups [ which_group ] . group_name ) ;
return - 1 ;
}
cmd - > groups [ which_group ] . group_name = _maat_strdup ( group_name ) ;
//struct _Maat_group_inner_t* group_inner=NULL;
//group_inner=(struct _Maat_group_inner_t*)HASH_fetch_by_id(_feather->scanner->group_hash, group_id);
//NOT implemented yet.
assert ( 0 ) ;
return 0 ;
}
int Maat_cmd_set_line ( Maat_feather_t feather , const char * table_name , int rule_id , const char * line , int timeout , enum MAAT_OPERATION op )
{
_Maat_feather_t * _feather = ( _Maat_feather_t * ) feather ;
int ret = 0 , table_id = 0 , retry = 0 ;
struct serial_rule_t s_rule ;
long long absolute_expire_time = 0 ;
if ( _feather - > AUTO_NUMBERING_ON = = 1 )
{
return - 1 ;
}
ret = map_str2int ( _feather - > map_tablename2id , table_name , & table_id ) ;
if ( ret < 0 )
{
MESA_handle_runtime_log ( feather - > logger , RLOG_LV_FATAL , maat_module
, " Command set line id %d failed: unknown table %s. "
, rule_id
, table_name ) ;
return ;
return - 1 ;
}
void Maat_cmd_add_region ( struct Maat_command_t * cmd , int which_group , const struct Maat_region_t * region )
if ( TABLE_TYPE_PLUGIN ! = feather - > p_table_info [ table_id ] - > table_type )
{
MESA_handle_runtime_log ( feather - > logger , RLOG_LV_FATAL , maat_module
, " Command set line id %d failed: table %s is not a plugin table. "
, rule_id
, table_name ) ;
return - 1 ;
}
if ( timeout > 0 )
{
absolute_expire_time = redis_server_time ( _feather - > redis_write_ctx ) ;
absolute_expire_time + = timeout ;
}
set_serial_rule ( & s_rule , op , rule_id , table_name , line , absolute_expire_time ) ;
ret = 0 ;
while ( ! ret )
{
ret = exec_serial_rule ( _feather - > redis_write_ctx , & s_rule , 1 ) ;
retry + + ;
assert ( retry < 5 ) ;
}
return 0 ;
}
void Maat_add_region2cmd ( struct Maat_cmd_t * cmd , int which_group , const struct Maat_region_t * region )
{
struct _Maat_cmd_inner_t * _cmd = ( struct _Maat_cmd_inner_t * ) cmd ;
struct Maat_region_t * dst = NULL ;
@@ -748,11 +967,11 @@ void Maat_cmd_add_region(struct Maat_command_t* cmd,int which_group,const struct
}
dst = & ( p_group - > regions [ p_group - > region_num ] ) ;
p_group - > region_num + + ;
M aat_copy_region( dst , region ) ;
_m aat_copy_region( dst , region ) ;
return ;
}
void Maat_free_comman d ( struct Maat_comman d_t * cmd )
void Maat_free_cm d ( struct Maat_cm d_t * cmd )
{
struct _Maat_cmd_inner_t * _cmd = ( struct _Maat_cmd_inner_t * ) cmd ;
int i = 0 , j = 0 ;
@@ -765,9 +984,8 @@ void Maat_free_command(struct Maat_command_t* cmd)
{
for ( j = 0 ; j < cmd - > groups [ i ] . region_num ; j + + )
{
M aat_empty_region( & ( cmd - > groups [ i ] . regions [ j ] ) ) ;
_m aat_empty_region( & ( cmd - > groups [ i ] . regions [ j ] ) ) ;
}
free ( cmd - > groups [ i ] . group_name ) ;
free ( cmd - > groups [ i ] . regions ) ;
cmd - > groups [ i ] . regions = NULL ;
}
@@ -777,24 +995,24 @@ void Maat_free_command(struct Maat_command_t* cmd)
free ( _cmd ) ;
return ;
}
int Maat_format_comman d ( struct Maat_comman d_t * rule , char * buffer , int size )
int Maat_format_cm d ( struct Maat_cm d_t * rule , char * buffer , int size )
{
//TODO
return 0 ;
}
int Maat_comman d ( Maat_feather_t feather , struct Maat_comman d_t * raw_rule , enum MAAT_OPERATION op )
int Maat_cm d ( Maat_feather_t feather , struct Maat_cm d_t * raw_rule , enum MAAT_OPERATION op )
{
int ret = 0 ;
ret = Maat_append_comma nd ( feather , raw_rule , op ) ;
ret = Maat_cmd_ append ( feather , raw_rule , op ) ;
if ( ret < 0 )
{
return ret ;
}
ret = Maat_commit _command ( feather ) ;
ret = Maat_cmd _commit ( feather ) ;
return ret ;
}
int Maat_append_comma nd ( Maat_feather_t feather , struct Maat_comman d_t * cmd , enum MAAT_OPERATION op )
int Maat_cmd_ append ( Maat_feather_t feather , struct Maat_cm d_t * cmd , enum MAAT_OPERATION op )
{
_Maat_feather_t * _feather = ( _Maat_feather_t * ) feather ;
struct _Maat_cmd_inner_t * _cmd = ( struct _Maat_cmd_inner_t * ) cmd ;
@@ -829,7 +1047,7 @@ int Maat_append_command(Maat_feather_t feather,struct Maat_command_t* cmd,enum M
return 0 ;
}
int Maat_commit _command ( Maat_feather_t feather )
int Maat_cmd _commit ( Maat_feather_t feather )
{
_Maat_feather_t * _feather = ( _Maat_feather_t * ) feather ;
@@ -837,7 +1055,7 @@ int Maat_commit_command(Maat_feather_t feather)
long long maat_redis_version = 0 ;
int new_region_num = 0 , new_group_num = 0 ;
int serial_rule_num = 0 , serial_rule_idx = 0 ;
int redis_ transa ction_failed = 1 ;
int transe ction_success = 1 ;
struct _Maat_cmd_inner_t * p = NULL , * n = NULL ;
redisContext * ctx = NULL ;
@@ -870,7 +1088,10 @@ int Maat_commit_command(Maat_feather_t feather)
serial_rule_num + = calculate_serial_rule_num ( p , & new_region_num , & new_group_num ) ;
p = p - > next ;
}
_feather - > server_time = redis_server_time ( ctx ) ;
if ( feather - > AUTO_NUMBERING_ON = = 1 )
{
data_reply = _wrap_redisCommand ( ctx , " INCRBY SEQUENCE_REGION %d " , new_region_num ) ;
assert ( data_reply - > type = = REDIS_REPLY_INTEGER ) ;
_feather - > base_rgn_seq = data_reply - > integer - new_region_num ;
@@ -878,9 +1099,9 @@ int Maat_commit_command(Maat_feather_t feather)
data_reply = _wrap_redisCommand ( ctx , " INCRBY SEQUENCE_GROUP %d " , new_group_num ) ;
assert ( data_reply - > type = = REDIS_REPLY_INTEGER ) ;
_feather - > base_rgn _seq = data_reply - > integer - new_group_num ;
_feather - > base_grp _seq = data_reply - > integer - new_group_num ;
freeReplyObject ( data_reply ) ;
}
s_rule = ( struct serial_rule_t * ) calloc ( sizeof ( struct serial_rule_t ) , serial_rule_num ) ;
for ( i = 0 , p = _feather - > cmd_qhead ; i < _feather - > cmd_q_cnt ; i + + )
@@ -889,74 +1110,11 @@ int Maat_commit_command(Maat_feather_t feather)
p = p - > next ;
}
assert ( serial_rule_idx = = serial_rule_num ) ;
redis_ transa ction_failed = 1 ;
while ( redis_ transa ction_failed )
transe ction_success = 0 ;
while ( ! transe ction_success )
{
data_reply = _wrap_redisCommand ( ctx , " WATCH MAAT_VERSION " ) ;
freeReplyObject ( data_reply ) ;
data_reply = _wrap_redisCommand ( ctx , " GET MAAT_VERSION " ) ;
maat_redis_version = read_redis_integer ( data_reply ) ;
maat_redis_version + + ;
freeReplyObject ( data_reply ) ;
data_reply = _wrap_redisCommand ( ctx , " MULTI " ) ;
freeReplyObject ( data_reply ) ;
append_cmd_cnt = 0 ;
for ( i = 0 ; i < serial_rule_num ; i + + )
{
if ( s_rule [ i ] . op = = MAAT_OP_ADD )
{
redisAppendCommand ( ctx , " SET %s:%s,%d %s " , rm_key_prefix [ MAAT_OP_ADD ]
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id
, s_rule [ i ] . table_line ) ;
append_cmd_cnt + + ;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand ( ctx , " ZADD %s NX %d ADD,%s,%d " , rm_status_key
, maat_redis_version
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id ) ;
append_cmd_cnt + + ;
}
else
{
redisAppendCommand ( ctx , " RENAME %s:%s,%d %s:%s,%d "
, rm_key_prefix [ MAAT_OP_ADD ]
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id
, rm_key_prefix [ MAAT_OP_DEL ]
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id
) ;
append_cmd_cnt + + ;
redisAppendCommand ( ctx , " EXPIRE %s:%s,%d %d " , rm_key_prefix [ MAAT_OP_DEL ]
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id
, MAAT_REDIS_SYNC_TIME ) ;
append_cmd_cnt + + ;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand ( ctx , " ZADD %s NX %d DEL,%s,%d " , rm_status_key
, maat_redis_version
, s_rule [ i ] . table_name
, s_rule [ i ] . rule_id ) ;
append_cmd_cnt + + ;
}
}
redisAppendCommand ( ctx , " INCRBY MAAT_VERSION 1 " ) ;
append_cmd_cnt + + ;
redisAppendCommand ( ctx , " EXEC " ) ;
append_cmd_cnt + + ;
redis_transaction_failed = 0 ;
for ( i = 0 ; i < append_cmd_cnt ; i + + )
{
_wrap_redisGetReply ( ctx , & data_reply ) ;
if ( 0 = = mr_transaction_success ( data_reply ) )
{
redis_transaction_failed = 1 ;
}
freeReplyObject ( data_reply ) ;
}
if ( redis_transaction_failed = = 1 )
transection_success = exec_serial_rule ( ctx , s_rule , serial_rule_num ) ;
if ( transection_success ! = 1 )
{
retry + + ;
assert ( retry < 5 ) ;
@@ -968,7 +1126,7 @@ error_out:
for ( i = 0 ; i < _feather - > cmd_q_cnt ; i + + )
{
n = p - > next ;
Maat_free_comman d ( ( struct Maat_comman d_t * ) p ) ;
Maat_free_cm d ( ( struct Maat_cm d_t * ) p ) ;
p = n ;
}
_feather - > cmd_qhead = _feather - > cmd_qtail = NULL ;
@@ -981,4 +1139,31 @@ error_out:
free ( s_rule ) ;
return ret ;
}
long long Maat_cmd_incrby ( Maat_feather_t feather , const char * key , int increment )
{
_Maat_feather_t * _feather = ( _Maat_feather_t * ) feather ;
redisReply * data_reply = NULL ;
long long result = 0 ;
data_reply = _wrap_redisCommand ( _feather - > redis_write_ctx , " INCRBY %s %d " , key , increment ) ;
assert ( data_reply - > type = = REDIS_REPLY_INTEGER ) ;
result = data_reply - > integer ;
freeReplyObject ( data_reply ) ;
return result ;
}
int Maat_cmd_select ( Maat_feather_t feather , int label_id , int * output_ids , int size )
{
_Maat_feather_t * _feather = ( _Maat_feather_t * ) feather ;
redisReply * data_reply = NULL ;
unsigned int i = 0 ;
data_reply = _wrap_redisCommand ( _feather - > redis_write_ctx , " ZRANGEBYSCORE %s %d %d "
, rm_label_sset
, label_id
, label_id ) ;
for ( i = 0 ; i < data_reply - > element & & i < size ; i + + )
{
output_ids [ i ] = atoi ( data_reply - > element [ i ] - > integer ) ;
}
freeReplyObject ( data_reply ) ;
return i ;
}