item_uthash -> item_rcu && add foreign cont dir API

This commit is contained in:
liuwentan
2023-03-15 11:36:54 +08:00
parent 33c9c10467
commit 90d0764845
41 changed files with 2789 additions and 1603 deletions

View File

@@ -84,19 +84,118 @@ redisContext *maat_cmd_connect_redis(const char *redis_ip, int redis_port,
return c;
}
struct s_rule_array
long long maat_cmd_read_redis_integer(const redisReply *reply)
{
int cnt;
int size;
struct serial_rule *array;
};
switch (reply->type) {
case REDIS_REPLY_INTEGER:
return reply->integer;
break;
case REDIS_REPLY_ARRAY:
assert(reply->element[0]->type == REDIS_REPLY_INTEGER);
return reply->element[0]->integer;
break;
case REDIS_REPLY_STRING:
return atoll(reply->str);
break;
default:
return -1;
break;
}
return 0;
}
void save_serial_rule(void *data, void *user)
int redis_flushDB(redisContext *ctx, int db_index, struct log_handle *logger)
{
struct s_rule_array *array = (struct s_rule_array *)user;
int i = array->cnt;
memcpy(&(array->array[i]), data, sizeof(struct serial_rule));
array->array[i].op = MAAT_OP_ADD;
long long maat_redis_version = 0;
redisReply *data_reply = maat_cmd_wrap_redis_command(ctx, "WATCH MAAT_VERSION");
freeReplyObject(data_reply);
data_reply = NULL;
data_reply = maat_cmd_wrap_redis_command(ctx, "GET MAAT_VERSION");
if (data_reply->type == REDIS_REPLY_NIL) {
maat_redis_version = 0;
} else {
maat_redis_version = maat_cmd_read_redis_integer(data_reply);
maat_redis_version++;
freeReplyObject(data_reply);
data_reply = NULL;
}
data_reply = maat_cmd_wrap_redis_command(ctx, "DBSIZE");
long long dbsize = maat_cmd_read_redis_integer(data_reply);
freeReplyObject(data_reply);
data_reply = NULL;
data_reply = maat_cmd_wrap_redis_command(ctx, "MULTI");
freeReplyObject(data_reply);
data_reply = NULL;
int append_cmd_cnt = 0;
redisAppendCommand(ctx, "FLUSHDB");
append_cmd_cnt++;
redisAppendCommand(ctx, "SET MAAT_VERSION %lld", maat_redis_version);
append_cmd_cnt++;
redisAppendCommand(ctx, "SET MAAT_PRE_VER %lld", maat_redis_version);
append_cmd_cnt++;
redisAppendCommand(ctx, "SET %s 1", mr_region_id_var);
append_cmd_cnt++;
redisAppendCommand(ctx, "SET %s 1", mr_group_id_var);
append_cmd_cnt++;
redisAppendCommand(ctx, "EXEC");
append_cmd_cnt++;
int ret = 0;
int redis_transaction_success = 1;
for (int i = 0; i < append_cmd_cnt; i++) {
ret = maat_cmd_wrap_redis_get_reply(ctx, &data_reply);
if (ret == REDIS_OK) {
if (0 == mr_transaction_success(data_reply)) {
redis_transaction_success = 0;
}
freeReplyObject(data_reply);
data_reply = NULL;
}
}
if (redis_transaction_success == 1) {
log_info(logger, MODULE_MAAT_COMMAND,
"FlushDB %d, MAAT_VERSION:%llu, DBSize:%llu.",
db_index, (maat_redis_version == 0)?0:(maat_redis_version-1),dbsize);
}
return redis_transaction_success;
}
redisContext *get_redis_ctx_for_write(struct maat *maat_instance)
{
if (NULL == maat_instance->mr_ctx.write_ctx) {
int ret = connect_redis_for_write(&(maat_instance->mr_ctx),
maat_instance->logger);
if(ret!=0)
{
return NULL;
}
}
return maat_instance->mr_ctx.write_ctx;
}
int maat_cmd_flushDB(struct maat *maat_instance)
{
int ret = 0;
redisContext *write_ctx = get_redis_ctx_for_write(maat_instance);
if (NULL == write_ctx) {
return -1;
}
do {
ret = redis_flushDB(maat_instance->mr_ctx.write_ctx,
maat_instance->mr_ctx.redis_db,
maat_instance->logger);
} while(0 == ret);
return 0;
}
void maat_cmd_clear_rule_cache(struct serial_rule *s_rule)
@@ -130,19 +229,6 @@ int connect_redis_for_write(struct source_redis_ctx *mr_ctx,
}
}
redisContext *get_redis_ctx_for_write(struct maat *maat_instance)
{
if (NULL == maat_instance->mr_ctx.write_ctx) {
int ret = connect_redis_for_write(&(maat_instance->mr_ctx),
maat_instance->logger);
if(ret!=0)
{
return NULL;
}
}
return maat_instance->mr_ctx.write_ctx;
}
void maat_cmd_set_serial_rule(struct serial_rule *rule, enum maat_operation op,
long long rule_id, const char *table_name,
const char *line, long long timeout)
@@ -266,26 +352,6 @@ const char *maat_cmd_find_Nth_column(const char *line, int Nth, int *column_len)
return line + start;
}
long long maat_cmd_read_redis_integer(const redisReply *reply)
{
switch (reply->type) {
case REDIS_REPLY_INTEGER:
return reply->integer;
break;
case REDIS_REPLY_ARRAY:
assert(reply->element[0]->type == REDIS_REPLY_INTEGER);
return reply->element[0]->integer;
break;
case REDIS_REPLY_STRING:
return atoll(reply->str);
break;
default:
return -1;
break;
}
return 0;
}
int maat_cmd_wrap_redis_get_reply(redisContext *c, redisReply **reply)
{
return redisGetReply(c, (void **)reply);
@@ -373,4 +439,64 @@ long long maat_cmd_incrby(struct maat *maat_instance, const char *key, int incre
data_reply = NULL;
return result;
}
long long maat_cmd_get_config_version(struct maat *maat_instance)
{
long long new_version = -1;
if (maat_instance->new_version != INVALID_VERSION) {
new_version = maat_instance->new_version;
} else {
new_version = maat_instance->maat_version;
}
return new_version;
}
int maat_cmd_config_is_updating(struct maat *maat_instance)
{
int ret = 0;
if (0 == pthread_mutex_trylock(&(maat_instance->background_update_mutex))) {
ret = 0;
pthread_mutex_unlock(&(maat_instance->background_update_mutex));
} else {
ret = 1;
}
return ret;
}
char *maat_cmd_str_escape(char *dst, int size, const char *src)
{
int i = 0, j = 0;
int len = strlen(src);
for (i = 0, j = 0; i < len && j < size; i++) {
switch (src[i]) {
case '&':
dst[j] = '\\';
dst[j+1] = '&';
j += 2;
break;
case ' ':
dst[j] = '\\';
dst[j+1] = 'b';//space,0x20;
j += 2;
break;
case '\\':
dst[j] = '\\';
dst[j+1] = '\\';
j += 2;
break;
default:
dst[j] = src[i];
j++; //undo the followed i++
break;
}
}
dst[j] = '\0';
return dst;
}