修复百度上不去问题

This commit is contained in:
崔一鸣
2019-06-09 20:00:44 +08:00
parent bfc9e83fd5
commit c510a81e01
5 changed files with 229 additions and 53 deletions

View File

@@ -146,8 +146,10 @@ struct kni_handle{
struct kni_maat_handle *maat_handle;
struct kni_send_logger *send_logger;
MESA_htable_handle traceid2pme_htable;
MESA_htable_handle keepalive_replay_htable;
int tfe_count;
uint32_t local_ipv4;
int keepalive_replay_switch;
void *local_logger;
};
@@ -156,6 +158,18 @@ struct traceid2pme_search_cb_args{
void *logger;
};
struct keepalive_replay_htable_value{
int has_replayed;
uint32_t first_data_len;
};
struct keepalive_replay_search_cb_args{
marsio_buff_t *rx_buff;
struct kni_marsio_handle *marsio_handle;
struct iphdr *raw_packet_iphdr;
int tfe_id;
};
static void pme_info_destroy(void *data){
struct pme_info *pmeinfo = (struct pme_info *)data;
void *logger = g_kni_handle->local_logger;
@@ -197,7 +211,7 @@ static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread
KNI_LOG_ERROR(logger, "Failed at init pthread mutex, stream_traceid is %s", pmeinfo->stream_traceid);
goto error_out;
}
kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
kni_stream_addr_trans(&(stream->addr), stream_addr, sizeof(stream_addr));
KNI_LOG_INFO(logger, "stream addr is %s, stream traceid is %s", stream_addr, pmeinfo->stream_traceid);
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1);
return pmeinfo;
@@ -314,7 +328,7 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
KNI_LOG_DEBUG(local_logger, "log_msg is %s\n", log_msg);
ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d, strem_traceid is %s",
KNI_LOG_ERROR(local_logger, "Failed at knisend_logger_sendlog, ret is %d, strem_traceid is %s",
ret, pmeinfo->stream_traceid);
goto error_out;
}
@@ -553,7 +567,7 @@ static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw
struct mr_sendpath *dev_eth_sendpath = handle->tfe_instance_list[tfe_id]->dev_eth_sendpath;
char *src_mac = handle->src_mac_addr;
char *dst_mac = handle->tfe_instance_list[tfe_id]->mac_addr;
//only send one packet
//only send one packet, alloc_ret <= nr_send <= BURST_MAX
int nr_send = 1;
int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq);
if (alloc_ret < 0){
@@ -601,7 +615,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
int len = pktinfo->ip_totlen;
int ret;
char stream_addr[KNI_SYMBOL_MAX] = "";
kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
kni_stream_addr_trans(&(stream->addr), stream_addr, sizeof(stream_addr));
//pmeinfo->action has only 3 value: KNI_ACTION_NONE KNI_ACTION_INTERCEPT KNI_ACTION_BYPASS
switch (pmeinfo->action){
case KNI_ACTION_NONE:
@@ -684,13 +698,17 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
int key_size;
struct stream_tuple4_v4 *c2s_key = NULL;
struct stream_tuple4_v4 s2c_key;
struct keepalive_replay_htable_value *c2s_value = NULL;
struct keepalive_replay_htable_value *s2c_value = NULL;
switch(pmeinfo->action){
case KNI_ACTION_BYPASS:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
case KNI_ACTION_INTERCEPT:
//only intercept: add to hash table
//only intercept: add to traceid2pme htable
key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_traceid),
key_size, (const void*)pmeinfo);
@@ -704,6 +722,45 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
"table is traceid2pme_htable, key is %s", pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1);
}
//interctp: add to keepalive_replay_htable
//c2s
c2s_key = stream->addr.tuple4_v4;
key_size = sizeof(*c2s_key);
c2s_value = ALLOC(struct keepalive_replay_htable_value, 1);
c2s_value->first_data_len = pktinfo->data_len;
ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)c2s_key,
key_size, (const void*)c2s_value);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add,"
"table is keepalive_replay_htable, dir is c2s, stream is %s", stream_addr);
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add,"
"table is keepalive_replay_htable, dir is c2s, stream is %s", stream_addr);
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1);
}
//s2c
key_size = sizeof(s2c_key);
s2c_key.saddr = c2s_key->daddr;
s2c_key.daddr = c2s_key->saddr;
s2c_key.source = c2s_key->dest;
s2c_key.dest = c2s_key->source;
s2c_value = ALLOC(struct keepalive_replay_htable_value, 1);
ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&s2c_key),
key_size, (const void*)s2c_value);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add,"
"table is keepalive_replay_htable, dir is s2c, stream is %s", stream_addr);
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add,"
"table is keepalive_replay_htable, dir is s2c, stream is %s", stream_addr);
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1);
}
//action = KNI_ACTION_INTERCEPT, sendto tfe
buf = add_cmsg_to_packet(pmeinfo, pktinfo, &len);
ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id);
@@ -892,29 +949,114 @@ static void kni_marsio_destroy(struct kni_marsio_handle *handle){
handle = NULL;
}
static void sendto_vxlan(marsio_buff_t *rx_buff, struct mr_sendpath *dev_vxlan_sendpath, int thread_seq){
//tag
struct mr_tunnat_ctrlzone mr_ctrlzone;
memset(&mr_ctrlzone, 0, sizeof(mr_ctrlzone));
mr_ctrlzone.action |= (TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER);
marsio_buff_ctrlzone_set(rx_buff, 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone));
//send to vxlan, vxlan handler: recv: 0, send: 1, nr_burst must be 1
int nr_burst = 1;
marsio_send_burst_with_options(dev_vxlan_sendpath, thread_seq, &rx_buff, nr_burst, MARSIO_SEND_OPT_FAST);
}
static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, void *user_args){
void *logger = g_kni_handle->local_logger;
struct keepalive_replay_search_cb_args *args = (struct keepalive_replay_search_cb_args*)user_args;
struct kni_marsio_handle *marsio_handle = args->marsio_handle;
marsio_buff_t *rx_buff = args->rx_buff;
int tfe_id = args->tfe_id;
if(data == NULL){
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id);
return 0;
}
struct keepalive_replay_htable_value *value = (struct keepalive_replay_htable_value*)data;
if(value->has_replayed == 1){
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id);
return 0;
}
//raw_packet: window update
struct iphdr *raw_packet_iphdr = args->raw_packet_iphdr;
int tot_len = ntohs(raw_packet_iphdr->tot_len);
int iphdr_len = raw_packet_iphdr->ihl * 4;
struct tcphdr *raw_packet_tcphdr = (struct tcphdr*)((char*)raw_packet_iphdr + iphdr_len);
//replay packet
char *replay_packet = ALLOC(char, tot_len);
memcpy(replay_packet, (void*)raw_packet_iphdr, tot_len);
struct iphdr *replay_packet_iphdr = (struct iphdr*)replay_packet;
struct tcphdr *replay_packet_tcphdr = (struct tcphdr*)((char*)replay_packet_iphdr + iphdr_len);
replay_packet_iphdr->saddr = raw_packet_iphdr->daddr;
replay_packet_iphdr->daddr = raw_packet_iphdr->saddr;
replay_packet_tcphdr->source = raw_packet_tcphdr->dest;
replay_packet_tcphdr->dest = raw_packet_tcphdr->source;
replay_packet_tcphdr->seq = htonl(ntohl(raw_packet_tcphdr->ack_seq) + value->first_data_len); //seq = ack + first_data_len
replay_packet_tcphdr->ack_seq = htonl(ntohl(raw_packet_tcphdr->seq) + 1); //ack = seq + 1
replay_packet_iphdr->check = 0;
replay_packet_iphdr->check = kni_ip_checksum((void*)replay_packet_iphdr, iphdr_len);
replay_packet_tcphdr->check = 0;
replay_packet_tcphdr->check = kni_tcp_checksum((void*)replay_packet_tcphdr, tot_len - iphdr_len,
replay_packet_iphdr->saddr, replay_packet_iphdr->daddr);
//send to tfe: thread_seq = g_iThreadNum
int ret = send_to_tfe(marsio_handle, replay_packet, tot_len, g_iThreadNum, tfe_id);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send keepalive replay packet to tfe");
}
value->has_replayed = 1;
FREE(&replay_packet);
return 0;
}
void* thread_tfe_data_receiver(void *args){
struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args;
struct kni_marsio_handle *marsio_handle = _args->marsio_handle;
int tfe_id = _args->tfe_id;
struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[tfe_id]->dev_eth_handler;
FREE(&args);
marsio_buff_t * rx_buff[BURST_MAX];
marsio_buff_t *rx_buffs[BURST_MAX];
int nr_burst = 1;
//eth_handler: recv: 1
int thread_seq = 0;
while(true){
//receive from tfe
int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst);
//receive from tfe, nr_recv <= nr_burst <= BURST_MAX
int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst);
if(nr_recv <= 0){
continue;
}
//tag
struct mr_tunnat_ctrlzone mr_ctrlzone;
mr_ctrlzone.action |= (TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER);
for(int i = 0; i < nr_recv; i++){
marsio_buff_ctrlzone_set(rx_buff[i], 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone));
if(g_kni_handle->keepalive_replay_switch == 1){
for(int i = 0; i < nr_recv; i++){
struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]);
if(ether_hdr->h_proto == htons(ETH_P_IP)){
struct iphdr *iphdr = (struct iphdr*)((char*)ether_hdr + sizeof(*ether_hdr));
int iphdr_len = iphdr->ihl * 4;
struct tcphdr *tcphdr = (struct tcphdr*)((char*)iphdr + iphdr_len);
struct stream_tuple4_v4 key;
key.saddr = iphdr->saddr;
key.daddr = iphdr->daddr;
key.source = tcphdr->source;
key.dest = tcphdr->dest;
int key_size = sizeof(key);
long cb_ret = -1;
keepalive_replay_search_cb_args cb_args;
memset(&cb_args, 0, sizeof(cb_args));
cb_args.rx_buff = rx_buffs[i];
cb_args.marsio_handle = marsio_handle;
cb_args.raw_packet_iphdr = iphdr;
cb_args.tfe_id = tfe_id;
MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key),
key_size, keepalive_replay_search_cb, &cb_args, &cb_ret);
}
else{
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id);
}
}
}
else{
for(int i = 0; i < nr_recv; i++){
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id);
}
}
//send to vxlan
marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, nr_recv, MARSIO_SEND_OPT_FAST);
}
return NULL;
}
@@ -1127,7 +1269,6 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
}
marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value));
marsio_init(mr_inst, appsym);
//eth_handler receive thread = 1, send thread = g_iThreadNum
tfe_count = g_kni_handle->tfe_count;
for(int i = 0; i < tfe_count; i++){
//load tfe conf
@@ -1157,8 +1298,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s",
_section, mac_addr_str, dev_eth_symbol);
//handler
dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum);
//eth_handler receive thread = 1, send thread = g_iThreadNum + 1
dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum + 1);
if(dev_eth_handler == NULL){
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol);
goto error_out;
@@ -1174,8 +1315,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
tfe_inst->dev_eth_sendpath = dev_eth_sendpath;
handle->tfe_instance_list[i] = tfe_inst;
}
//vxlan_handler: receive: 0 thread, send: 1
dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, 1);
//vxlan_handler: receive: 0, send: tfe_count
dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, tfe_count);
if(dev_vxlan_handler == NULL){
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol);
goto error_out;
@@ -1289,6 +1430,10 @@ static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){
return 0;
}
static void keepalive_replay_data_free_cb(void *data)
{
FREE(&data);
}
extern "C" int kni_init(){
const char *profile = "./conf/kni/kni.conf";
@@ -1303,8 +1448,9 @@ extern "C" int kni_init(){
void *local_logger = NULL;
int log_level = -1;
pthread_t thread_id = -1;
int keepalive_replay_switch = -1;
struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args;
MESA_htable_handle traceid2pme_htable = NULL;
MESA_htable_handle traceid2pme_htable = NULL, keepalive_replay_htable = NULL;
int ret = MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path));
if(ret < 0){
printf("MESA_prof_load: log_path not set, profile is %s, section is %s", profile, section);
@@ -1338,11 +1484,17 @@ extern "C" int kni_init(){
printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s",
section, log_path, log_level, tfe_count, local_eth);
ret = MESA_load_profile_int_nodef(profile, section, "keepalive_replay_switch", &keepalive_replay_switch);
if(ret < 0){
printf("MESA_prof_load: keepalive_replay_switch not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s\n keepalive_replay_switch: %d",
section, log_path, log_level, tfe_count, local_eth, keepalive_replay_switch);
g_kni_handle = ALLOC(struct kni_handle, 1);
g_kni_handle->local_logger = local_logger;
g_kni_handle->tfe_count = tfe_count;
g_kni_handle->keepalive_replay_switch = keepalive_replay_switch;
//init http_project
id = http_project_init();
@@ -1359,31 +1511,6 @@ extern "C" int kni_init(){
goto error_out;
}
//create thread_tfe_data_receiver
for(int i = 0; i < tfe_count; i++){
struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1);
args->logger = local_logger;
args->marsio_handle = g_kni_handle->marsio_handle;
args->tfe_id = i;
int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret);
FREE(&args);
goto error_out;
}
}
//create thread_tfe_cmsg_receiver
cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1);
cmsg_receiver_args->logger = local_logger;
strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1));
ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret);
FREE(&cmsg_receiver_args);
goto error_out;
}
//init maat
g_kni_handle->maat_handle = kni_maat_init(profile, local_logger);
if(g_kni_handle->maat_handle == NULL){
@@ -1422,6 +1549,42 @@ extern "C" int kni_init(){
goto error_out;
}
g_kni_handle->traceid2pme_htable = traceid2pme_htable;
//init keepalive_replay_htable
if(g_kni_handle->keepalive_replay_switch == 1){
keepalive_replay_htable = kni_create_htable(profile, "keepalive_replay_htable", (void*)keepalive_replay_data_free_cb,
NULL, local_logger);
if(keepalive_replay_htable == NULL){
KNI_LOG_ERROR(local_logger, "Failed at create keepalive_replay_htable");
goto error_out;
}
g_kni_handle->keepalive_replay_htable = keepalive_replay_htable;
}
//create thread_tfe_data_receiver
for(int i = 0; i < tfe_count; i++){
struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1);
args->logger = local_logger;
args->marsio_handle = g_kni_handle->marsio_handle;
args->tfe_id = i;
int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret);
FREE(&args);
goto error_out;
}
}
//create thread_tfe_cmsg_receiver
cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1);
cmsg_receiver_args->logger = local_logger;
strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1));
ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret);
FREE(&cmsg_receiver_args);
goto error_out;
}
return 0;
error_out: