Update lirenjie_vxlan_sapp.c,kafka初始化添加参数,修改函数push_data_to_kafka(功能待测)

This commit is contained in:
李仁杰
2019-07-31 20:06:52 +08:00
parent ac3109b014
commit 87d2527694

View File

@@ -12,39 +12,62 @@
#include <signal.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/udp.h>
#include "MESA_handle_logger.h"
#include "MESA_prof_load.h"
#include "stream.h"
#include "gdev_keepalive.h"
#include "rdkafka.h"
int version_20190730;
#define DEFAULT_RETURN_VALUE (APP_STATE_GIVEME | APP_STATE_DROPPKT)
extern unsigned char vxlan_sport_map_to_service_id(unsigned short sport_host_order);
extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order);
extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action));
extern int g_business_plug_type;
#define MAX_LOG_INFO_LEN 256
#define MAX_TRAFFIC_INFO_LEN 1024
const char *module_name = "lirenjie_vxlan";
const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
const char *gdev_conf_path = "./conf/gdev.conf";
const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf";
void *runtime_log_handler;
void *kafka_log_handler;
char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
unsigned char flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
static const char *module_name = "lirenjie_vxlan";
static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
static const char *gdev_conf_path = "./conf/gdev.conf";
static const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf";
static void *runtime_log_handler;
static void *kafka_log_handler;
//char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */
#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */
static unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
enum flow_type_t{
FLOW_TYPE_REFLUX = 0, /* 回流 */
FLOW_TYPE_INJECT = 1, /* 回注 */
};
static unsigned int flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
/* kafka */
const int PRODUCER_INIT_FAILED = -1;
const int PRODUCER_INIT_SUCCESS = 0;
const int PUSH_DATA_FAILED = -1;
const int PUSH_DATA_SUCCESS = 0;
int partition;
static const int PRODUCER_INIT_FAILED = -1;
static const int PRODUCER_INIT_SUCCESS = 0;
static const int PUSH_DATA_FAILED = -1;
static const int PUSH_DATA_SUCCESS = 0;
static int partition;
//rd
rd_kafka_t *kafka_producer;
rd_kafka_conf_t *conf;
static rd_kafka_t *kafka_producer;
static rd_kafka_conf_t *conf;
// topic
rd_kafka_topic_t *rkt;
rd_kafka_topic_conf_t *topic_conf;
static rd_kafka_topic_t *rkt;
static rd_kafka_topic_conf_t *topic_conf;
// char errstr[512]={0};
char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092";
char *topic = "G_BACK_TRAFFIC_STATISTIC";
//static char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092";
//static char *brokers = "10.208.133.126:9092,10.208.133.133:9092,10.208.133.135:9092,10.208.133.141:9092";
char brokers[128];
static char *topic = "G_BACK_TRAFFIC_STATISTIC_new";
struct traffic_info
{
@@ -57,11 +80,13 @@ struct traffic_info
//struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday
// time_t stat_time; // 秒级
struct timeval stat_time; //微妙级
char vx_ip_header_src_ip[INET_ADDRSTRLEN];
unsigned int vx_ip_header_src_ip_net;
unsigned int vx_ip_header_dst_ip_net;
unsigned short vx_UDP_header_src_port;
unsigned short vx_UDP_header_dst_port;
struct layer_addr addr;
unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */
//unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */
int vxlan_vpn_id;
unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806
/* ipv4 src_ip dst_ip identification fragment_offset*/
char ipv4_sip[INET_ADDRSTRLEN];
@@ -90,10 +115,10 @@ static void logger(const rd_kafka_t *rk, int level, const char *fac, const char
level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}
int init_kafka(int partition_, char *brokers_, char *topic_)
static int init_kafka(int partition_, char *brokers_, char *topic_)
{
char tmp[16];
char errstr[512];
char errstr[1024];
partition = partition_;
/* Kafka configuration */
conf = rd_kafka_conf_new();
@@ -102,13 +127,16 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(conf, "request.required.acks", "1", kafka_errstr, sizeof(kafka_errstr));
/*topic configuration*/
topic_conf = rd_kafka_topic_conf_new();
if (conf == NULL)
{
//fprintf(stderr, "***** Failed to create new conf *******\n");
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** Failed to create new conf *******");
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"***** Failed to create new conf *******");
return PRODUCER_INIT_FAILED;
}
kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr));
@@ -116,8 +144,8 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
{
/*fprintf(stderr, "***** kafka_producer is null *******\n");
fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** kafka_producer is null, *******");
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"*****Failed to create new producer: %s*******\n", errstr);
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"***** kafka_producer is null, *******");
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"*****Failed to create new producer: %s*******\n", errstr);
return PRODUCER_INIT_FAILED;
}
@@ -127,7 +155,7 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0)
{
//fprintf(stderr, "****** No valid brokers specified********\n");
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"****** No valid brokers specified********");
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"****** No valid brokers specified********");
return PRODUCER_INIT_FAILED;
}
/* Create topic */
@@ -136,13 +164,13 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
return PRODUCER_INIT_SUCCESS;
}
void kafka_destroy()
static void kafka_destroy()
{
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(kafka_producer);
}
int push_data_to_kafka(char *buffer, int buf_len)
static int push_data_to_kafka(char *buffer, int buf_len)
{
int ret;
if (buffer == NULL)
@@ -150,6 +178,7 @@ int push_data_to_kafka(char *buffer, int buf_len)
return 0;
}
ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
//ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
if (ret == -1)
{
/*fprintf(stderr,
@@ -161,17 +190,19 @@ int push_data_to_kafka(char *buffer, int buf_len)
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_last_error()));
/* Poll to handle delivery reports */
rd_kafka_poll(kafka_producer, 0);
//rd_kafka_poll(kafka_producer, 0);
return PUSH_DATA_FAILED;
}
/*fprintf(stderr, "%% Sent %zd bytes to topic "
"%s partition %i\n",
buf_len, rd_kafka_topic_name(rkt), partition);*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i",
buf_len, rd_kafka_topic_name(rkt), partition);
rd_kafka_poll(kafka_producer, 0);
//rd_kafka_poll(kafka_producer, 0);
return PUSH_DATA_SUCCESS;
}
#if 0
unsigned char get_service_id(struct streaminfo *pstream)
{
int ret;
@@ -236,8 +267,9 @@ unsigned char get_service_id(struct streaminfo *pstream)
return service_id;
}
#endif
unsigned short get_proto_type(struct streaminfo *pstream)
static unsigned short get_proto_type(struct streaminfo *pstream)
{
if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
{
@@ -256,7 +288,24 @@ unsigned short get_proto_type(struct streaminfo *pstream)
}
}
unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
static int get_vpnid_from_stream(struct streaminfo *pstream)
{
int vpn_id_net_order;
int ret;
ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_VPNID, &vpn_id_net_order);
if (ret >= 0)
{
return ntohl(vpn_id_net_order);
}
else
{
return 0;
}
}
#if 0
static unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
{
int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/
int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id);
@@ -272,7 +321,8 @@ unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
}
}
unsigned char get_service_id_from_sport(struct streaminfo *pstream)
static unsigned char get_service_id_from_sport(struct streaminfo *pstream)
{
int ret;
unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/
@@ -291,22 +341,58 @@ unsigned char get_service_id_from_sport(struct streaminfo *pstream)
return 0;
}
}
#endif
void get_vx_ip_header_src_ip(struct streaminfo *pstream, struct traffic_info *tinfo)
static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_gdev_ip_net)
{
int gdev_ip;
int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip);
if((ntohl(packet_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK) ==
(ntohl(local_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK)){
return 1;
}
return 0;
}
static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *tinfo)
{
int gdev_ip_net;
int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip_net);
if (ret >= 0)
{
inet_ntop(AF_INET, &gdev_ip, tinfo->vx_ip_header_src_ip, INET_ADDRSTRLEN);
if(is_same_sub_net(gdev_ip_net, sapp_keepalive_reflux_ip_net)){
flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网段, 说明是回注包 */
}else{
flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网段, 说明是回流包 */
}
#if 0
if(gdev_ip_net == sapp_keepalive_reflux_ip_net){ /* add by lijia 20190611 */
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, "vxlan src and dst ip is equal!");
ret = -1;
}
#endif
if(FLOW_TYPE_REFLUX == flow_type){
tinfo->vx_ip_header_src_ip_net = gdev_ip_net;
tinfo->vx_ip_header_dst_ip_net = sapp_keepalive_reflux_ip_net;
ret = 0;
}else{
tinfo->vx_ip_header_src_ip_net = sapp_keepalive_reflux_ip_net;
tinfo->vx_ip_header_dst_ip_net = gdev_ip_net;
ret = 0;
}
}
else
{
memset(tinfo->vx_ip_header_src_ip, 0, INET_ADDRSTRLEN);
tinfo->vx_ip_header_src_ip_net = 0;
tinfo->vx_ip_header_dst_ip_net = 0;
ret = -1;
}
return ret;
}
unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream)
static unsigned short get_vx_UDP_header_src_port(struct streaminfo *pstream)
{
unsigned short vxlan_sport;
int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
@@ -321,7 +407,8 @@ unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream)
}
/* 获取pstream中的四元组 */
void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
#if 0
static void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
{
// printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
@@ -380,8 +467,9 @@ void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
{
}
}
#endif
void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt)
static void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt)
{
// printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
@@ -393,7 +481,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
struct ip *ip_hdr = (struct ip *)rawpkt;
tinfo->ipv4_id = ntohs(ip_hdr->ip_id);
tinfo->ipv4_off = ntohs(ip_hdr->ip_off);
#if 0
char info[MAX_LOG_INFO_LEN] = {0};
if (tinfo->service_id > 0)
{
@@ -408,6 +496,8 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
tinfo->ipv4_dip, ntohs(tuple4_v4->dest));
}
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
#endif
}
else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
{
@@ -421,6 +511,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
tinfo->ipv6_load_length = ntohs(ip6_head->ip6_plen);
tinfo->ipv6_next_msg_head = ip6_head->ip6_nxt;
tinfo->ipv6_limit = ip6_head->ip6_hlim;
#if 0
char info[MAX_LOG_INFO_LEN] = {0};
if (tinfo->service_id > 0)
@@ -435,6 +526,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
}
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,sizeof(info));
#endif
}
else if (pstream->addr.addrtype == ADDR_TYPE_ARP)
{
@@ -442,26 +534,29 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
}
}
void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
{
char protocol[8];
char vxlan_sip_ip_str[INET_ADDRSTRLEN];
char vxlan_dip_ip_str[INET_ADDRSTRLEN];
switch (tinfo->protocol)
{
case 1:
sprintf(protocol, "%s", "IPv4_TCP");
break;
case 2:
sprintf(protocol, "%s", "IPv4_UDP");
break;
case 3:
sprintf(protocol, "%s", "IPv6_TCP");
break;
case 4:
sprintf(protocol, "%s", "IPv6_UDP");
break;
default:
sprintf(protocol, "%s", "others");
break;
case 1:
sprintf(protocol, "%s", "IPv4_TCP");
break;
case 2:
sprintf(protocol, "%s", "IPv4_UDP");
break;
case 3:
sprintf(protocol, "%s", "IPv6_TCP");
break;
case 4:
sprintf(protocol, "%s", "IPv6_UDP");
break;
default:
sprintf(protocol, "%s", "others");
break;
}
/*
printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ",
@@ -476,6 +571,9 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit);
*/
char info[MAX_TRAFFIC_INFO_LEN] = {0};
inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
switch (tinfo->addr.addrtype)
{
@@ -483,14 +581,16 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
snprintf(info, MAX_TRAFFIC_INFO_LEN,
"{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d,"
"\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
"\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
"\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%d,"
"\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
"\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u,"
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
"\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}",
tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,
tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,flow_type);
tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,
//ipv6 stat is NULL,
flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
push_data_to_kafka(info,strlen(info));
break;
@@ -498,13 +598,13 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
snprintf(info, MAX_TRAFFIC_INFO_LEN,
"{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d,"
"\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
"\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
"\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
"\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
"\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d,"
"\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d}",
tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,
tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length,
tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
@@ -519,8 +619,9 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
"\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}",
tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,flow_type);
tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
push_data_to_kafka(info,strlen(info));
break;
@@ -531,8 +632,8 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
{
printf("TCP_ENTRY_ALL SUCCESS!!!\n");
return APP_STATE_GIVEME;
//printf("TCP_ENTRY_ALL SUCCESS!!!\n");
return DEFAULT_RETURN_VALUE;
}
static int tcp_flow_id = -1;
@@ -554,32 +655,35 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段具体方法待定
tinfo->service_id = get_service_id_from_vxlanid(pstream);
//tinfo->service_id = get_service_id_from_vxlanid(pstream);
tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
/* PROTOCOL */
switch (pstream->addr.addrtype)
{
case ADDR_TYPE_IPV4:
tinfo->protocol = 1; // IPV4_TCP
tinfo->vx_type = 0x0800;
break;
case ADDR_TYPE_IPV6:
tinfo->protocol = 3; // IPV6_TCP
tinfo->vx_type = 0x86DD;
break;
case ADDR_TYPE_ARP:
tinfo->protocol = 0;
tinfo->vx_type = 0x0806;
break;
default:
tinfo->protocol = 0;
tinfo->vx_type = 0;
break;
case ADDR_TYPE_IPV4:
tinfo->protocol = 1; // IPV4_TCP
tinfo->vx_type = 0x0800;
break;
case ADDR_TYPE_IPV6:
tinfo->protocol = 3; // IPV6_TCP
tinfo->vx_type = 0x86DD;
break;
case ADDR_TYPE_ARP:
tinfo->protocol = 0;
tinfo->vx_type = 0x0806;
break;
default:
tinfo->protocol = 0;
tinfo->vx_type = 0;
break;
}
/* vx_UDP_header_src_port dst_port*/
tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
tinfo->vx_UDP_header_dst_port = 4789;
/* vx_ip_header_src_ip dst_ip*/
get_vx_ip_header_src_ip(pstream, tinfo);
if(get_vxlan_ip_addr(pstream, tinfo) < 0){
goto error_drop;
}
/* IPv4、IPv6头部信息 */
get_ip_detail(pstream, tinfo, raw_pkt);
/* 应用层协议类型 用目的端口表示 */
@@ -603,7 +707,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
}
*/
if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0)
if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0)
{
//printf("TCP_ENTRY SUCCESS!!!\n");
/* 获取包数字节数 */
@@ -633,15 +737,48 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
free(tinfo);
// printf("\n");
}
return APP_STATE_GIVEME;
return DEFAULT_RETURN_VALUE;
error_drop:
free(tinfo);
return APP_STATE_DROPME | APP_STATE_DROPPKT;
}
/*
add by lijia 20190604.
*/
static inline int is_gdev_keepalive_pkt(const struct ip *iphdr)
{
const struct udphdr *udh;
if(NULL == iphdr){
return 0;
}
if(iphdr->ip_p != 17){
return 0;
}
udh = (struct udphdr *)((char *)iphdr + iphdr->ip_hl*4);
if(udh->dest == ntohs(3784)){
return 1;
}
return 0;
}
static int udp_flow_id = -1;
char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
{
struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail;
struct traffic_info *tinfo;
if(is_gdev_keepalive_pkt((const struct ip *)raw_pkt) != 0){ //add by lijia 20190604, drop BFD keepalive packet.
return APP_STATE_DROPME;
}
if (-1 == udp_flow_id)
{
udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct");
@@ -655,7 +792,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段具体方法待定
tinfo->service_id = get_service_id_from_vxlanid(pstream);
//tinfo->service_id = get_service_id_from_vxlanid(pstream);
tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
/* PROTOCOL */
switch (pstream->addr.addrtype)
{
@@ -680,7 +819,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
tinfo->vx_UDP_header_dst_port = 4789;
/* vx_ip_header_src_ip dst_ip*/
get_vx_ip_header_src_ip(pstream, tinfo);
if(get_vxlan_ip_addr(pstream, tinfo) < 0){
goto error_drop;
}
/* IPv4、IPv6头部信息 */
get_ip_detail(pstream, tinfo, raw_pkt);
/* 应用层协议类型 用目的端口表示 */
@@ -690,7 +831,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
tinfo = (struct traffic_info *)(*pme);
if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0)
if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0)
{
//printf("UDP_ENTRY SUCCESS!!!\n");
@@ -728,7 +869,16 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
free(tinfo);
// printf("\n");
}
return APP_STATE_GIVEME;
return DEFAULT_RETURN_VALUE;
error_drop:
free(tinfo);
return APP_STATE_DROPME | APP_STATE_DROPPKT;
}
static char return_action_cb_fun(int net_conn_mode, char plug_action)
{
return 0; //所有包默认都DROP
}
int CHAR_INIT()
@@ -736,8 +886,14 @@ int CHAR_INIT()
int demo_plugid = 51;
runtime_log_handler = NULL;
kafka_log_handler = NULL;
runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, RLOG_LV_INFO);
kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, RLOG_LV_INFO);
char str_tmp[128];
int log_level = 30;
MESA_load_profile_int_def(entrance_id_path, "LOG", "log_level", &log_level, 30);
runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, log_level);
kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level);
if (runtime_log_handler == NULL || kafka_log_handler == NULL)
{
/* code */
@@ -747,9 +903,17 @@ int CHAR_INIT()
/* ENTRANCE_ID */
MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0);
/* FLOW_TYPE */
MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, 0);
MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, (int)FLOW_TYPE_REFLUX);
/* Brokers */
MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers),"#");
/* vx_ip_header_dst_ip */
MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", vx_ip_header_dst_ip, INET_ADDRSTRLEN, "0.0.0.0");
MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#");
if('#' == str_tmp[0]){
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"can't get %s->sendto_gdev_ip!!", gdev_conf_path);
return -1;
}
inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net);
/* kafka初始化 */
if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
{
@@ -757,6 +921,9 @@ int CHAR_INIT()
return -1;
}
platform_register_action_judge(return_action_cb_fun);
g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */
// 函数实现自定义
// 只要求函数返回值为插件ID
//printf("INIT SUCCESS!!!\n");
@@ -777,3 +944,4 @@ void LRJ_APP_DESTROY()
MESA_destroy_runtime_log_handle(kafka_log_handler);
printf("TEST_APP_DESTORY out...\n");
}