implement rcu for g2g runtime & fix maat_stat bug
This commit is contained in:
372
src/maat_group.c
372
src/maat_group.c
@@ -35,7 +35,6 @@ struct group2group_schema {
|
||||
struct maat_group {
|
||||
igraph_integer_t vertex_id;
|
||||
long long group_id;
|
||||
int ref_by_compile_cnt;
|
||||
int ref_by_super_group_cnt;
|
||||
int ref_by_sub_group_cnt;
|
||||
|
||||
@@ -50,7 +49,7 @@ struct maat_group_topology {
|
||||
struct maat_group *hash_group_by_id; //key: group_id, value: struct maat_group *.
|
||||
struct maat_group *hash_group_by_vertex; //key: vetex_id, value: struct maat_group *. Multimap (Items with multiple keys).
|
||||
igraph_t group_graph;
|
||||
igraph_integer_t group_graph_vcount;
|
||||
igraph_integer_t group_graph_vcount;
|
||||
igraph_vector_t dfs_vids;
|
||||
igraph_integer_t grp_vertex_id_generator;
|
||||
|
||||
@@ -59,11 +58,12 @@ struct maat_group_topology {
|
||||
|
||||
struct group2group_runtime {
|
||||
struct maat_group_topology *group_topo;
|
||||
struct maat_group_topology *updating_group_topo;
|
||||
long long version;
|
||||
long long rule_num;
|
||||
long long update_err_cnt;
|
||||
int updating_flag;
|
||||
pthread_rwlock_t rwlock;
|
||||
|
||||
struct maat_garbage_bin *ref_garbage_bin;
|
||||
struct log_handle *logger;
|
||||
};
|
||||
@@ -121,6 +121,12 @@ void group2group_schema_free(void *g2g_schema)
|
||||
FREE(g2g_schema);
|
||||
}
|
||||
|
||||
void group_vertex_free(struct maat_group *group)
|
||||
{
|
||||
FREE(group->top_group_ids);
|
||||
FREE(group);
|
||||
}
|
||||
|
||||
struct maat_group_topology *maat_group_topology_new(struct log_handle *logger)
|
||||
{
|
||||
struct maat_group_topology *group_topo = ALLOC(struct maat_group_topology, 1);
|
||||
@@ -137,30 +143,6 @@ struct maat_group_topology *maat_group_topology_new(struct log_handle *logger)
|
||||
return group_topo;
|
||||
}
|
||||
|
||||
void *group2group_runtime_new(void *g2g_schema, size_t max_thread_num,
|
||||
struct maat_garbage_bin *garbage_bin,
|
||||
struct log_handle *logger)
|
||||
{
|
||||
if (NULL == g2g_schema) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct group2group_runtime *g2g_rt = ALLOC(struct group2group_runtime, 1);
|
||||
|
||||
g2g_rt->group_topo = maat_group_topology_new(logger);
|
||||
g2g_rt->ref_garbage_bin = garbage_bin;
|
||||
g2g_rt->logger = logger;
|
||||
pthread_rwlock_init(&g2g_rt->rwlock, NULL);
|
||||
|
||||
return g2g_rt;
|
||||
}
|
||||
|
||||
void group_vertex_free(struct maat_group *group)
|
||||
{
|
||||
FREE(group->top_group_ids);
|
||||
FREE(group);
|
||||
}
|
||||
|
||||
void maat_group_topology_free(struct maat_group_topology *group_topo)
|
||||
{
|
||||
struct maat_group *group = NULL, *tmp_group = NULL;
|
||||
@@ -176,6 +158,65 @@ void maat_group_topology_free(struct maat_group_topology *group_topo)
|
||||
FREE(group_topo);
|
||||
}
|
||||
|
||||
struct maat_group *maat_group_clone(struct maat_group *group)
|
||||
{
|
||||
struct maat_group *copy_group = ALLOC(struct maat_group, 1);
|
||||
|
||||
copy_group->group_id = group->group_id;
|
||||
copy_group->vertex_id = group->vertex_id;
|
||||
copy_group->ref_by_sub_group_cnt = group->ref_by_sub_group_cnt;
|
||||
copy_group->ref_by_super_group_cnt = group->ref_by_super_group_cnt;
|
||||
copy_group->top_group_cnt = group->top_group_cnt;
|
||||
if (copy_group->top_group_cnt > 0) {
|
||||
copy_group->top_group_ids = ALLOC(long long, copy_group->top_group_cnt);
|
||||
memcpy(copy_group->top_group_ids, group->top_group_ids,
|
||||
copy_group->top_group_cnt * sizeof(long long));
|
||||
}
|
||||
|
||||
return copy_group;
|
||||
}
|
||||
|
||||
struct maat_group_topology *maat_group_topology_clone(struct maat_group_topology *src_group_topo)
|
||||
{
|
||||
if (NULL == src_group_topo) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct maat_group_topology *copy_group_topo = ALLOC(struct maat_group_topology, 1);
|
||||
|
||||
struct maat_group *group = NULL, *tmp_group = NULL;
|
||||
HASH_ITER(hh_group_id, copy_group_topo->hash_group_by_id, group, tmp_group) {
|
||||
struct maat_group *copy_group = maat_group_clone(group);
|
||||
|
||||
HASH_ADD(hh_group_id, copy_group_topo->hash_group_by_id, group_id,
|
||||
sizeof(copy_group->group_id), copy_group);
|
||||
HASH_ADD(hh_vertex_id, copy_group_topo->hash_group_by_vertex, vertex_id,
|
||||
sizeof(copy_group->vertex_id), copy_group);
|
||||
}
|
||||
|
||||
igraph_copy(&(copy_group_topo->group_graph), &(src_group_topo->group_graph));
|
||||
copy_group_topo->grp_vertex_id_generator = src_group_topo->grp_vertex_id_generator;
|
||||
|
||||
return copy_group_topo;
|
||||
}
|
||||
|
||||
void *group2group_runtime_new(void *g2g_schema, size_t max_thread_num,
|
||||
struct maat_garbage_bin *garbage_bin,
|
||||
struct log_handle *logger)
|
||||
{
|
||||
if (NULL == g2g_schema) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct group2group_runtime *g2g_rt = ALLOC(struct group2group_runtime, 1);
|
||||
|
||||
g2g_rt->group_topo = maat_group_topology_new(logger);
|
||||
g2g_rt->ref_garbage_bin = garbage_bin;
|
||||
g2g_rt->logger = logger;
|
||||
|
||||
return g2g_rt;
|
||||
}
|
||||
|
||||
void group2group_runtime_free(void *g2g_runtime)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
@@ -184,33 +225,19 @@ void group2group_runtime_free(void *g2g_runtime)
|
||||
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
|
||||
pthread_rwlock_wrlock(&g2g_rt->rwlock);
|
||||
if (g2g_rt->group_topo != NULL) {
|
||||
maat_group_topology_free(g2g_rt->group_topo);
|
||||
g2g_rt->group_topo = NULL;
|
||||
}
|
||||
pthread_rwlock_unlock(&g2g_rt->rwlock);
|
||||
pthread_rwlock_destroy(&g2g_rt->rwlock);
|
||||
|
||||
if (g2g_rt->updating_group_topo != NULL) {
|
||||
maat_group_topology_free(g2g_rt->updating_group_topo);
|
||||
g2g_rt->updating_group_topo = NULL;
|
||||
}
|
||||
|
||||
FREE(g2g_rt);
|
||||
}
|
||||
|
||||
void maat_group_ref_inc(struct group2group_runtime *g2g_rt, struct maat_group *group)
|
||||
{
|
||||
pthread_rwlock_wrlock(&(g2g_rt->rwlock));
|
||||
g2g_rt->updating_flag = 1;
|
||||
group->ref_by_compile_cnt++;
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
|
||||
void maat_group_ref_dec(struct group2group_runtime *g2g_rt, struct maat_group *group)
|
||||
{
|
||||
pthread_rwlock_wrlock(&(g2g_rt->rwlock));
|
||||
g2g_rt->updating_flag = 1;
|
||||
group->ref_by_compile_cnt--;
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
|
||||
struct group2group_item *
|
||||
group2group_item_new(const char *line, struct group2group_schema *g2g_schema,
|
||||
const char *table_name, struct log_handle *logger)
|
||||
@@ -260,15 +287,9 @@ size_t print_igraph_vector(igraph_vector_t *v, char *buff, size_t sz) {
|
||||
return printed;
|
||||
}
|
||||
|
||||
struct maat_group *_group2group_runtime_add_group(void *g2g_runtime, long long group_id, int lock_flag)
|
||||
struct maat_group *group_topology_add_group(struct maat_group_topology *group_topo,
|
||||
long long group_id)
|
||||
{
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
|
||||
if (1 == lock_flag) {
|
||||
pthread_rwlock_wrlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
|
||||
struct maat_group_topology *group_topo = g2g_rt->group_topo;
|
||||
assert(group_topo != NULL);
|
||||
|
||||
struct maat_group *group = ALLOC(struct maat_group, 1);
|
||||
@@ -280,37 +301,22 @@ struct maat_group *_group2group_runtime_add_group(void *g2g_runtime, long long g
|
||||
|
||||
HASH_ADD(hh_group_id, group_topo->hash_group_by_id, group_id, sizeof(group->group_id), group);
|
||||
HASH_ADD(hh_vertex_id, group_topo->hash_group_by_vertex, vertex_id, sizeof(group->vertex_id), group);
|
||||
g2g_rt->updating_flag = 1;
|
||||
if (1 == lock_flag) {
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
|
||||
return group;
|
||||
}
|
||||
|
||||
struct maat_group *group2group_runtime_add_group(void *g2g_runtime, long long group_id)
|
||||
void group_topology_remove_group(struct maat_group_topology *group_topo,
|
||||
struct maat_group *group)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
return NULL;
|
||||
if (NULL == group_topo || NULL == group) {
|
||||
return;
|
||||
}
|
||||
|
||||
return _group2group_runtime_add_group(g2g_runtime, group_id, 1);
|
||||
}
|
||||
|
||||
void _group2group_runtime_remove_group(void *g2g_runtime, struct maat_group *group, int lock_flag)
|
||||
{
|
||||
igraph_vector_t v;
|
||||
char buff[4096] = {0};
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
|
||||
if (1 == lock_flag) {
|
||||
pthread_rwlock_wrlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
|
||||
struct maat_group_topology *group_topo = g2g_rt->group_topo;
|
||||
assert(group_topo != NULL);
|
||||
assert(group->ref_by_super_group_cnt == 0);
|
||||
|
||||
assert(group->ref_by_compile_cnt == 0 && group->ref_by_super_group_cnt == 0);
|
||||
igraph_vector_init(&v, 8);
|
||||
igraph_neighbors(&group_topo->group_graph, &v, group->vertex_id, IGRAPH_ALL);
|
||||
if (igraph_vector_size(&v) > 0) {
|
||||
@@ -330,86 +336,45 @@ void _group2group_runtime_remove_group(void *g2g_runtime, struct maat_group *gro
|
||||
HASH_DELETE(hh_vertex_id, group_topo->hash_group_by_vertex, group);
|
||||
|
||||
group_vertex_free(group);
|
||||
g2g_rt->updating_flag = 1;
|
||||
if (1 == lock_flag) {
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
}
|
||||
|
||||
void group2group_runtime_remove_group(void *g2g_runtime, struct maat_group *group)
|
||||
struct maat_group *group_topology_find_group(struct maat_group_topology *group_topo,
|
||||
long long group_id)
|
||||
{
|
||||
if (NULL == g2g_runtime || NULL == group) {
|
||||
return;
|
||||
}
|
||||
|
||||
_group2group_runtime_remove_group(g2g_runtime, group, 1);
|
||||
}
|
||||
|
||||
struct maat_group *_group2group_runtime_find_group(void *g2g_runtime, long long group_id, int lock_flag)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
if (NULL == group_topo || group_id < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
if (1 == lock_flag) {
|
||||
pthread_rwlock_rdlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
|
||||
struct maat_group_topology *group_topo = g2g_rt->group_topo;
|
||||
assert(group_topo != NULL);
|
||||
|
||||
struct maat_group *group = NULL;
|
||||
HASH_FIND(hh_group_id, group_topo->hash_group_by_id, &group_id, sizeof(group_id), group);
|
||||
|
||||
if (1 == lock_flag) {
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
}
|
||||
|
||||
return group;
|
||||
}
|
||||
|
||||
struct maat_group *group2group_runtime_find_group(void *g2g_runtime, long long group_id)
|
||||
int group_topology_add_group_to_group(struct maat_group_topology *group_topo,
|
||||
long long group_id, long long super_group_id)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return _group2group_runtime_find_group(g2g_runtime, group_id, 1);
|
||||
}
|
||||
|
||||
int group2group_runtime_add_group_to_group(void *g2g_runtime, long long group_id,
|
||||
long long super_group_id)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
if (NULL == group_topo) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int ret = 0;
|
||||
igraph_integer_t edge_id;
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
|
||||
pthread_rwlock_wrlock(&(g2g_rt->rwlock));
|
||||
struct maat_group_topology *group_topo = g2g_rt->group_topo;
|
||||
assert(group_topo != NULL);
|
||||
|
||||
struct maat_group *group = _group2group_runtime_find_group(g2g_runtime, group_id, 0);
|
||||
|
||||
struct maat_group *group = group_topology_find_group(group_topo, group_id);
|
||||
if (NULL == group) {
|
||||
group = _group2group_runtime_add_group(g2g_runtime, group_id, 0);
|
||||
group = group_topology_add_group(group_topo, group_id);
|
||||
}
|
||||
|
||||
struct maat_group *super_group = _group2group_runtime_find_group(g2g_runtime,
|
||||
super_group_id, 0);
|
||||
struct maat_group *super_group = group_topology_find_group(group_topo, super_group_id);
|
||||
if (NULL == super_group) {
|
||||
super_group = _group2group_runtime_add_group(g2g_runtime, super_group_id, 0);
|
||||
super_group = group_topology_add_group(group_topo, super_group_id);
|
||||
}
|
||||
|
||||
ret = igraph_get_eid(&group_topo->group_graph, &edge_id, group->vertex_id,
|
||||
igraph_integer_t edge_id;
|
||||
int ret = igraph_get_eid(&group_topo->group_graph, &edge_id, group->vertex_id,
|
||||
super_group->vertex_id, IGRAPH_DIRECTED, /*error*/ 0);
|
||||
|
||||
//No duplicated edges between two groups.
|
||||
if (edge_id > 0) {
|
||||
log_error(g2g_rt->logger, MODULE_GROUP,
|
||||
log_error(group_topo->logger, MODULE_GROUP,
|
||||
"[%s:%d] Add group %d to group %d failed, relation already exisited.",
|
||||
__FUNCTION__, __LINE__, group->group_id, super_group->group_id);
|
||||
ret = -1;
|
||||
@@ -420,45 +385,36 @@ int group2group_runtime_add_group_to_group(void *g2g_runtime, long long group_id
|
||||
super_group->ref_by_sub_group_cnt++;
|
||||
ret = 0;
|
||||
}
|
||||
|
||||
g2g_rt->updating_flag = 1;
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int group2group_runtime_remove_group_from_group(void *g2g_runtime, long long group_id,
|
||||
long long super_group_id)
|
||||
int group_topology_remove_group_from_group(struct maat_group_topology *group_topo,
|
||||
long long group_id, long long super_group_id)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
if (NULL == group_topo) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
|
||||
pthread_rwlock_wrlock(&(g2g_rt->rwlock));
|
||||
//No hash write operation, LOCK protection is unnecessary.
|
||||
struct maat_group *group = _group2group_runtime_find_group(g2g_runtime, group_id, 0);
|
||||
struct maat_group *group = group_topology_add_group(group_topo, group_id);
|
||||
if (NULL == group) {
|
||||
log_error(g2g_rt->logger, MODULE_GROUP,
|
||||
log_error(group_topo->logger, MODULE_GROUP,
|
||||
"[%s:%d] Del group %d from group %d failed, group %d not exisited.",
|
||||
__FUNCTION__, __LINE__, group_id, super_group_id, group_id);
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct maat_group *super_group = _group2group_runtime_find_group(g2g_runtime,
|
||||
super_group_id, 0);
|
||||
struct maat_group *super_group = group_topology_add_group(group_topo, super_group_id);
|
||||
if (NULL == super_group) {
|
||||
log_error(g2g_rt->logger, MODULE_GROUP,
|
||||
log_error(group_topo->logger, MODULE_GROUP,
|
||||
"[%s:%d] Del group %d from group %d failed, superior group %d not exisited.",
|
||||
__FUNCTION__, __LINE__, group_id, super_group_id, super_group_id);
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
return -1;
|
||||
}
|
||||
|
||||
igraph_es_t es;
|
||||
igraph_integer_t edge_num_before = 0, edge_num_after = 0;
|
||||
struct maat_group_topology *group_topo = g2g_rt->group_topo;
|
||||
|
||||
edge_num_before = igraph_ecount(&group_topo->group_graph);
|
||||
// The edges between the given pairs of vertices will be included in the edge selection.
|
||||
@@ -477,15 +433,12 @@ int group2group_runtime_remove_group_from_group(void *g2g_runtime, long long gro
|
||||
igraph_es_destroy(&es);
|
||||
|
||||
if (ret != IGRAPH_SUCCESS || edge_num_before - edge_num_after != 1) {
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
assert(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
group->ref_by_super_group_cnt--;
|
||||
super_group->ref_by_sub_group_cnt--;
|
||||
g2g_rt->updating_flag = 1;
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -505,9 +458,9 @@ static size_t effective_vertices_count(igraph_vector_t *vids)
|
||||
return i;
|
||||
}
|
||||
|
||||
int group2group_runtime_build_top_groups(void *g2g_runtime, long long maat_rt_version)
|
||||
int group_topology_build_top_groups(struct maat_group_topology *group_topo)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
if (NULL == group_topo) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -516,18 +469,12 @@ int group2group_runtime_build_top_groups(void *g2g_runtime, long long maat_rt_ve
|
||||
int tmp_vid = 0;
|
||||
size_t top_group_cnt = 0;
|
||||
long long *temp_group_ids = NULL;
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
|
||||
pthread_rwlock_wrlock(&(g2g_rt->rwlock));
|
||||
struct maat_group_topology *group_topo = g2g_rt->group_topo;
|
||||
assert(group_topo != NULL);
|
||||
|
||||
igraph_bool_t is_dag;
|
||||
igraph_is_dag(&(group_topo->group_graph), &is_dag);
|
||||
if (!is_dag) {
|
||||
log_error(g2g_rt->logger, MODULE_GROUP, "[%s:%d] Sub group cycle detected!",
|
||||
log_error(group_topo->logger, MODULE_GROUP, "[%s:%d] Sub group cycle detected!",
|
||||
__FUNCTION__, __LINE__);
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -539,58 +486,48 @@ int group2group_runtime_build_top_groups(void *g2g_runtime, long long maat_rt_ve
|
||||
temp_group_ids = NULL;
|
||||
|
||||
//Orphan, Not reference by any one, free it.
|
||||
if (0 == group->ref_by_compile_cnt
|
||||
&& 0 == group->ref_by_super_group_cnt
|
||||
if (0 == group->ref_by_super_group_cnt
|
||||
&& 0 == group->ref_by_sub_group_cnt) {
|
||||
|
||||
FREE(group->top_group_ids);
|
||||
_group2group_runtime_remove_group(g2g_runtime, group, 0);
|
||||
group_topology_remove_group(group_topo, group);
|
||||
continue;
|
||||
}
|
||||
|
||||
//A group is need to build top groups when it has items and referenced by superior groups or compiles.
|
||||
if (group->ref_by_compile_cnt > 0 || group->ref_by_super_group_cnt > 0) {
|
||||
if (0 == group->ref_by_super_group_cnt) {
|
||||
//fast path, group is only referenced by compile rules.
|
||||
top_group_cnt = 1;
|
||||
temp_group_ids = ALLOC(long long, top_group_cnt);
|
||||
temp_group_ids[0] = group->group_id;
|
||||
} else {
|
||||
igraph_vector_t *vids = &(group_topo->dfs_vids);
|
||||
igraph_dfs(&group_topo->group_graph, group->vertex_id, IGRAPH_OUT,
|
||||
0, vids, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||
|
||||
temp_group_ids = ALLOC(long long, effective_vertices_count(vids));
|
||||
|
||||
for (size_t i = 0; i < (size_t)igraph_vector_size(vids); i++) {
|
||||
tmp_vid = (int) VECTOR(*vids)[i];
|
||||
if (tmp_vid < 0) {
|
||||
break;
|
||||
}
|
||||
//A group is referenced by superior groups.
|
||||
if (group->ref_by_super_group_cnt > 0) {
|
||||
igraph_vector_t *vids = &(group_topo->dfs_vids);
|
||||
igraph_dfs(&group_topo->group_graph, group->vertex_id, IGRAPH_OUT,
|
||||
0, vids, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||
|
||||
HASH_FIND(hh_vertex_id, group_topo->hash_group_by_vertex, &tmp_vid,
|
||||
sizeof(tmp_vid), super_group);
|
||||
temp_group_ids = ALLOC(long long, effective_vertices_count(vids));
|
||||
|
||||
//including itself
|
||||
if (super_group->ref_by_compile_cnt > 0) {
|
||||
temp_group_ids[top_group_cnt] = super_group->group_id;
|
||||
top_group_cnt++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < (size_t)igraph_vector_size(vids); i++) {
|
||||
tmp_vid = (int)VECTOR(*vids)[i];
|
||||
if (tmp_vid < 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
free(group->top_group_ids);
|
||||
HASH_FIND(hh_vertex_id, group_topo->hash_group_by_vertex, &tmp_vid,
|
||||
sizeof(tmp_vid), super_group);
|
||||
|
||||
temp_group_ids[top_group_cnt] = super_group->group_id;
|
||||
top_group_cnt++;
|
||||
}
|
||||
}
|
||||
|
||||
FREE(group->top_group_ids);
|
||||
group->top_group_cnt = top_group_cnt;
|
||||
group->top_group_ids = ALLOC(long long, group->top_group_cnt);
|
||||
memcpy(group->top_group_ids, temp_group_ids, sizeof(long long)*group->top_group_cnt);
|
||||
|
||||
FREE(temp_group_ids);
|
||||
}
|
||||
igraph_vector_destroy(&group_topo->dfs_vids);
|
||||
g2g_rt->version = maat_rt_version;
|
||||
g2g_rt->updating_flag = 0;
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
if (top_group_cnt > 0) {
|
||||
group->top_group_ids = ALLOC(long long, group->top_group_cnt);
|
||||
memcpy(group->top_group_ids, temp_group_ids, sizeof(long long)*group->top_group_cnt);
|
||||
}
|
||||
|
||||
if (temp_group_ids != NULL) {
|
||||
FREE(temp_group_ids);
|
||||
}
|
||||
}
|
||||
igraph_vector_destroy(&(group_topo->dfs_vids));
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -620,10 +557,16 @@ int group2group_runtime_update(void *g2g_runtime, void *g2g_schema,
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (0 == g2g_rt->updating_flag) {
|
||||
assert(g2g_rt->updating_group_topo == NULL);
|
||||
g2g_rt->updating_group_topo = maat_group_topology_clone(g2g_rt->group_topo);
|
||||
g2g_rt->updating_flag = 1;
|
||||
}
|
||||
|
||||
if (0 == is_valid) {
|
||||
//delete
|
||||
ret = group2group_runtime_remove_group_from_group(g2g_runtime, g2g_item->group_id,
|
||||
g2g_item->super_group_id);
|
||||
ret = group_topology_remove_group_from_group(g2g_rt->updating_group_topo, g2g_item->group_id,
|
||||
g2g_item->super_group_id);
|
||||
if (0 == ret) {
|
||||
g2g_rt->rule_num--;
|
||||
} else {
|
||||
@@ -631,8 +574,8 @@ int group2group_runtime_update(void *g2g_runtime, void *g2g_schema,
|
||||
}
|
||||
} else {
|
||||
//add
|
||||
ret = group2group_runtime_add_group_to_group(g2g_runtime, g2g_item->group_id,
|
||||
g2g_item->super_group_id);
|
||||
ret = group_topology_add_group_to_group(g2g_rt->updating_group_topo, g2g_item->group_id,
|
||||
g2g_item->super_group_id);
|
||||
if (0 == ret) {
|
||||
g2g_rt->rule_num++;
|
||||
} else {
|
||||
@@ -644,6 +587,12 @@ int group2group_runtime_update(void *g2g_runtime, void *g2g_schema,
|
||||
return ret;
|
||||
}
|
||||
|
||||
void garbage_maat_group_topology_free(void *data, void *arg)
|
||||
{
|
||||
struct maat_group_topology *group_topo = (struct maat_group_topology *)data;
|
||||
maat_group_topology_free(group_topo);
|
||||
}
|
||||
|
||||
int group2group_runtime_commit(void *g2g_runtime, const char *table_name, long long maat_rt_version)
|
||||
{
|
||||
if (NULL == g2g_runtime) {
|
||||
@@ -655,7 +604,7 @@ int group2group_runtime_commit(void *g2g_runtime, const char *table_name, long l
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ret = group2group_runtime_build_top_groups(g2g_runtime, maat_rt_version);
|
||||
int ret = group_topology_build_top_groups(g2g_rt->updating_group_topo);
|
||||
if (ret < 0) {
|
||||
log_error(g2g_rt->logger, MODULE_GROUP,
|
||||
"[%s:%d] table[%s] group2group runtime commit failed",
|
||||
@@ -663,6 +612,15 @@ int group2group_runtime_commit(void *g2g_runtime, const char *table_name, long l
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct maat_group_topology *old_group_topo = g2g_rt->group_topo;
|
||||
g2g_rt->group_topo = g2g_rt->updating_group_topo;
|
||||
g2g_rt->updating_group_topo = NULL;
|
||||
g2g_rt->updating_flag = 0;
|
||||
|
||||
maat_garbage_bagging(g2g_rt->ref_garbage_bin, old_group_topo, NULL,
|
||||
garbage_maat_group_topology_free);
|
||||
g2g_rt->version = maat_rt_version;
|
||||
|
||||
log_info(g2g_rt->logger, MODULE_GROUP,
|
||||
"table[%s] commit %zu g2g rules and rebuild top_groups completed, version:%lld",
|
||||
table_name, g2g_rt->rule_num, g2g_rt->version);
|
||||
@@ -700,9 +658,8 @@ int group2group_runtime_get_top_groups(void *g2g_runtime, long long *group_ids,
|
||||
size_t top_group_index = 0;
|
||||
struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime;
|
||||
|
||||
pthread_rwlock_rdlock(&(g2g_rt->rwlock));
|
||||
for (size_t i = 0; i < n_group_ids; i++) {
|
||||
struct maat_group *group = _group2group_runtime_find_group(g2g_runtime, group_ids[i], 0);
|
||||
struct maat_group *group = group_topology_find_group(g2g_rt->group_topo, group_ids[i]);
|
||||
if (!group) {
|
||||
continue;
|
||||
}
|
||||
@@ -711,7 +668,6 @@ int group2group_runtime_get_top_groups(void *g2g_runtime, long long *group_ids,
|
||||
top_group_ids[top_group_index++] = group->top_group_ids[j];
|
||||
}
|
||||
}
|
||||
pthread_rwlock_unlock(&(g2g_rt->rwlock));
|
||||
|
||||
return top_group_index;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user