third draft

This commit is contained in:
liuwentan
2023-07-06 18:58:15 +08:00
parent 2d6ffdd166
commit 9d373ad454
41 changed files with 81287 additions and 455 deletions

View File

@@ -184,8 +184,8 @@ static int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_l
return rule_with_foreign_key;
}
int maat_cmd_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_list,
int rule_num, const char* dir, struct log_handle *logger)
int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_list,
int rule_num, const char* dir, struct log_handle *logger)
{
int j = 0;
int foreign_key_size = 0;
@@ -243,7 +243,7 @@ static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
}
for (i = 0; i < rule_num; i++) {
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
ret = maat_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
log_error(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Redis GET %s:%s,%lld failed, redis server error",
@@ -289,7 +289,7 @@ static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
for (i = 0; i < failed_cnt; i++) {
idx = retry_ids[i];
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
ret = maat_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
log_error(logger, MODULE_REDIS_MONITOR,
"[%s:%d] redis command %s failed, redis server error",
@@ -321,8 +321,8 @@ static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
return 0;
}
int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_process, struct log_handle *logger)
int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_process, struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
@@ -377,9 +377,9 @@ static int get_inc_key_list(long long instance_version, long long target_version
return 0;
}
redisReply *tmp_reply= maat_cmd_wrap_redis_command(c, "ZSCORE %s %s",
mr_status_sset,
reply->element[0]->str);
redisReply *tmp_reply= maat_wrap_redis_command(c, "ZSCORE %s %s",
mr_status_sset,
reply->element[0]->str);
if (tmp_reply->type != REDIS_REPLY_STRING) {
log_error(logger, MODULE_REDIS_MONITOR,
"[%s:%d] ZSCORE %s %s failed Version: %lld->%lld",
@@ -393,7 +393,7 @@ static int get_inc_key_list(long long instance_version, long long target_version
return -1;
}
long long nearest_rule_version = maat_cmd_read_redis_integer(tmp_reply);
long long nearest_rule_version = maat_read_redis_integer(tmp_reply);
freeReplyObject(tmp_reply);
tmp_reply = NULL;
@@ -444,6 +444,39 @@ static int get_inc_key_list(long long instance_version, long long target_version
return rule_num;
}
void maat_clear_rule_cache(struct serial_rule *s_rule)
{
if (s_rule->table_line != NULL) {
FREE(s_rule->table_line);
}
if (s_rule->n_foreign > 0) {
for (int i = 0; i < s_rule->n_foreign; i++) {
FREE(s_rule->f_keys[i].filename);
FREE(s_rule->f_keys[i].key);
}
FREE(s_rule->f_keys);
}
memset(s_rule, 0, sizeof(struct serial_rule));
}
void maat_set_serial_rule(struct serial_rule *rule, enum maat_operation op,
long long rule_id, const char *table_name,
const char *line, long long timeout)
{
memset(rule, 0, sizeof(struct serial_rule));
rule->op = op;
rule->rule_id = rule_id;
rule->timeout = timeout;
assert(strlen(table_name) < sizeof(rule->table_name));
strncpy(rule->table_name, table_name, sizeof(rule->table_name));
if (line != NULL) {
rule->table_line = maat_strdup(line);
}
}
static void serial_rule_free(struct serial_rule *s_rule)
{
if (NULL == s_rule) {
@@ -559,10 +592,10 @@ static int recovery_history_version(const struct serial_rule *current, int curre
return ret;
}
int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version,
long long desired_version, long long *new_version,
struct table_manager *tbl_mgr, struct serial_rule **list,
int *update_type, int cumulative_off, struct log_handle *logger)
int maat_get_rm_key_list(redisContext *c, long long instance_version,
long long desired_version, long long *new_version,
struct table_manager *tbl_mgr, struct serial_rule **list,
int *update_type, int cumulative_off, struct log_handle *logger)
{
int rule_num = 0;
long long target_version = 0;
@@ -585,7 +618,7 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version,
return -1;
}
long long redis_version = maat_cmd_read_redis_integer(reply);
long long redis_version = maat_read_redis_integer(reply);
if (redis_version < 0) {
if (reply->type == REDIS_REPLY_ERROR) {
log_error(logger, MODULE_REDIS_MONITOR,
@@ -667,12 +700,12 @@ FULL_UPDATE:
size_t i = 0;
//consume reply "OK" and "QUEUED".
for (i = 0; i < append_cmd_cnt; i++) {
maat_cmd_wrap_redis_get_reply(c, &reply);
maat_wrap_redis_get_reply(c, &reply);
freeReplyObject(reply);
reply = NULL;
}
reply = maat_cmd_wrap_redis_command(c, "EXEC");
reply = maat_wrap_redis_command(c, "EXEC");
if (NULL == reply) {
log_error(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Redis Communication error: %s",
@@ -689,7 +722,7 @@ FULL_UPDATE:
return -1;
}
*new_version = maat_cmd_read_redis_integer(reply->element[0]);
*new_version = maat_read_redis_integer(reply->element[0]);
redisReply *sub_reply = reply->element[1];
if (sub_reply->type != REDIS_REPLY_ARRAY) {
log_error(logger, MODULE_REDIS_MONITOR,
@@ -829,7 +862,7 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
redisReply *reply = NULL;
for (i = 0; i < key_num; i++) {
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
ret = maat_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
log_error(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Get %s,%lld foreign key %s content failed, redis server error",
@@ -874,8 +907,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
return;
}
void maat_cmd_get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_fn, struct log_handle *logger)
void maat_get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_fn, struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
@@ -893,7 +926,7 @@ static int invalidate_line(char *line, int column_seq)
return -1;
}
int offset = maat_cmd_get_valid_flag_offset(line, column_seq);
int offset = maat_get_valid_flag_offset(line, column_seq);
if (offset < 0) {
return -1;
}
@@ -903,7 +936,7 @@ static int invalidate_line(char *line, int column_seq)
return 0;
}
void maat_cmd_rewrite_table_line_with_foreign(struct serial_rule *s_rule)
void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule)
{
int i = 0;
size_t fn_size = 0;
@@ -950,8 +983,8 @@ static int redlock_try_lock(redisContext *c, const char *lock_name,
{
int ret = 0;
redisReply *reply = maat_cmd_wrap_redis_command(c, "SET %s locked NX PX %lld",
lock_name, expire);
redisReply *reply = maat_wrap_redis_command(c, "SET %s locked NX PX %lld",
lock_name, expire);
if (reply->type == REDIS_REPLY_NIL) {
ret = 0;
} else {
@@ -979,8 +1012,8 @@ static long long exec_serial_rule_begin(redisContext* c, size_t rule_num,
}
if (rule_num > renew_rule_num) {
data_reply = maat_cmd_wrap_redis_command(c, "INCRBY MAAT_PRE_VER 1");
*transaction_version = maat_cmd_read_redis_integer(data_reply);
data_reply = maat_wrap_redis_command(c, "INCRBY MAAT_PRE_VER 1");
*transaction_version = maat_read_redis_integer(data_reply);
freeReplyObject(data_reply);
data_reply = NULL;
if (*transaction_version < 0) {
@@ -989,7 +1022,7 @@ static long long exec_serial_rule_begin(redisContext* c, size_t rule_num,
}
if (*renew_allowed == 1 || rule_num > renew_rule_num) {
data_reply = maat_cmd_wrap_redis_command(c, "MULTI");
data_reply = maat_wrap_redis_command(c, "MULTI");
freeReplyObject(data_reply);
data_reply = NULL;
ret = 0;
@@ -1000,7 +1033,7 @@ static long long exec_serial_rule_begin(redisContext* c, size_t rule_num,
static void redlock_unlock(redisContext *c, const char *lock_name)
{
redisReply *reply = maat_cmd_wrap_redis_command(c, "DEL %s", lock_name);
redisReply *reply = maat_wrap_redis_command(c, "DEL %s", lock_name);
freeReplyObject(reply);
reply = NULL;
}
@@ -1027,7 +1060,7 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
}
if (strlen(transaction_list) > 0) {
data_reply = maat_cmd_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld",
data_reply = maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld",
lua_exec_done,
mr_status_sset,
mr_version_sset,
@@ -1039,7 +1072,7 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
(*cnt)++;
}
data_reply = maat_cmd_wrap_redis_command(c, "EXEC");
data_reply = maat_wrap_redis_command(c, "EXEC");
return data_reply;
}
@@ -1154,7 +1187,7 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
}
for (i = 0; i < append_cmd_cnt; i++) {
maat_cmd_wrap_redis_get_reply(c, &data_reply);
maat_wrap_redis_get_reply(c, &data_reply);
freeReplyObject(data_reply);
data_reply=NULL;
}
@@ -1260,7 +1293,7 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
}
if (transaction_version > 0) {
transaction_finished_version = maat_cmd_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]);
transaction_finished_version = maat_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]);
log_info(logger, MODULE_REDIS_MONITOR,
"Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld",
transaction_version, transaction_finished_version);
@@ -1297,12 +1330,12 @@ static void cleanup_update_status(redisContext *c, struct log_handle *logger)
long long version_num = 0;
long long entry_num = 0;
long long server_time = maat_cmd_redis_server_time_s(c);
long long server_time = maat_redis_server_time_s(c);
if (!server_time) {
return;
}
redisReply *reply = maat_cmd_wrap_redis_command(c, "MULTI");
redisReply *reply = maat_wrap_redis_command(c, "MULTI");
freeReplyObject(reply);
reply = NULL;
@@ -1317,13 +1350,13 @@ static void cleanup_update_status(redisContext *c, struct log_handle *logger)
//consume reply "OK" and "QUEUED".
for(int i = 0; i < append_cmd_cnt; i++) {
maat_cmd_wrap_redis_get_reply(c, &reply);
maat_wrap_redis_get_reply(c, &reply);
freeReplyObject(reply);
reply = NULL;
}
redisReply *sub_reply = NULL;
reply = maat_cmd_wrap_redis_command(c, "EXEC");
reply = maat_wrap_redis_command(c, "EXEC");
if (reply->type != REDIS_REPLY_ARRAY) {
goto error_out;
}
@@ -1338,16 +1371,16 @@ static void cleanup_update_status(redisContext *c, struct log_handle *logger)
goto error_out;
}
version_lower_bound = maat_cmd_read_redis_integer(sub_reply->element[0]);
version_upper_bound = maat_cmd_read_redis_integer(sub_reply->element[sub_reply->elements-1]);
version_lower_bound = maat_read_redis_integer(sub_reply->element[0]);
version_upper_bound = maat_read_redis_integer(sub_reply->element[sub_reply->elements-1]);
freeReplyObject(reply);
reply = NULL;
//To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
reply = maat_cmd_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld",
reply = maat_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld",
mr_status_sset, version_lower_bound,
version_upper_bound);
entry_num = maat_cmd_read_redis_integer(reply);
entry_num = maat_read_redis_integer(reply);
freeReplyObject(reply);
reply = NULL;
@@ -1365,12 +1398,12 @@ static void check_maat_expiration(redisContext *c, struct log_handle *logger)
{
UNUSED int ret = 0;
long long server_time = maat_cmd_redis_server_time_s(c);
long long server_time = maat_redis_server_time_s(c);
if (!server_time) {
return;
}
redisReply *data_reply= maat_cmd_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld",
redisReply *data_reply= maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld",
mr_expire_sset, server_time);
if (data_reply->type != REDIS_REPLY_ARRAY || 0 == data_reply->elements) {
freeReplyObject(data_reply);
@@ -1442,7 +1475,7 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
log_info(maat_inst->logger, MODULE_REDIS_MONITOR, "Reconnecting...");
mr_ctx->read_ctx = maat_cmd_connect_redis(mr_ctx->redis_ip,
mr_ctx->read_ctx = maat_connect_redis(mr_ctx->redis_ip,
mr_ctx->redis_port,
mr_ctx->redis_db,
maat_inst->logger);
@@ -1457,7 +1490,7 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
long long new_version = 0;
int update_type = MAAT_UPDATE_TYPE_INC;
int rule_num = maat_cmd_get_rm_key_list(mr_ctx->read_ctx, version,
int rule_num = maat_get_rm_key_list(mr_ctx->read_ctx, version,
maat_inst->load_specific_version,
&new_version, maat_inst->tbl_mgr,
&rule_list, &update_type,
@@ -1477,8 +1510,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
if (rule_num > 0) {
ret = maat_cmd_get_redis_value(mr_ctx->read_ctx, rule_list, rule_num,
0, maat_inst->logger);
ret = maat_get_redis_value(mr_ctx->read_ctx, rule_list, rule_num,
0, maat_inst->logger);
//redis communication error
if (ret < 0) {
redisFree(mr_ctx->read_ctx);
@@ -1504,8 +1537,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
ret = get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num,
maat_inst, maat_inst->opts.foreign_cont_dir);
if (ret > 0) {
maat_cmd_get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0,
maat_inst->logger);
maat_get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0,
maat_inst->logger);
}
}
@@ -1539,7 +1572,7 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
if (rule_list[i].n_foreign > 0) {
maat_cmd_rewrite_table_line_with_foreign(rule_list+i);
maat_rewrite_table_line_with_foreign(rule_list+i);
}
update_fn(rule_list[i].table_name, rule_list[i].table_line, u_param);
@@ -1556,7 +1589,7 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
clean_up:
for (i = 0; i < rule_num; i++) {
maat_cmd_clear_rule_cache(rule_list + i);
maat_clear_rule_cache(rule_list + i);
}
FREE(rule_list);