只需要将compile_hash设为线程安全,即可保证command线程和update线程的安全访问。
This commit is contained in:
@@ -476,6 +476,10 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void*
|
|||||||
feather->last_region_saving=aligment_int64_array_alloc(max_thread_num);
|
feather->last_region_saving=aligment_int64_array_alloc(max_thread_num);
|
||||||
feather->maat_version=0;
|
feather->maat_version=0;
|
||||||
feather->last_full_version=0;
|
feather->last_full_version=0;
|
||||||
|
feather->base_grp_seq=0;
|
||||||
|
feather->base_rgn_seq=0;
|
||||||
|
feather->connect_timeout.tv_sec=0;
|
||||||
|
feather->connect_timeout.tv_usec=100*1000; // 100 ms
|
||||||
pthread_mutex_init(&(feather->plugin_table_reg_mutex),NULL);
|
pthread_mutex_init(&(feather->plugin_table_reg_mutex),NULL);
|
||||||
pthread_mutex_init(&(feather->redis_write_lock),NULL);
|
pthread_mutex_init(&(feather->redis_write_lock),NULL);
|
||||||
snprintf(feather->table_info_fn,sizeof(feather->table_info_fn),"%s",table_info_path);
|
snprintf(feather->table_info_fn,sizeof(feather->table_info_fn),"%s",table_info_path);
|
||||||
@@ -590,13 +594,11 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
|
|||||||
int Maat_initiate_feather(Maat_feather_t feather)
|
int Maat_initiate_feather(Maat_feather_t feather)
|
||||||
{
|
{
|
||||||
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
||||||
struct timeval timeout;
|
|
||||||
timeout.tv_sec=0;
|
|
||||||
timeout.tv_usec=100*1000; // 100 ms
|
|
||||||
if(strlen(_feather->redis_ip)>0&&_feather->redis_port!=0)
|
if(strlen(_feather->redis_ip)>0&&_feather->redis_port!=0)
|
||||||
{
|
{
|
||||||
_feather->REDIS_MODE_ON=1;
|
_feather->REDIS_MODE_ON=1;
|
||||||
_feather->redis_read_ctx=redisConnectWithTimeout(_feather->redis_ip,_feather->redis_port,timeout);
|
_feather->redis_read_ctx=redisConnectWithTimeout(_feather->redis_ip,_feather->redis_port,_feather->connect_timeout);
|
||||||
if(_feather->redis_read_ctx==NULL)
|
if(_feather->redis_read_ctx==NULL)
|
||||||
{
|
{
|
||||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
|
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
|
||||||
@@ -612,7 +614,7 @@ int Maat_initiate_feather(Maat_feather_t feather)
|
|||||||
,maat_finish_cb
|
,maat_finish_cb
|
||||||
, _feather
|
, _feather
|
||||||
,_feather->decrypt_key //Not used.
|
,_feather->decrypt_key //Not used.
|
||||||
,_feather->logger);
|
,_feather);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -788,9 +788,7 @@ int Maat_commit_command(Maat_feather_t feather)
|
|||||||
redisReply* reply=NULL;
|
redisReply* reply=NULL;
|
||||||
struct serial_rule_t* update_status=NULL;
|
struct serial_rule_t* update_status=NULL;
|
||||||
int status_cnt=0;
|
int status_cnt=0;
|
||||||
struct timeval timeout;
|
|
||||||
timeout.tv_sec=0;
|
|
||||||
timeout.tv_usec=100*1000; // 100 ms
|
|
||||||
int ret=0,i=0,redis_ret=REDIS_ERR,retry=0;
|
int ret=0,i=0,redis_ret=REDIS_ERR,retry=0;
|
||||||
long long maat_redis_version=0,new_region_num=0,new_group_num=0;
|
long long maat_redis_version=0,new_region_num=0,new_group_num=0;
|
||||||
int serial_rule_num=0,serial_rule_idx=0;
|
int serial_rule_num=0,serial_rule_idx=0;
|
||||||
@@ -803,7 +801,7 @@ int Maat_commit_command(Maat_feather_t feather)
|
|||||||
|
|
||||||
if(_feather->redis_write_ctx==NULL)
|
if(_feather->redis_write_ctx==NULL)
|
||||||
{
|
{
|
||||||
_feather->redis_write_ctx=redisConnectWithTimeout(_feather->redis_ip, _feather->redis_port,timeout);
|
_feather->redis_write_ctx=redisConnectWithTimeout(_feather->redis_ip, _feather->redis_port,_feather->connect_timeout);
|
||||||
if(_feather->redis_write_ctx==NULL)
|
if(_feather->redis_write_ctx==NULL)
|
||||||
{
|
{
|
||||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
|
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
|
||||||
|
|||||||
@@ -977,15 +977,17 @@ struct _Maat_scanner_t* create_maat_scanner(unsigned int version,_Maat_feather_t
|
|||||||
struct _Maat_scanner_t* scanner=NULL;
|
struct _Maat_scanner_t* scanner=NULL;
|
||||||
scanner=(struct _Maat_scanner_t*)calloc(sizeof(struct _Maat_scanner_t),1);
|
scanner=(struct _Maat_scanner_t*)calloc(sizeof(struct _Maat_scanner_t),1);
|
||||||
|
|
||||||
|
//Function Maat_append_command will access compile_hash in user thread.
|
||||||
|
hargs.thread_safe=1;
|
||||||
scanner->compile_hash=MESA_htable_create(&hargs, sizeof(hargs));
|
scanner->compile_hash=MESA_htable_create(&hargs, sizeof(hargs));
|
||||||
MESA_htable_print_crtl(scanner->compile_hash,0);
|
MESA_htable_print_crtl(scanner->compile_hash,0);
|
||||||
|
|
||||||
|
hargs.thread_safe=0;
|
||||||
hargs.data_free=EMPTY_FREE;
|
hargs.data_free=EMPTY_FREE;
|
||||||
scanner->group_hash=MESA_htable_create(&hargs, sizeof(hargs));
|
scanner->group_hash=MESA_htable_create(&hargs, sizeof(hargs));
|
||||||
MESA_htable_print_crtl(scanner->group_hash,0);
|
MESA_htable_print_crtl(scanner->group_hash,0);
|
||||||
|
|
||||||
|
hargs.thread_safe=0;
|
||||||
scanner->region_hash=MESA_htable_create(&hargs, sizeof(hargs));
|
scanner->region_hash=MESA_htable_create(&hargs, sizeof(hargs));
|
||||||
MESA_htable_print_crtl(scanner->region_hash,0);
|
MESA_htable_print_crtl(scanner->region_hash,0);
|
||||||
|
|
||||||
|
|||||||
@@ -368,8 +368,10 @@ struct _Maat_feather_t
|
|||||||
char group_tn[MAX_TABLE_NAME_LEN];
|
char group_tn[MAX_TABLE_NAME_LEN];
|
||||||
pthread_mutex_t plugin_table_reg_mutex;
|
pthread_mutex_t plugin_table_reg_mutex;
|
||||||
unsigned char decrypt_key[MAX_TABLE_NAME_LEN];
|
unsigned char decrypt_key[MAX_TABLE_NAME_LEN];
|
||||||
unsigned char redis_ip[MAX_TABLE_NAME_LEN];;
|
|
||||||
|
unsigned char redis_ip[MAX_TABLE_NAME_LEN];
|
||||||
int redis_port;
|
int redis_port;
|
||||||
|
struct timeval connect_timeout;
|
||||||
redisContext *redis_read_ctx;
|
redisContext *redis_read_ctx;
|
||||||
redisContext *redis_write_ctx; // not thread safe.
|
redisContext *redis_write_ctx; // not thread safe.
|
||||||
int on_redis_writing;
|
int on_redis_writing;
|
||||||
|
|||||||
Reference in New Issue
Block a user