多线程访问pme的时候增加锁

This commit is contained in:
崔一鸣
2019-06-06 17:07:17 +08:00
parent 963d780226
commit 1b03b6d736
5 changed files with 162 additions and 71 deletions

View File

@@ -7,6 +7,7 @@
#include "uuid/uuid.h"
#include "cjson/cJSON.h"
#include "kni_send_logger.h"
#include<pthread.h>
extern int g_iThreadNum;
@@ -22,6 +23,7 @@ struct kni_field_stat_handle *g_kni_fs_handle = NULL;
/* TODO:
1. con_duration_ms: how to calculate
2. sapp return dropme之后还会继续调用close吗
*/
enum kni_protocol{
@@ -30,13 +32,20 @@ enum kni_protocol{
KNI_PROTOCOL_HTTP,
};
enum stream_error{
STREAM_ERROR_PENDING_NO_SYN = -1,
STREAM_ERROR_SINGLE_DIR = -2,
STREAM_ERROR_PROTOCOL_UNKNOWN = -3,
STREAM_ERROR_NO_SYN_ACK = -4,
STREAM_ERROR_INVALID_ACTION = -5,
};
struct http_project{
int host_len;
char host[KNI_DOMAIN_MAX];
};
struct pme_info{
int ref_cnt;
int protocol;
int policy_id;
int maat_hit;
@@ -47,7 +56,8 @@ struct pme_info{
uint16_t client_window;
uint16_t server_window;
int tfe_id;
void *logger;
pthread_mutex_t lock;
enum stream_error error;
char stream_trace_id[STREAM_TRACE_ID_LEN];
char host[KNI_DOMAIN_MAX]; //http only
char sni[KNI_DOMAIN_MAX]; //ssl only
@@ -144,18 +154,51 @@ struct traceid2pme_search_cb_args{
void *logger;
};
static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread_seq, void *logger){
static void pme_info_destroy(void *data){
struct pme_info *pmeinfo = (struct pme_info *)data;
void *logger = g_kni_handle->local_logger;
if(pmeinfo != NULL){
//free client_tcpopt
if(pmeinfo->client_tcpopt != NULL){
FREE(&(pmeinfo->client_tcpopt));
}
//free server tcpopt
if(pmeinfo->server_tcpopt != NULL){
FREE(&(pmeinfo->server_tcpopt));
}
//free layer_addr
layer_addr_free(pmeinfo->addr);
pmeinfo->addr=NULL;
//free lock
pthread_mutex_destroy(&(pmeinfo->lock));
FREE(&pmeinfo);
}
else{
KNI_LOG_ERROR(logger, "Failed at pme_info_destroy, pmeinfo is null");
}
}
static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread_seq){
void *logger = g_kni_handle->local_logger;
struct pme_info* pmeinfo = ALLOC(struct pme_info, 1);
pmeinfo->ref_cnt=1;
pmeinfo->tfe_id = g_kni_handle->tfe_count > 0 ? thread_seq % g_kni_handle->tfe_count : -1;
uuid_t uu;
uuid_generate_random(uu);
uuid_unparse(uu, pmeinfo->stream_trace_id);
pmeinfo->addr = layer_addr_dup(&(stream->addr));
pmeinfo->start_time = time(NULL);
pmeinfo->logger = logger;
//init pme_lock
int ret = pthread_mutex_init(&(pmeinfo->lock), NULL);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at init pthread mutex");
goto error_out;
}
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1);
return pmeinfo;
error_out:
pme_info_destroy(pmeinfo);
return NULL;
}
static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
@@ -274,36 +317,32 @@ error_out:
return -1;
}
static void pme_info_destroy(struct pme_info *pmeinfo){
void *logger = pmeinfo->logger;
assert(pmeinfo->ref_cnt<=2);
pmeinfo->ref_cnt--;
if(pmeinfo->ref_cnt == 0){
int ret = sendlog_to_kafka(pmeinfo, logger);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at sendlog to kafka");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
//return 0: has been destroy
static int judge_pme_destroy(struct pme_info *pmeinfo){
void *logger = g_kni_handle->local_logger;
if(pmeinfo != NULL){
void *logger = g_kni_handle->local_logger;
if(pmeinfo->sapp_release == 1 && pmeinfo->tfe_release == 1){
int ret = sendlog_to_kafka(pmeinfo, logger);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at sendlog to kafka, stream_trace_id is %s", pmeinfo->stream_trace_id);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_DEBUG(logger, "Succeed sendlog to kafka, stream_trace_id is %s", pmeinfo->stream_trace_id);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
}
pme_info_destroy(pmeinfo);
return 0;
}
else{
KNI_LOG_INFO(logger, "Succeed sendlog to kafka");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
KNI_LOG_DEBUG(logger, "can not destroy pmeinfo, sapp_release = %d, tfe_release = %d", pmeinfo->sapp_release, pmeinfo->tfe_release);
}
if(pmeinfo->client_tcpopt != NULL){
FREE(&(pmeinfo->client_tcpopt));
}
if(pmeinfo->server_tcpopt != NULL){
FREE(&(pmeinfo->server_tcpopt));
}
layer_addr_free(pmeinfo->addr);
pmeinfo->addr=NULL;
FREE(&pmeinfo);
}
/* TODO: segment fault
else{
KNI_LOG_DEBUG(logger, "can not free pmeinfo, sapp_release is %d, tfe_release is %d",
pmeinfo->sapp_release, pmeinfo->tfe_release);
KNI_LOG_ERROR(logger, "Failed at judge_pme_info, pmeinfo is null");
}
*/
return -1;
}
static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){
@@ -500,7 +539,7 @@ static char pending_opstate(const struct streaminfo *stream, struct pme_info *pm
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SYN_EXP], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
pmeinfo->error = STREAM_ERROR_PENDING_NO_SYN;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
pmeinfo->client_window = pktinfo->tcphdr->window;
@@ -533,7 +572,6 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
case KNI_ACTION_BYPASS:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
default:
break;
@@ -554,7 +592,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
KNI_LOG_INFO(logger, "dir is %d, bypass, stream addr is %s", stream->dir, stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
pmeinfo->error = STREAM_ERROR_SINGLE_DIR;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
struct protocol_identify_result protocol_identify_res;
@@ -569,7 +607,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STM], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
pmeinfo->error = STREAM_ERROR_PROTOCOL_UNKNOWN;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
case KNI_PROTOCOL_SSL:
strncpy(pmeinfo->sni, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->sni) - 1));
@@ -597,7 +635,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SA_EXP], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
pmeinfo->error = STREAM_ERROR_NO_SYN_ACK;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
switch(pmeinfo->action){
@@ -625,16 +663,12 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
pmeinfo->action, action_str, pmeinfo->policy_id, stream_addr, protocol_identify_res.domain);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
pmeinfo->error = STREAM_ERROR_INVALID_ACTION;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){
//pmeinfo->tfe_release = 1: intercept, tfe end first. so droppkt and dropme
if(pmeinfo->tfe_release == 1){
return APP_STATE_DROPPKT | APP_STATE_DROPME;
}
//close: also need to send to tfe
pmeinfo->end_time = time(NULL);
void *logger = g_kni_handle->local_logger;
@@ -663,9 +697,17 @@ static char close_opstate(const struct streaminfo *stream, struct pme_info *pmei
MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_trace_id,
strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)));
return APP_STATE_DROPPKT | APP_STATE_DROPME;
default:
case KNI_ACTION_BYPASS:
pmeinfo->tfe_release = 1;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
//will not happen, close has only 2 action:
default:
char action_str[KNI_SYMBOL_MAX];
kni_maat_action_trans(pmeinfo->action, action_str);
KNI_LOG_ERROR(logger, "close_opstate: action %d(%s) is abnormal",
pmeinfo->action, action_str);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
@@ -673,6 +715,10 @@ static char close_opstate(const struct streaminfo *stream, struct pme_info *pmei
extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){
void *logger = g_kni_handle->local_logger;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_PKT], 0, FS_OP_ADD, 1);
int ret, key_size;
struct pme_info *pmeinfo = *(struct pme_info **)pme;
//pktinfo
struct pkt_info pktinfo;
//TODO: ipv6
if(stream->addr.addrtype == ADDR_TYPE_IPV6){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_PKT], 0, FS_OP_ADD, 1);
@@ -685,38 +731,56 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
struct pme_info *pmeinfo = *(struct pme_info **)pme;
//pktinfo
struct pkt_info *pktinfo = (struct pkt_info*)ALLOC(struct pkt_info, 1);
pktinfo->iphdr = (struct iphdr*)a_packet;
pktinfo->iphdr_len = pktinfo->iphdr->ihl * 4;
pktinfo->ip_totlen = ntohs(pktinfo->iphdr->tot_len);
pktinfo->tcphdr = (struct tcphdr*)((char*)pktinfo->iphdr + pktinfo->iphdr_len);
pktinfo->tcphdr_len = pktinfo->tcphdr->doff * 4;
pktinfo->data = (char*)pktinfo->tcphdr + pktinfo->tcphdr_len;
pktinfo->data_len = pktinfo->ip_totlen - pktinfo->iphdr_len - pktinfo->tcphdr_len;
int ret;
int key_size;
memset(&pktinfo, 0, sizeof(struct pkt_info));
pktinfo.iphdr = (struct iphdr*)a_packet;
pktinfo.iphdr_len = pktinfo.iphdr->ihl * 4;
pktinfo.ip_totlen = ntohs(pktinfo.iphdr->tot_len);
pktinfo.tcphdr = (struct tcphdr*)((char*)pktinfo.iphdr + pktinfo.iphdr_len);
pktinfo.tcphdr_len = pktinfo.tcphdr->doff * 4;
pktinfo.data = (char*)pktinfo.tcphdr + pktinfo.tcphdr_len;
pktinfo.data_len = pktinfo.ip_totlen - pktinfo.iphdr_len - pktinfo.tcphdr_len;
/* for debug
if(stream->pktstate == 2){
printf("stream->pktstate is %d\n", stream->pktstate);
}
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
*/
switch(stream->pktstate){
case OP_STATE_PENDING:
*pme = pmeinfo = pme_info_new(stream, thread_seq, logger);
pmeinfo = pme_info_new(stream, thread_seq);
if(pmeinfo == NULL){
KNI_LOG_ERROR(logger, "Failed at new pmeinfo");
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
*pme = pmeinfo;
key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
pmeinfo->ref_cnt++;
ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_trace_id),
key_size, (const void*)pmeinfo);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add,"
"table is traceid2pme_htable, key is %s", pmeinfo->stream_trace_id);
pme_info_destroy(pmeinfo);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add, table is traceid2pme_htable, key is %s, key_size is %d",
// pmeinfo->stream_trace_id, key_size);
ret = pending_opstate(stream, pmeinfo, &pktinfo);
if(pmeinfo->error < 0){
goto error_out;
}
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add, table is traceid2pme_htable, key is %s, key_size is %d",
pmeinfo->stream_trace_id, key_size);
ret = pending_opstate(stream, pmeinfo, pktinfo);
break;
case OP_STATE_DATA:
ret = data_opstate(stream, pmeinfo, pktinfo, thread_seq);
ret = data_opstate(stream, pmeinfo, &pktinfo, thread_seq);
//exception stream, dropme and destroy pmeinfo
if(pmeinfo->error < 0){
goto error_out;
}
break;
case OP_STATE_CLOSE:
ret = close_opstate(stream, pmeinfo, pktinfo, thread_seq);
//sapp stream close
ret = close_opstate(stream, pmeinfo, &pktinfo, thread_seq);
break;
default:
ret = APP_STATE_FAWPKT | APP_STATE_GIVEME;
@@ -725,11 +789,32 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in
KNI_LOG_ERROR(logger, "Unknown stream opstate %d", stream->pktstate);
break;
}
FREE(&pktinfo);
//close opstate or data_opstate(tfe_release = 1), need sendlog
if((ret & APP_STATE_DROPME)){
//lock
pthread_mutex_lock(&(pmeinfo->lock));
pmeinfo->sapp_release = 1;
pme_info_destroy(pmeinfo);
*pme = NULL;
int ret = judge_pme_destroy(pmeinfo);
if(ret < 0){
pthread_mutex_unlock(&(pmeinfo->lock));
}
}
return ret;
//error: so del htable, pmeinfo destroy. do not sendlog
error_out:
KNI_LOG_ERROR(logger, "stream error = %d, ret is %d", pmeinfo->error, ret);
if(pmeinfo != NULL){
key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
int ret1 = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id, key_size, pme_info_destroy);
if(ret1 < 0){
KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d",
"traceid2pme_htable", pmeinfo->stream_trace_id, key_size, ret1);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d",
"traceid2pme_htable", pmeinfo->stream_trace_id, key_size);
}
}
return ret;
}
@@ -873,7 +958,6 @@ static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger);
pmeinfo->tfe_release = 1;
pmeinfo->end_time = time(NULL);
int key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
int ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id,
@@ -964,7 +1048,7 @@ void* thread_tfe_cmsg_receiver(void *args){
//get pme
long cb_ret = -1;
struct traceid2pme_search_cb_args cb_args;
memset((void*)&cb_args, sizeof(cb_args), 0);
memset((void*)&cb_args, 0, sizeof(cb_args));
cb_args.cmsg = cmsg;
cb_args.logger = logger;
MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_trace_id,
@@ -1177,7 +1261,12 @@ static void traceid2pme_htable_data_free_cb(void *data){
void *logger = g_kni_handle->local_logger;
KNI_LOG_DEBUG(logger, "call traceid2pme_htable_data_free_cb");
struct pme_info *pmeinfo = (struct pme_info*)data;
pme_info_destroy(pmeinfo);
pthread_mutex_lock(&(pmeinfo->lock));
pmeinfo->tfe_release = 1;
int ret = judge_pme_destroy(pmeinfo);
if(ret < 0){
pthread_mutex_unlock(&(pmeinfo->lock));
}
}
//eliminate_type: 0:FIFO; 1:LRU
@@ -1190,7 +1279,6 @@ static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){
return 0;
}
KNI_LOG_DEBUG(logger, "Succeed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release);
pmeinfo->tfe_release = 1;
return 1;
}