#include #include #include #include #include #include #include #include #include #include #include #include "cJSON.h" #include "kni_entry.h" #include "kni_sendlog.h" struct kni_logger* g_kni_sendlog; static unsigned int get_ip_by_eth_name(const char *ifname) { int sockfd; struct ifreq ifr; unsigned int ip; sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (-1 == sockfd) { goto error; } strcpy(ifr.ifr_name,ifname); if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) { goto error; } ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr; close(sockfd); return ip; error: close(sockfd); return INADDR_NONE; } static rd_kafka_t * create_kafka_handle(const char* brokerlist) { char kafka_errstr[1024]; rd_kafka_t *handle=NULL; rd_kafka_conf_t *rdkafka_conf = NULL; rdkafka_conf = rd_kafka_conf_new(); rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr)); //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); rdkafka_conf=NULL; if (handle==NULL) { return NULL; } if (rd_kafka_brokers_add(handle, brokerlist) == 0) { rd_kafka_destroy(handle); return NULL; } return handle; } struct kni_logger* kni_sendlog_init() { int ret=-1; char nic_name[64]={0}; g_kni_sendlog=ALLOC(struct kni_logger,1); MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"kni log is inititating from %s section %s.", KNI_CONF_FILENAME, KNI_SENDLOG_MODE); MESA_load_profile_int_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE, "send_log_switch",&(g_kni_switch_info.send_log_switch),0); if(g_kni_switch_info.send_log_switch == 0) { goto error_out; } MESA_load_profile_string_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE, "NIC_NAME",nic_name,sizeof(nic_name),"eth0"); g_kni_sendlog->local_ip_nr=get_ip_by_eth_name(nic_name); if(g_kni_sendlog->local_ip_nr==INADDR_NONE) { MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"%s get NIC_NAME: %s error.", __FUNCTION__, nic_name); goto error_out; } inet_ntop(AF_INET,&(g_kni_sendlog->local_ip_nr),g_kni_sendlog->local_ip_str,sizeof(g_kni_sendlog->local_ip_str)); MESA_load_profile_int_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE, "ENTRANCE_ID",&(g_kni_sendlog->entry_id),0); ret=MESA_load_profile_string_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE,"KAFKA_BROKERLIST", g_kni_sendlog->brokerlist, sizeof(g_kni_sendlog->brokerlist), NULL); if(ret<0) { MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"kni log init failed, no brokerlist in profile %s section %s.", KNI_CONF_FILENAME, KNI_SENDLOG_MODE); goto error_out; } g_kni_sendlog->kafka_handle=create_kafka_handle(g_kni_sendlog->brokerlist); if(g_kni_sendlog->kafka_handle==NULL) { MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"kni log init failed. Cannot create lafka handle with brokerlist: %s.", g_kni_sendlog->brokerlist); goto error_out; } g_kni_sendlog->topic_name="PXY-KNI-LOG"; g_kni_sendlog->kafka_topic = rd_kafka_topic_new(g_kni_sendlog->kafka_handle,g_kni_sendlog->topic_name, NULL); return g_kni_sendlog; error_out: free(g_kni_sendlog); return NULL; } int kni_send_log(const struct kni_log* log_msg) { if(g_kni_switch_info.send_log_switch == 0) { return 0; } const struct layer_addr* addr=&(log_msg->stream->addr); const char* tmp_val=NULL; cJSON *common_obj=NULL, *per_hit_obj=NULL; char* log_payload=NULL; int kafka_status=0; int send_cnt=0; time_t cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; common_obj=cJSON_CreateObject(); cur_time = time(NULL); cJSON_AddNumberToObject(common_obj, "found_time", cur_time); cJSON_AddNumberToObject(common_obj, "recv_time", cur_time); switch(addr->addrtype) { case ADDR_TYPE_IPV4: cJSON_AddNumberToObject(common_obj, "addr_type", 4); inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->tuple4_v4->source)); cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->tuple4_v4->dest)); cJSON_AddStringToObject(common_obj, "trans_proto", "IPv4_TCP"); break; case ADDR_TYPE_IPV6: cJSON_AddNumberToObject(common_obj, "addr_type", 6); inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str)); inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str)); cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str); cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str); cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->tuple4_v6->source)); cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->tuple4_v6->dest)); cJSON_AddStringToObject(common_obj, "trans_proto", "IPv6_TCP"); break; default: break; } cJSON_AddNumberToObject(common_obj, "direction", 0); cJSON_AddNumberToObject(common_obj, "stream_dir", 3); //1:c2s, 2:s2c, 3:double cJSON_AddStringToObject(common_obj, "cap_ip", g_kni_sendlog->local_ip_str); cJSON_AddNumberToObject(common_obj, "entrance_id", g_kni_sendlog->entry_id); cJSON_AddNumberToObject(common_obj, "device_id", 0); cJSON_AddStringToObject(common_obj, "user_region", "null"); for(size_t i=0; iresult_num; i++) { if(log_msg->result[i].do_log==0) { continue; } per_hit_obj=cJSON_Duplicate(common_obj, 1); cJSON_AddNumberToObject(per_hit_obj, "cfg_id", log_msg->result[i].config_id); cJSON_AddNumberToObject(per_hit_obj, "service", log_msg->result[i].service_id); log_payload = cJSON_Print(per_hit_obj); fprintf(stderr, "%s\n", log_payload); kafka_status = rd_kafka_produce(g_kni_sendlog->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, log_payload, strlen(log_payload), NULL, 0, NULL); free(log_payload); cJSON_Delete(per_hit_obj); if(kafka_status<0) { MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); } send_cnt++; } cJSON_Delete(common_obj); return send_cnt; }