[PATCH]add log handle for maat_wrap_redis_command
This commit is contained in:
@@ -19,6 +19,7 @@ extern "C"
|
||||
#include <stdint.h>
|
||||
#include <sys/queue.h>
|
||||
|
||||
#include "log/log.h"
|
||||
#include "hiredis/hiredis.h"
|
||||
#include "maat_command.h"
|
||||
#include "uthash/uthash.h"
|
||||
@@ -51,9 +52,10 @@ void maat_set_serial_rule(struct serial_rule *rule, enum maat_operation op,
|
||||
const char *line, long long timeout);
|
||||
|
||||
redisContext *maat_connect_redis(const char *redis_ip, int redis_port,
|
||||
int redis_db, struct log_handle *logger);
|
||||
int redis_db, struct log_handle *logger);
|
||||
|
||||
redisReply *maat_wrap_redis_command(redisContext *c, const char *format, ...);
|
||||
redisReply *maat_wrap_redis_command(redisContext *c, struct log_handle *logger,
|
||||
const char *format, ...);
|
||||
|
||||
int maat_wrap_redis_get_reply(redisContext *c, redisReply **reply);
|
||||
|
||||
@@ -68,10 +70,10 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
|
||||
struct log_handle *logger);
|
||||
|
||||
int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list,
|
||||
int rule_num, int print_process, struct log_handle *logger);
|
||||
int rule_num, int print_process, struct log_handle *logger);
|
||||
|
||||
int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_list,
|
||||
int rule_num, const char* dir, struct log_handle *logger);
|
||||
int rule_num, const char *dir, struct log_handle *logger);
|
||||
|
||||
void maat_get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
|
||||
int rule_num, int print_fn, struct log_handle *logger);
|
||||
|
||||
@@ -333,8 +333,10 @@ static int get_region_seq(struct iris_description *iris_cfg)
|
||||
if (NULL == iris_cfg->redis_write_ctx) {
|
||||
sequence = iris_cfg->region_cnt + 1;
|
||||
} else {
|
||||
redisReply *data_reply = maat_wrap_redis_command(iris_cfg->redis_write_ctx,
|
||||
"INCRBY %s 1", mr_region_id_var);
|
||||
redisReply *data_reply =
|
||||
maat_wrap_redis_command(iris_cfg->redis_write_ctx, NULL,
|
||||
"INCRBY %s 1", mr_region_id_var);
|
||||
|
||||
sequence = (int)data_reply->integer;
|
||||
freeReplyObject(data_reply);
|
||||
}
|
||||
|
||||
@@ -35,7 +35,9 @@ extern const char *mr_label_sset;
|
||||
|
||||
extern const int MAAT_REDIS_SYNC_TIME;
|
||||
|
||||
redisReply *maat_wrap_redis_command(redisContext *c, const char *format, ...)
|
||||
redisReply *
|
||||
maat_wrap_redis_command(redisContext *c, struct log_handle *logger,
|
||||
const char *format, ...)
|
||||
{
|
||||
va_list ap;
|
||||
void *reply = NULL;
|
||||
@@ -47,6 +49,12 @@ redisReply *maat_wrap_redis_command(redisContext *c, const char *format, ...)
|
||||
reply = redisvCommand(c,format,ap);
|
||||
va_end(ap);
|
||||
if (NULL == reply) {
|
||||
if (logger != NULL) {
|
||||
log_fatal(logger, MODULE_MAAT_COMMAND,
|
||||
"[%s:%d]execute redis command error:%s",
|
||||
__FUNCTION__, __LINE__, c->errstr);
|
||||
}
|
||||
|
||||
ret = redisReconnect(c);
|
||||
retry++;
|
||||
}
|
||||
@@ -56,21 +64,24 @@ redisReply *maat_wrap_redis_command(redisContext *c, const char *format, ...)
|
||||
}
|
||||
|
||||
redisContext *maat_connect_redis(const char *redis_ip, int redis_port,
|
||||
int redis_db, struct log_handle *logger)
|
||||
int redis_db, struct log_handle *logger)
|
||||
{
|
||||
struct timeval connect_timeout;
|
||||
connect_timeout.tv_sec = 0;
|
||||
connect_timeout.tv_usec = 100 * 1000; // 100 ms
|
||||
|
||||
redisContext *c = redisConnectWithTimeout(redis_ip, redis_port, connect_timeout);
|
||||
redisContext *c = redisConnectWithTimeout(redis_ip, redis_port,
|
||||
connect_timeout);
|
||||
if (NULL == c || c->err) {
|
||||
if (NULL == logger) {
|
||||
printf("Unable to connect redis server %s:%d db%d, error: %s",
|
||||
redis_ip, redis_port, redis_db, c == NULL ? "Unknown" : c->errstr);
|
||||
redis_ip, redis_port, redis_db,
|
||||
c == NULL ? "Unknown" : c->errstr);
|
||||
} else {
|
||||
log_fatal(logger, MODULE_MAAT_COMMAND,
|
||||
"[%s:%d] Unable to connect redis server %s:%d db%d, error: %s",
|
||||
__FUNCTION__, __LINE__, redis_ip, redis_port, redis_db,
|
||||
"[%s:%d] Unable to connect redis server"
|
||||
" %s:%d db%d, error: %s", __FUNCTION__,
|
||||
__LINE__, redis_ip, redis_port, redis_db,
|
||||
c == NULL ? "Unknown" : c->errstr);
|
||||
}
|
||||
|
||||
@@ -81,7 +92,8 @@ redisContext *maat_connect_redis(const char *redis_ip, int redis_port,
|
||||
}
|
||||
|
||||
redisEnableKeepAlive(c);
|
||||
redisReply *reply = maat_wrap_redis_command(c, "select %d", redis_db);
|
||||
redisReply *reply =
|
||||
maat_wrap_redis_command(c, logger, "select %d", redis_db);
|
||||
freeReplyObject(reply);
|
||||
reply = NULL;
|
||||
|
||||
@@ -108,15 +120,17 @@ long long maat_read_redis_integer(const redisReply *reply)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int redis_flushDB(redisContext *ctx, int db_index, struct log_handle *logger)
|
||||
static int
|
||||
redis_flushDB(redisContext *ctx, int db_index, struct log_handle *logger)
|
||||
{
|
||||
long long maat_redis_version = 0;
|
||||
|
||||
redisReply *data_reply = maat_wrap_redis_command(ctx, "WATCH MAAT_VERSION");
|
||||
redisReply *data_reply =
|
||||
maat_wrap_redis_command(ctx, logger, "WATCH MAAT_VERSION");
|
||||
freeReplyObject(data_reply);
|
||||
data_reply = NULL;
|
||||
|
||||
data_reply = maat_wrap_redis_command(ctx, "GET MAAT_VERSION");
|
||||
data_reply = maat_wrap_redis_command(ctx, logger, "GET MAAT_VERSION");
|
||||
if (data_reply->type == REDIS_REPLY_NIL) {
|
||||
maat_redis_version = 0;
|
||||
} else {
|
||||
@@ -126,12 +140,12 @@ static int redis_flushDB(redisContext *ctx, int db_index, struct log_handle *log
|
||||
data_reply = NULL;
|
||||
}
|
||||
|
||||
data_reply = maat_wrap_redis_command(ctx, "DBSIZE");
|
||||
data_reply = maat_wrap_redis_command(ctx, logger, "DBSIZE");
|
||||
long long dbsize = maat_read_redis_integer(data_reply);
|
||||
freeReplyObject(data_reply);
|
||||
data_reply = NULL;
|
||||
|
||||
data_reply = maat_wrap_redis_command(ctx, "MULTI");
|
||||
data_reply = maat_wrap_redis_command(ctx, logger, "MULTI");
|
||||
freeReplyObject(data_reply);
|
||||
data_reply = NULL;
|
||||
|
||||
@@ -165,7 +179,8 @@ static int redis_flushDB(redisContext *ctx, int db_index, struct log_handle *log
|
||||
if (redis_transaction_success == 1) {
|
||||
log_info(logger, MODULE_MAAT_COMMAND,
|
||||
"FlushDB %d, MAAT_VERSION:%llu, DBSize:%llu.",
|
||||
db_index, (maat_redis_version == 0)?0:(maat_redis_version-1),dbsize);
|
||||
db_index, (maat_redis_version == 0)?0:(maat_redis_version-1),
|
||||
dbsize);
|
||||
}
|
||||
|
||||
return redis_transaction_success;
|
||||
@@ -176,8 +191,8 @@ static int connect_redis_for_write(struct source_redis_ctx *redis_ctx,
|
||||
{
|
||||
assert(redis_ctx->write_ctx == NULL);
|
||||
redis_ctx->write_ctx = maat_connect_redis(redis_ctx->redis_ip,
|
||||
redis_ctx->redis_port,
|
||||
redis_ctx->redis_db, logger);
|
||||
redis_ctx->redis_port,
|
||||
redis_ctx->redis_db, logger);
|
||||
if (NULL == redis_ctx->write_ctx) {
|
||||
return -1;
|
||||
} else {
|
||||
@@ -234,7 +249,7 @@ long long maat_redis_server_time_s(redisContext *c)
|
||||
{
|
||||
long long server_time = 0;
|
||||
|
||||
redisReply *data_reply = maat_wrap_redis_command(c, "TIME");
|
||||
redisReply *data_reply = maat_wrap_redis_command(c, NULL, "TIME");
|
||||
if (data_reply->type == REDIS_REPLY_ARRAY) {
|
||||
server_time = atoll(data_reply->element[0]->str);
|
||||
freeReplyObject(data_reply);
|
||||
@@ -249,7 +264,8 @@ int maat_wrap_redis_get_reply(redisContext *c, redisReply **reply)
|
||||
return redisGetReply(c, (void **)reply);
|
||||
}
|
||||
|
||||
int maat_cmd_set_line(struct maat *maat_inst, const struct maat_cmd_line *line_rule)
|
||||
int maat_cmd_set_line(struct maat *maat_inst,
|
||||
const struct maat_cmd_line *line_rule)
|
||||
{
|
||||
int ret = 0;
|
||||
long long absolute_expire_time = 0;
|
||||
@@ -266,29 +282,36 @@ int maat_cmd_set_line(struct maat *maat_inst, const struct maat_cmd_line *line_r
|
||||
|
||||
struct serial_rule *s_rule = ALLOC(struct serial_rule, 1);
|
||||
|
||||
int table_id = table_manager_get_table_id(maat_inst->tbl_mgr, line_rule->table_name);
|
||||
int table_id = table_manager_get_table_id(maat_inst->tbl_mgr,
|
||||
line_rule->table_name);
|
||||
if (table_id < 0) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_COMMAND,
|
||||
"[%s:%d] Command set line id %lld failed: unknown table %s",
|
||||
__FUNCTION__, __LINE__, line_rule->rule_id, line_rule->table_name);
|
||||
"[%s:%d] Command set line id %lld failed: "
|
||||
"unknown table %s", __FUNCTION__, __LINE__,
|
||||
line_rule->rule_id, line_rule->table_name);
|
||||
FREE(s_rule);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr, table_id);
|
||||
int valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr,
|
||||
table_id);
|
||||
if (valid_column < 0) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_COMMAND,
|
||||
"[%s:%d] Command set line id %lld failed: table %s is not a plugin or ip_plugin table",
|
||||
__FUNCTION__, __LINE__, line_rule->rule_id, line_rule->table_name);
|
||||
"[%s:%d] Command set line id %lld failed: "
|
||||
"table %s is not a plugin or ip_plugin table",
|
||||
__FUNCTION__, __LINE__, line_rule->rule_id,
|
||||
line_rule->table_name);
|
||||
FREE(s_rule);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int valid_offset = maat_get_valid_flag_offset(line_rule->table_line, valid_column);
|
||||
int valid_offset = maat_get_valid_flag_offset(line_rule->table_line,
|
||||
valid_column);
|
||||
if (valid_offset < 0) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_COMMAND,
|
||||
"[%s:%d] Command set line id %lld failed: table %s valid_offset error",
|
||||
__FUNCTION__, __LINE__, line_rule->rule_id, line_rule->table_name);
|
||||
"[%s:%d] Command set line id %lld failed: "
|
||||
"table %s valid_offset error", __FUNCTION__, __LINE__,
|
||||
line_rule->rule_id, line_rule->table_name);
|
||||
FREE(s_rule);
|
||||
return -1;
|
||||
}
|
||||
@@ -298,10 +321,12 @@ int maat_cmd_set_line(struct maat *maat_inst, const struct maat_cmd_line *line_r
|
||||
absolute_expire_time = server_time + line_rule->expire_after;
|
||||
}
|
||||
|
||||
maat_set_serial_rule(s_rule, (enum maat_operation)is_valid, line_rule->rule_id,
|
||||
line_rule->table_name, line_rule->table_line, absolute_expire_time);
|
||||
maat_set_serial_rule(s_rule, (enum maat_operation)is_valid,
|
||||
line_rule->rule_id, line_rule->table_name,
|
||||
line_rule->table_line, absolute_expire_time);
|
||||
|
||||
int success_cnt = maat_cmd_write_rule(write_ctx, s_rule, 1, server_time, maat_inst->logger);
|
||||
int success_cnt = maat_cmd_write_rule(write_ctx, s_rule, 1, server_time,
|
||||
maat_inst->logger);
|
||||
if (success_cnt != 1) {
|
||||
ret = -1;
|
||||
goto error_out;
|
||||
@@ -317,8 +342,9 @@ error_out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int maat_cmd_set_file(struct maat *maat_inst, const char *key, const char *value,
|
||||
size_t size, enum maat_operation op)
|
||||
#define ARG_VEC_NUM 3
|
||||
int maat_cmd_set_file(struct maat *maat_inst, const char *key,
|
||||
const char *value, size_t size, enum maat_operation op)
|
||||
{
|
||||
redisContext *ctx = maat_inst->opts.redis_ctx.write_ctx;
|
||||
if (NULL == ctx) {
|
||||
@@ -328,8 +354,8 @@ int maat_cmd_set_file(struct maat *maat_inst, const char *key, const char *value
|
||||
return -1;
|
||||
}
|
||||
|
||||
const char *arg_vec[3];
|
||||
size_t len_vec[3];
|
||||
const char *arg_vec[ARG_VEC_NUM];
|
||||
size_t len_vec[ARG_VEC_NUM];
|
||||
|
||||
arg_vec[0] = "SET";
|
||||
len_vec[0] = strlen("SET");
|
||||
@@ -343,24 +369,28 @@ int maat_cmd_set_file(struct maat *maat_inst, const char *key, const char *value
|
||||
redisReply *reply = NULL;
|
||||
if (0 != strncmp(key, foreign_key_prefix, strlen(foreign_key_prefix))) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_COMMAND,
|
||||
"Invalid File key, prefix %s is mandatory.", foreign_key_prefix);
|
||||
"Invalid File key, prefix %s is mandatory.",
|
||||
foreign_key_prefix);
|
||||
return -1;
|
||||
}
|
||||
|
||||
switch (op) {
|
||||
case MAAT_OP_ADD:
|
||||
reply = (redisReply *)redisCommandArgv(ctx, sizeof(arg_vec) / sizeof(arg_vec[0]),
|
||||
reply = (redisReply *)redisCommandArgv(ctx, ARG_VEC_NUM,
|
||||
arg_vec, len_vec);
|
||||
break;
|
||||
case MAAT_OP_DEL:
|
||||
reply = maat_wrap_redis_command(ctx, "EXPIRE %s %d", key, MAAT_REDIS_SYNC_TIME);
|
||||
reply = maat_wrap_redis_command(ctx, maat_inst->logger,
|
||||
"EXPIRE %s %d", key,
|
||||
MAAT_REDIS_SYNC_TIME);
|
||||
break;
|
||||
default:
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
|
||||
if (NULL == reply || reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) {
|
||||
if (NULL == reply || reply->type == REDIS_REPLY_NIL ||
|
||||
reply->type == REDIS_REPLY_ERROR) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_COMMAND,
|
||||
"Set file failed, maybe Redis is busy.");
|
||||
freeReplyObject(reply);
|
||||
@@ -373,7 +403,8 @@ int maat_cmd_set_file(struct maat *maat_inst, const char *key, const char *value
|
||||
return 1;
|
||||
}
|
||||
|
||||
long long maat_cmd_incrby(struct maat *maat_inst, const char *key, int increment)
|
||||
long long maat_cmd_incrby(struct maat *maat_inst, const char *key,
|
||||
int increment)
|
||||
{
|
||||
long long result = 0;
|
||||
|
||||
@@ -382,7 +413,10 @@ long long maat_cmd_incrby(struct maat *maat_inst, const char *key, int increment
|
||||
return -1;
|
||||
}
|
||||
|
||||
redisReply *data_reply = maat_wrap_redis_command(write_ctx, "INCRBY %s %d", key, increment);
|
||||
redisReply *data_reply =
|
||||
maat_wrap_redis_command(write_ctx, NULL, "INCRBY %s %d",
|
||||
key, increment);
|
||||
|
||||
if (data_reply->type == REDIS_REPLY_INTEGER) {
|
||||
result = data_reply->integer;
|
||||
} else {
|
||||
|
||||
@@ -402,9 +402,10 @@ get_inc_key_list(long long instance_version, long long target_version,
|
||||
return 0;
|
||||
}
|
||||
|
||||
redisReply *tmp_reply= maat_wrap_redis_command(c, "ZSCORE %s %s",
|
||||
mr_status_sset,
|
||||
reply->element[0]->str);
|
||||
redisReply *tmp_reply =
|
||||
maat_wrap_redis_command(c, logger, "ZSCORE %s %s",
|
||||
mr_status_sset,
|
||||
reply->element[0]->str);
|
||||
if (tmp_reply->type != REDIS_REPLY_STRING) {
|
||||
log_fatal(logger, MODULE_REDIS_MONITOR,
|
||||
"[%s:%d] ZSCORE %s %s failed Version: %lld->%lld",
|
||||
@@ -419,12 +420,15 @@ get_inc_key_list(long long instance_version, long long target_version,
|
||||
}
|
||||
|
||||
long long nearest_rule_version = maat_read_redis_integer(tmp_reply);
|
||||
freeReplyObject(tmp_reply);
|
||||
tmp_reply = NULL;
|
||||
|
||||
if (nearest_rule_version < 0) {
|
||||
log_fatal(logger, MODULE_REDIS_MONITOR,
|
||||
"[%s:%d]get nearest_rule_version failed, reply->type:%d",
|
||||
__FUNCTION__, __LINE__, tmp_reply->type);
|
||||
freeReplyObject(tmp_reply);
|
||||
return -1;
|
||||
}
|
||||
freeReplyObject(tmp_reply);
|
||||
|
||||
if (nearest_rule_version != instance_version + 1) {
|
||||
log_info(logger, MODULE_REDIS_MONITOR,
|
||||
@@ -601,7 +605,7 @@ FULL_UPDATE:
|
||||
reply = NULL;
|
||||
}
|
||||
|
||||
reply = maat_wrap_redis_command(c, "EXEC");
|
||||
reply = maat_wrap_redis_command(c, logger, "EXEC");
|
||||
if (NULL == reply) {
|
||||
log_fatal(logger, MODULE_REDIS_MONITOR,
|
||||
"[%s:%d] Redis Communication error: %s",
|
||||
@@ -844,8 +848,9 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule)
|
||||
s_rule->table_line = rewrite_line;
|
||||
}
|
||||
|
||||
static void expected_reply_add(struct expected_reply* expected, int s_rule_seq,
|
||||
int type, long long integer)
|
||||
static void
|
||||
expected_reply_add(struct expected_reply* expected, int s_rule_seq,
|
||||
int type, long long integer)
|
||||
{
|
||||
int i = expected->possible_reply_num;
|
||||
assert(i < REDIS_REPLY_SIZE);
|
||||
@@ -855,13 +860,15 @@ static void expected_reply_add(struct expected_reply* expected, int s_rule_seq,
|
||||
expected->possible_reply_num++;
|
||||
}
|
||||
|
||||
static int redlock_try_lock(redisContext *c, const char *lock_name,
|
||||
long long expire)
|
||||
static int
|
||||
redlock_try_lock(redisContext *c, const char *lock_name,
|
||||
long long expire)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
redisReply *reply =
|
||||
maat_wrap_redis_command(c, "SET %s locked NX PX %lld", lock_name, expire);
|
||||
maat_wrap_redis_command(c, NULL, "SET %s locked NX PX %lld",
|
||||
lock_name, expire);
|
||||
if (reply->type == REDIS_REPLY_NIL) {
|
||||
ret = 0;
|
||||
} else {
|
||||
@@ -891,7 +898,7 @@ exec_serial_rule_begin(redisContext* c, size_t rule_num,
|
||||
}
|
||||
|
||||
if (rule_num > renew_rule_num) {
|
||||
data_reply = maat_wrap_redis_command(c, "INCRBY MAAT_PRE_VER 1");
|
||||
data_reply = maat_wrap_redis_command(c, NULL, "INCRBY MAAT_PRE_VER 1");
|
||||
*transaction_version = maat_read_redis_integer(data_reply);
|
||||
freeReplyObject(data_reply);
|
||||
data_reply = NULL;
|
||||
@@ -901,7 +908,7 @@ exec_serial_rule_begin(redisContext* c, size_t rule_num,
|
||||
}
|
||||
|
||||
if (*renew_allowed == 1 || rule_num > renew_rule_num) {
|
||||
data_reply = maat_wrap_redis_command(c, "MULTI");
|
||||
data_reply = maat_wrap_redis_command(c, NULL, "MULTI");
|
||||
freeReplyObject(data_reply);
|
||||
data_reply = NULL;
|
||||
ret = 0;
|
||||
@@ -912,7 +919,7 @@ exec_serial_rule_begin(redisContext* c, size_t rule_num,
|
||||
|
||||
static void redlock_unlock(redisContext *c, const char *lock_name)
|
||||
{
|
||||
redisReply *reply = maat_wrap_redis_command(c, "DEL %s", lock_name);
|
||||
redisReply *reply = maat_wrap_redis_command(c, NULL, "DEL %s", lock_name);
|
||||
freeReplyObject(reply);
|
||||
reply = NULL;
|
||||
}
|
||||
@@ -941,7 +948,7 @@ exec_serial_rule_end(redisContext *c, const char *transaction_list,
|
||||
|
||||
if (strlen(transaction_list) > 0) {
|
||||
data_reply =
|
||||
maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld",
|
||||
maat_wrap_redis_command(c, NULL, "eval %s 4 MAAT_VERSION %s %s %s %lld",
|
||||
lua_exec_done, mr_status_sset, mr_version_sset,
|
||||
transaction_list, server_time);
|
||||
freeReplyObject(data_reply);
|
||||
@@ -950,7 +957,7 @@ exec_serial_rule_end(redisContext *c, const char *transaction_list,
|
||||
(*cnt)++;
|
||||
}
|
||||
|
||||
data_reply = maat_wrap_redis_command(c, "EXEC");
|
||||
data_reply = maat_wrap_redis_command(c, NULL, "EXEC");
|
||||
|
||||
return data_reply;
|
||||
}
|
||||
@@ -1225,7 +1232,7 @@ cleanup_update_status(redisContext *c, struct log_handle *logger)
|
||||
return;
|
||||
}
|
||||
|
||||
redisReply *reply = maat_wrap_redis_command(c, "MULTI");
|
||||
redisReply *reply = maat_wrap_redis_command(c, logger, "MULTI");
|
||||
freeReplyObject(reply);
|
||||
reply = NULL;
|
||||
|
||||
@@ -1246,7 +1253,7 @@ cleanup_update_status(redisContext *c, struct log_handle *logger)
|
||||
}
|
||||
|
||||
redisReply *sub_reply = NULL;
|
||||
reply = maat_wrap_redis_command(c, "EXEC");
|
||||
reply = maat_wrap_redis_command(c, logger, "EXEC");
|
||||
if (reply->type != REDIS_REPLY_ARRAY) {
|
||||
goto error_out;
|
||||
}
|
||||
@@ -1269,7 +1276,7 @@ cleanup_update_status(redisContext *c, struct log_handle *logger)
|
||||
reply = NULL;
|
||||
|
||||
//To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
|
||||
reply = maat_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld",
|
||||
reply = maat_wrap_redis_command(c, logger, "ZREMRANGEBYSCORE %s %lld %lld",
|
||||
mr_status_sset, version_lower_bound,
|
||||
version_upper_bound);
|
||||
entry_num = maat_read_redis_integer(reply);
|
||||
@@ -1298,7 +1305,7 @@ check_maat_expiration(redisContext *c, struct log_handle *logger)
|
||||
}
|
||||
|
||||
redisReply *data_reply =
|
||||
maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld",
|
||||
maat_wrap_redis_command(c, logger, "ZRANGEBYSCORE %s -inf %lld",
|
||||
mr_expire_sset, server_time);
|
||||
if (data_reply->type != REDIS_REPLY_ARRAY ||
|
||||
0 == data_reply->elements) {
|
||||
|
||||
100
src/maat_rule.c
100
src/maat_rule.c
@@ -55,7 +55,8 @@ maat_runtime_create(long long version, struct maat *maat_inst)
|
||||
maat_rt->sequence_map = maat_kv_store_new();
|
||||
maat_rt->logger = maat_inst->logger;
|
||||
maat_rt->ref_garbage_bin = maat_inst->garbage_bin;
|
||||
maat_rt->ref_cnt = alignment_int64_array_alloc(maat_inst->opts.nr_worker_thread);
|
||||
maat_rt->ref_cnt =
|
||||
alignment_int64_array_alloc(maat_inst->opts.nr_worker_thread);
|
||||
|
||||
return maat_rt;
|
||||
}
|
||||
@@ -79,17 +80,20 @@ static void maat_runtime_destroy(struct maat_runtime *maat_rt)
|
||||
FREE(maat_rt);
|
||||
}
|
||||
|
||||
static void maat_runtime_commit(struct maat_runtime *maat_rt, int update_type,
|
||||
long long maat_rt_version, struct log_handle *logger)
|
||||
static void
|
||||
maat_runtime_commit(struct maat_runtime *maat_rt, int update_type,
|
||||
long long maat_rt_version, struct log_handle *logger)
|
||||
{
|
||||
for (size_t i = 0; i < maat_rt->max_table_num; i++) {
|
||||
table_manager_commit_runtime(maat_rt->ref_tbl_mgr, i, update_type, maat_rt_version);
|
||||
table_manager_commit_runtime(maat_rt->ref_tbl_mgr, i,
|
||||
update_type, maat_rt_version);
|
||||
}
|
||||
|
||||
maat_rt->last_update_time = time(NULL);
|
||||
}
|
||||
|
||||
static void maat_start_cb(long long new_version, int update_type, void *u_param)
|
||||
static void
|
||||
maat_start_cb(long long new_version, int update_type, void *u_param)
|
||||
{
|
||||
size_t i = 0;
|
||||
enum table_type table_type = TABLE_TYPE_INVALID;
|
||||
@@ -99,13 +103,16 @@ static void maat_start_cb(long long new_version, int update_type, void *u_param)
|
||||
maat_inst->new_version = new_version;
|
||||
|
||||
if (update_type == MAAT_UPDATE_TYPE_FULL) {
|
||||
maat_inst->creating_maat_rt = maat_runtime_create(new_version, maat_inst);
|
||||
maat_inst->creating_maat_rt =
|
||||
maat_runtime_create(new_version, maat_inst);
|
||||
|
||||
for (i = 0; i < max_table_cnt; i++) {
|
||||
table_type = table_manager_get_table_type(maat_inst->tbl_mgr, i);
|
||||
if (table_type == TABLE_TYPE_COMPILE) {
|
||||
// compile runtime need a reference to maat runtime
|
||||
void *compile_rt = table_manager_get_updating_runtime(maat_inst->tbl_mgr, i);
|
||||
void *compile_rt =
|
||||
table_manager_get_updating_runtime(maat_inst->tbl_mgr, i);
|
||||
|
||||
compile_runtime_init(compile_rt, maat_inst->creating_maat_rt);
|
||||
}
|
||||
}
|
||||
@@ -117,12 +124,14 @@ static void maat_start_cb(long long new_version, int update_type, void *u_param)
|
||||
table_type = table_manager_get_table_type(maat_inst->tbl_mgr, i);
|
||||
if (table_type == TABLE_TYPE_PLUGIN) {
|
||||
void *schema = table_manager_get_schema(maat_inst->tbl_mgr, i);
|
||||
plugin_table_all_callback_start((struct plugin_schema *)schema, update_type);
|
||||
plugin_table_all_callback_start((struct plugin_schema *)schema,
|
||||
update_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int maat_update_cb(const char *table_name, const char *line, void *u_param)
|
||||
static int
|
||||
maat_update_cb(const char *table_name, const char *line, void *u_param)
|
||||
{
|
||||
if (NULL == table_name || NULL == line || NULL == u_param) {
|
||||
return 0;
|
||||
@@ -144,12 +153,15 @@ static int maat_update_cb(const char *table_name, const char *line, void *u_para
|
||||
|
||||
// find conjunction id for table_id
|
||||
long long conj_parent_table_ids[MAX_CONJ_PARENTS_NUM];
|
||||
int conj_parent_table_cnt = table_manager_get_conj_parent_table_ids(maat_inst->tbl_mgr, table_name,
|
||||
conj_parent_table_ids, MAX_CONJ_PARENTS_NUM);
|
||||
int conj_parent_table_cnt =
|
||||
table_manager_get_conj_parent_table_ids(maat_inst->tbl_mgr, table_name,
|
||||
conj_parent_table_ids,
|
||||
MAX_CONJ_PARENTS_NUM);
|
||||
if (conj_parent_table_cnt > 0) {
|
||||
for (int i = 0; i < conj_parent_table_cnt; i++) {
|
||||
int ret = table_manager_update_runtime(maat_inst->tbl_mgr, table_name,
|
||||
(int)conj_parent_table_ids[i], line, update_type);
|
||||
(int)conj_parent_table_ids[i],
|
||||
line, update_type);
|
||||
if (ret < 0) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_RULE,
|
||||
"[%s:%d] table<%s> update runtime error for rule:%s",
|
||||
@@ -176,20 +188,23 @@ static long long maat_runtime_rule_num(struct maat_runtime *maat_rt)
|
||||
long long total = 0;
|
||||
|
||||
for (size_t i = 0; i < maat_rt->max_table_num; i++) {
|
||||
long long rule_cnt = table_manager_runtime_rule_count(maat_rt->ref_tbl_mgr, i);
|
||||
long long rule_cnt =
|
||||
table_manager_runtime_rule_count(maat_rt->ref_tbl_mgr, i);
|
||||
total += rule_cnt;
|
||||
|
||||
if (rule_cnt != 0) {
|
||||
log_info(maat_rt->logger, MODULE_MAAT_RULE,
|
||||
"table:<%s> rule_count:%lld",
|
||||
table_manager_get_table_name(maat_rt->ref_tbl_mgr, i), rule_cnt);
|
||||
table_manager_get_table_name(maat_rt->ref_tbl_mgr, i),
|
||||
rule_cnt);
|
||||
}
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
static void maat_plugin_table_all_callback_finish(struct table_manager *tbl_mgr)
|
||||
static void
|
||||
maat_plugin_table_all_callback_finish(struct table_manager *tbl_mgr)
|
||||
{
|
||||
size_t max_table_cnt = table_manager_table_size(tbl_mgr);
|
||||
enum table_type table_type = TABLE_TYPE_INVALID;
|
||||
@@ -205,7 +220,8 @@ static void maat_plugin_table_all_callback_finish(struct table_manager *tbl_mgr)
|
||||
}
|
||||
}
|
||||
|
||||
static void maat_plugin_table_garbage_collect_routine(struct table_manager *tbl_mgr)
|
||||
static void
|
||||
maat_plugin_table_garbage_collect_routine(struct table_manager *tbl_mgr)
|
||||
{
|
||||
size_t max_table_cnt = table_manager_table_size(tbl_mgr);
|
||||
enum table_type table_type = TABLE_TYPE_INVALID;
|
||||
@@ -254,9 +270,14 @@ static void maat_finish_cb(void *u_param)
|
||||
maat_plugin_table_all_callback_finish(maat_inst->tbl_mgr);
|
||||
|
||||
if (maat_inst->creating_maat_rt != NULL) {
|
||||
maat_runtime_commit(maat_inst->creating_maat_rt, MAAT_UPDATE_TYPE_FULL,
|
||||
maat_inst->creating_maat_rt->version, maat_inst->logger);
|
||||
maat_inst->creating_maat_rt->rule_num = maat_runtime_rule_num(maat_inst->creating_maat_rt);
|
||||
maat_runtime_commit(maat_inst->creating_maat_rt,
|
||||
MAAT_UPDATE_TYPE_FULL,
|
||||
maat_inst->creating_maat_rt->version,
|
||||
maat_inst->logger);
|
||||
|
||||
maat_inst->creating_maat_rt->rule_num =
|
||||
maat_runtime_rule_num(maat_inst->creating_maat_rt);
|
||||
|
||||
log_info(maat_inst->logger, MODULE_MAAT_RULE,
|
||||
"Full config version %llu load %d entries complete",
|
||||
maat_inst->creating_maat_rt->version,
|
||||
@@ -265,7 +286,10 @@ static void maat_finish_cb(void *u_param)
|
||||
maat_inst->maat_rt->version = maat_inst->maat_version;
|
||||
maat_runtime_commit(maat_inst->maat_rt, MAAT_UPDATE_TYPE_INC,
|
||||
maat_inst->maat_rt->version, maat_inst->logger);
|
||||
maat_inst->maat_rt->rule_num = maat_runtime_rule_num(maat_inst->maat_rt);
|
||||
|
||||
maat_inst->maat_rt->rule_num =
|
||||
maat_runtime_rule_num(maat_inst->maat_rt);
|
||||
|
||||
log_info(maat_inst->logger, MODULE_MAAT_RULE,
|
||||
"Inc config version %llu load %d entries complete",
|
||||
maat_inst->maat_rt->version,
|
||||
@@ -290,21 +314,23 @@ void maat_read_full_config(struct maat *maat_inst)
|
||||
redis_ctx = &(maat_inst->opts.redis_ctx);
|
||||
log_info(maat_inst->logger, MODULE_MAAT_RULE,
|
||||
"Maat initiate from Redis %s:%hu db%d",
|
||||
redis_ctx->redis_ip, redis_ctx->redis_port, redis_ctx->redis_db);
|
||||
redis_ctx->redis_ip, redis_ctx->redis_port,
|
||||
redis_ctx->redis_db);
|
||||
redis_ctx->read_ctx = maat_connect_redis(redis_ctx->redis_ip,
|
||||
redis_ctx->redis_port,
|
||||
redis_ctx->redis_db,
|
||||
maat_inst->logger);
|
||||
redis_ctx->redis_port,
|
||||
redis_ctx->redis_db,
|
||||
maat_inst->logger);
|
||||
if (redis_ctx->read_ctx != NULL) {
|
||||
redis_monitor_traverse(maat_inst->maat_version, redis_ctx,
|
||||
maat_start_cb, maat_update_cb, maat_finish_cb,
|
||||
maat_inst);
|
||||
maat_start_cb, maat_update_cb,
|
||||
maat_finish_cb, maat_inst);
|
||||
}
|
||||
|
||||
if (NULL == maat_inst->creating_maat_rt) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_RULE,
|
||||
"[%s:%d] At initiation: NO effective rule in redis %s:%hu db%d",
|
||||
__FUNCTION__, __LINE__, redis_ctx->redis_ip, redis_ctx->redis_port,
|
||||
"[%s:%d] At initiation: NO effective rule in redis"
|
||||
" %s:%hu db%d", __FUNCTION__, __LINE__,
|
||||
redis_ctx->redis_ip, redis_ctx->redis_port,
|
||||
redis_ctx->redis_db);
|
||||
}
|
||||
break;
|
||||
@@ -326,7 +352,8 @@ void maat_read_full_config(struct maat *maat_inst)
|
||||
if (ret < 0) {
|
||||
log_fatal(maat_inst->logger, MODULE_MAAT_RULE,
|
||||
"[%s:%d] Maat re-initiate with JSON file %s failed: %s",
|
||||
__FUNCTION__, __LINE__, maat_inst->opts.json_ctx.json_file, err_str);
|
||||
__FUNCTION__, __LINE__, maat_inst->opts.json_ctx.json_file,
|
||||
err_str);
|
||||
}
|
||||
|
||||
config_monitor_traverse(maat_inst->maat_version,
|
||||
@@ -347,13 +374,15 @@ void maat_read_full_config(struct maat *maat_inst)
|
||||
maat_inst->maat_rt = maat_inst->creating_maat_rt;
|
||||
maat_inst->creating_maat_rt = NULL;
|
||||
maat_inst->is_running = 1;
|
||||
|
||||
if (maat_inst->maat_rt != NULL) {
|
||||
maat_inst->maat_version = maat_inst->maat_rt->version;
|
||||
maat_inst->last_full_version = maat_inst->maat_rt->version;
|
||||
}
|
||||
}
|
||||
|
||||
long long maat_runtime_get_sequence(struct maat_runtime *maat_rt, const char *key)
|
||||
long long maat_runtime_get_sequence(struct maat_runtime *maat_rt,
|
||||
const char *key)
|
||||
{
|
||||
if (NULL == maat_rt || NULL == key) {
|
||||
return -1;
|
||||
@@ -406,7 +435,8 @@ void *rule_monitor_loop(void *arg)
|
||||
/* if deferred load on */
|
||||
if (maat_inst->opts.deferred_load_on != 0) {
|
||||
log_info(maat_inst->logger, MODULE_MAAT_RULE,
|
||||
"Deferred Loading ON, updating in %s:%d", __FUNCTION__, __LINE__);
|
||||
"Deferred Loading ON, updating in %s:%d",
|
||||
__FUNCTION__, __LINE__);
|
||||
maat_read_full_config(maat_inst);
|
||||
}
|
||||
pthread_mutex_unlock(&(maat_inst->background_update_mutex));
|
||||
@@ -484,8 +514,9 @@ void *rule_monitor_loop(void *arg)
|
||||
old_maat_rt->version, maat_inst->maat_rt->version);
|
||||
}
|
||||
|
||||
maat_inst->stat->zombie_rs_stream += alignment_int64_array_sum(old_maat_rt->ref_cnt,
|
||||
maat_inst->opts.nr_worker_thread);
|
||||
maat_inst->stat->zombie_rs_stream +=
|
||||
alignment_int64_array_sum(old_maat_rt->ref_cnt,
|
||||
maat_inst->opts.nr_worker_thread);
|
||||
maat_garbage_bagging(maat_inst->garbage_bin, old_maat_rt, NULL,
|
||||
garbage_maat_runtime_destroy);
|
||||
}
|
||||
@@ -514,7 +545,8 @@ void *rule_monitor_loop(void *arg)
|
||||
maat_plugin_table_garbage_collect_routine(maat_inst->tbl_mgr);
|
||||
|
||||
if ((1 == maat_inst->opts.stat_on) && (time(NULL) % 2 == 0)) {
|
||||
maat_stat_output(maat_inst->stat, maat_inst->maat_version, maat_inst->opts.perf_on);
|
||||
maat_stat_output(maat_inst->stat, maat_inst->maat_version,
|
||||
maat_inst->opts.perf_on);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user