diff --git a/kni_comm.c b/kni_comm.c index abb2d2f..9ad72ff 100644 --- a/kni_comm.c +++ b/kni_comm.c @@ -530,6 +530,8 @@ int kni_filestate2_init() g_kni_fs2_info.field_id[FS_INTERCEPT]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"link_intercept"); g_kni_fs2_info.field_id[FS_RATELIMIT]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"link_ratelimit"); g_kni_fs2_info.field_id[FS_NOT_HIT]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"link_not_hit"); + g_kni_fs2_info.field_id[FS_RATELIMIT_UDP]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"ratelimit_udp_pkt"); + g_kni_fs2_info.field_id[FS_REPLACE_UDP]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"replace_udp_pkt"); g_kni_fs2_info.field_id[FS_REPAIR_TOTAL]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"repair_total"); g_kni_fs2_info.field_id[FS_REPAIR_SOCK_ERR]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"repair_sock_err"); g_kni_fs2_info.field_id[FS_REPAIR_SET_ERR]=FS_register(g_kni_fs2_info.handler, FS_STYLE_FIELD, FS_CALC_CURRENT,"repair_set_err"); diff --git a/kni_comm.h b/kni_comm.h index 402d6af..f42473d 100644 --- a/kni_comm.h +++ b/kni_comm.h @@ -25,6 +25,8 @@ enum kni_FS_COLUME FS_INTERCEPT, FS_RATELIMIT, FS_NOT_HIT, + FS_RATELIMIT_UDP, + FS_REPLACE_UDP, FS_REPAIR_TOTAL, FS_REPAIR_SOCK_ERR, FS_REPAIR_SET_ERR, @@ -53,8 +55,6 @@ enum kni_FS_COLUME FS_REPLAY_WINDOW, FS_HTABLE_ADD, FS_HTABLE_DEL, - - FS2_COLUMN_NUM }; diff --git a/kni_entry.c b/kni_entry.c index 5a17c70..9eb984f 100644 --- a/kni_entry.c +++ b/kni_entry.c @@ -12,7 +12,7 @@ -int g_kni_version_VERSION_20181210; +int g_kni_version_VERSION_20181211; struct kni_var_comm g_kni_comminfo; struct kni_var_struct g_kni_structinfo; @@ -59,15 +59,25 @@ int kni_scan_domain(char* domain,int domain_len,int thread_seq,struct kni_pme_in { int string_scan_num=0; int found_pos; - struct Maat_rule_t maat_result[KNI_MAX_SAMENUM]; + + + string_scan_num=Maat_full_scan_string(g_kni_maatinfo.maat_feather,g_kni_maatinfo.tableid_domain,CHARSET_GBK,domain,domain_len,&(pmeinfo->maat_result[pmeinfo->maat_result_num]),&found_pos,KNI_MAX_SAMENUM-pmeinfo->maat_result_num,&(pmeinfo->mid),thread_seq); + kni_process_maatresult(string_scan_num,&(pmeinfo->maat_result[pmeinfo->maat_result_num]),pmeinfo); - string_scan_num=Maat_full_scan_string(g_kni_maatinfo.maat_feather,g_kni_maatinfo.tableid_domain,CHARSET_GBK,domain,domain_len,maat_result,&found_pos,KNI_MAX_SAMENUM,&(pmeinfo->mid),thread_seq); - kni_process_maatresult(string_scan_num,maat_result,pmeinfo); if(string_scan_num <= 0) { - string_scan_num=Maat_full_scan_string(g_kni_maatinfo.ipd_dyn_maat_feather,g_kni_maatinfo.tableid_dynamic_domain,CHARSET_GBK,domain,domain_len,maat_result,&found_pos,KNI_MAX_SAMENUM,&(pmeinfo->mid),thread_seq); - kni_process_maatresult(string_scan_num,maat_result,pmeinfo); + string_scan_num=Maat_full_scan_string(g_kni_maatinfo.ipd_dyn_maat_feather,g_kni_maatinfo.tableid_dynamic_domain,CHARSET_GBK,domain,domain_len,&(pmeinfo->maat_result[pmeinfo->maat_result_num]),&found_pos,KNI_MAX_SAMENUM-pmeinfo->maat_result_num,&(pmeinfo->mid),thread_seq); + kni_process_maatresult(string_scan_num,&(pmeinfo->maat_result[pmeinfo->maat_result_num]),pmeinfo); + + if(string_scan_num > 0) + { + pmeinfo->maat_result_num += string_scan_num; + } + } + else + { + pmeinfo->maat_result_num += string_scan_num; } return string_scan_num; @@ -103,11 +113,16 @@ default:ipscan_num =0 or =1,not >1 int kni_scan_ip(struct ipaddr* addr,int thread_seq,int protocol,struct kni_pme_info* pmeinfo) { int ipscan_num = 0; - struct Maat_rule_t maat_result[KNI_MAX_SAMENUM]; - ipscan_num = Maat_scan_proto_addr(g_kni_maatinfo.maat_feather,g_kni_maatinfo.tableid_ip,addr,protocol,maat_result,KNI_MAX_SAMENUM,&(pmeinfo->mid),thread_seq); + ipscan_num = Maat_scan_proto_addr(g_kni_maatinfo.maat_feather,g_kni_maatinfo.tableid_ip,addr,protocol,&(pmeinfo->maat_result[pmeinfo->maat_result_num]),KNI_MAX_SAMENUM-pmeinfo->maat_result_num,&(pmeinfo->mid),thread_seq); + + kni_process_maatresult(ipscan_num,&(pmeinfo->maat_result[pmeinfo->maat_result_num]),pmeinfo); + + if(ipscan_num >=0) + { + pmeinfo->maat_result_num +=ipscan_num; + } - kni_process_maatresult(ipscan_num,maat_result,pmeinfo); //20181030add ,ipscan_action is monitor,use this keyringid;ipscan_action is replace,udp data not ipscan and pktscan pmeinfo->ipsscan_action = pmeinfo->action; @@ -510,18 +525,18 @@ int kni_protocol_identify(const struct streaminfo* pstream,const void* a_packet, -char kni_process_udppkt(unsigned char routdir,struct kni_pme_info* pmeinfo,int thread_seq,const void* a_packet) + char kni_process_udppkt(unsigned char routdir,struct kni_pme_info* pmeinfo,int thread_seq,const void* a_packet,const struct streaminfo* pstream) { char ret = APP_STATE_FAWPKT|APP_STATE_DROPME; switch(pmeinfo->action) { case KNI_ACTION_RATELIMIT: - ret = kni_process_ratelimit(thread_seq,a_packet,pmeinfo); + ret = kni_process_ratelimit(thread_seq,pstream,a_packet,pmeinfo); break; case KNI_ACTION_REPLACE: - ret = kni_process_replace(routdir,thread_seq,a_packet,pmeinfo); + ret = kni_process_replace(routdir,thread_seq,pstream,a_packet,pmeinfo); break; case KNI_ACTION_HALFHIT: @@ -570,7 +585,7 @@ char kni_first_tcpdata(const struct streaminfo* pstream,const void* a_packet,str case KNI_ACTION_RATELIMIT: kni_filestate2_set(pstream->threadnum,FS_RATELIMIT,0,1); kni_log_info((char*)KNI_MODULE_INFO,&(pstream->addr),pstream->type,NULL,(char*)"RATELIMITE",(char*)"RATELIMITE",pmeinfo); - ret = kni_process_ratelimit(pstream->threadnum,a_packet,pmeinfo); + ret = kni_process_ratelimit(pstream->threadnum,pstream,a_packet,pmeinfo); return ret; case KNI_ACTION_NONE: @@ -725,7 +740,7 @@ char kni_pending_opstate(const struct streaminfo* pstream,struct kni_pme_info* p { kni_scan_pktbin((char*)(pstream->pudpdetail->pdata),pstream->pudpdetail->datalen,thread_seq,pmeinfo); - ret = kni_process_udppkt(pstream->routedir,pmeinfo,thread_seq,a_packet); + ret = kni_process_udppkt(pstream->routedir,pmeinfo,thread_seq,a_packet,pstream); } @@ -817,7 +832,7 @@ char kni_data_opstate(const struct streaminfo* pstream,struct kni_pme_info* pmei } else if(pmeinfo->action == KNI_ACTION_RATELIMIT) { - ret = kni_process_ratelimit(pstream->threadnum,(void*)a_packet,pmeinfo); + ret = kni_process_ratelimit(pstream->threadnum,pstream,(void*)a_packet,pmeinfo); return ret; } } @@ -832,7 +847,7 @@ char kni_data_opstate(const struct streaminfo* pstream,struct kni_pme_info* pmei kni_scan_pktbin((char*)(pstream->pudpdetail->pdata),pstream->pudpdetail->datalen,thread_seq,pmeinfo); } - ret = kni_process_udppkt(pstream->routedir,pmeinfo,thread_seq,a_packet); + ret = kni_process_udppkt(pstream->routedir,pmeinfo,thread_seq,a_packet,pstream); } return ret; @@ -1066,7 +1081,7 @@ extern "C" char kni_ipv4_entry(const struct streaminfo *pstream,unsigned char ro Maat_clean_status(&(pmeinfo.mid)); - ret = kni_process_udppkt(routedir,&pmeinfo,thread_seq,ipv4_hdr); + ret = kni_process_udppkt(routedir,&pmeinfo,thread_seq,ipv4_hdr,pstream); return ret; @@ -1116,7 +1131,7 @@ extern "C" char kni_ipv6_entry(const struct streaminfo *pstream,unsigned char ro Maat_clean_status(&(pmeinfo.mid)); - ret = kni_process_udppkt(routedir,&pmeinfo,thread_seq,ipv6_hdr); + ret = kni_process_udppkt(routedir,&pmeinfo,thread_seq,ipv6_hdr,pstream); return ret; diff --git a/kni_entry.h b/kni_entry.h index 85e78d0..03e6928 100644 --- a/kni_entry.h +++ b/kni_entry.h @@ -68,6 +68,7 @@ #define KNI_MODULE_INFO "kni_info" #define KNI_MODULE_DEBUG "kni_debug" #define KNI_MODULE_SENDFD "send_fds" +#define KNI_MODULE_SENDLOG "kni_sendlog" #define KNI_ACTION_EXIT "exit..." //init profile info @@ -78,6 +79,7 @@ #define KNI_DYNMAAT_MODE "dynmic_maat" #define KNI_STATIC_MAAT_MODE "static_maat" #define KNI_TUN_MODE "tun" +#define KNI_SENDLOG_MODE "send_log" #define KNI_CONF_MODE "Module" #define KNI_CONF_FILENAME_MAIN "./conf/main.conf" @@ -246,6 +248,7 @@ struct kni_switch_info int sendpkt_mode; //0:mesa_sendpkt_option;1:socket int write_listq_switch; //0:no listq;1:has listq int send_fds_mode; //0:has listq;1:no listq + int send_log_switch; //0:not send log;1:send log }; struct kni_http_project @@ -352,12 +355,13 @@ struct kni_pme_info int keyring_id; int ipsscan_action; int protocol; - int maat_result_num; int ser_def_len; int client_fd; //only for log,not real fd int server_fd; //only for log,not read fd + int maat_result_num; scan_status_t mid; char service_defined[KNI_SERVICE_LEN]; //for replace and ratelimited + struct Maat_rule_t maat_result[KNI_MAX_SAMENUM]; struct kni_ratelimit_info ratelimit_info; struct kni_tcpopt_info tcpopt_info[KNI_DIR_DOUBLE]; //for monitor,tcp repair struct kni_wndpro_reply_info lastpkt_info[KNI_DIR_DOUBLE]; //for monitor,reply windows update diff --git a/kni_ratelimit.c b/kni_ratelimit.c index bf33cdf..fe566c6 100644 --- a/kni_ratelimit.c +++ b/kni_ratelimit.c @@ -2,6 +2,7 @@ #include #include #include "kni_entry.h" +#include "kni_sendlog.h" #include "kni_ratelimit.h" @@ -80,17 +81,18 @@ int kni_get_ratelimit(int cfg_id,struct kni_ratelimit_info* ratelimit_info,int s -char kni_process_ratelimit(int thread_seq,const void* a_packet,struct kni_pme_info* pmeinfo) +char kni_process_ratelimit(int thread_seq,const struct streaminfo* pstream,const void* a_packet,struct kni_pme_info* pmeinfo) { if((pmeinfo == NULL) || (g_kni_switch_info.ratelimit_switch == 0)) { return APP_STATE_DROPME; } -// kni_filestate2_set(thread_seq,FS_RATELIMIT,0,1); + kni_filestate2_set(thread_seq,FS_RATELIMIT_UDP,0,1); char ret = APP_STATE_GIVEME; struct kni_ratelimit_info* ratelimit_info = &(pmeinfo->ratelimit_info); + struct kni_log sendlog_msg; if((ratelimit_info->denominator == 0) && (ratelimit_info->molecule == 0)) { @@ -100,6 +102,12 @@ char kni_process_ratelimit(int thread_seq,const void* a_packet,struct kni_pme_in return APP_STATE_DROPME; } + sendlog_msg.stream = pstream; + sendlog_msg.result = pmeinfo->maat_result; + sendlog_msg.result_num = pmeinfo->maat_result_num; + + kni_send_log(&sendlog_msg); + kni_log_debug(RLOG_LV_INFO,(char*)"RATELIMIT",a_packet,(char*)"config_id:%d,molecule:%d,denominator:%d",pmeinfo->cfg_id,ratelimit_info->molecule,ratelimit_info->denominator); } diff --git a/kni_ratelimit.h b/kni_ratelimit.h index d192537..9bbbccd 100644 --- a/kni_ratelimit.h +++ b/kni_ratelimit.h @@ -1,8 +1,7 @@ #ifndef KNI_RATELIMIT_H #define KNI_RATELIMIT_H -char kni_process_ratelimit(int thread_seq,const void* a_packet,struct kni_pme_info* pmeinfo); - +char kni_process_ratelimit(int thread_seq,const struct streaminfo* pstream,const void* a_packet,struct kni_pme_info* pmeinfo); #endif diff --git a/kni_replace.c b/kni_replace.c index 178df2a..c7f0c23 100644 --- a/kni_replace.c +++ b/kni_replace.c @@ -2,6 +2,7 @@ #include #include #include +#include "kni_sendlog.h" #include "kni_replace.h" #include "kni_entry.h" @@ -129,7 +130,7 @@ int kni_build_send_ipv6(unsigned char dir,int thread_seq,struct kni_ipv6_hdr* a_ -char kni_process_replace(unsigned char dir,int thread_seq,const void* a_packet,struct kni_pme_info* pmeinfo) +char kni_process_replace(unsigned char dir,int thread_seq,const struct streaminfo* pstream,const void* a_packet,struct kni_pme_info* pmeinfo) { if(g_kni_switch_info.replace_switch == 0) { @@ -137,10 +138,12 @@ char kni_process_replace(unsigned char dir,int thread_seq,const void* a_packet,s } -// kni_filestate2_set(thread_seq,FS_REPLACE,0,1); + kni_filestate2_set(thread_seq,FS_REPLACE_UDP,0,1); // char ret = APP_STATE_DROPPKT | APP_STATE_DROPME; char ret = APP_STATE_DROPPKT | APP_STATE_GIVEME; + struct kni_log log_msg; + struct kni_replace_info replace_info; memset(&replace_info,0,sizeof(struct kni_replace_info)); @@ -151,6 +154,11 @@ char kni_process_replace(unsigned char dir,int thread_seq,const void* a_packet,s return APP_STATE_DROPME; } + + log_msg.stream = pstream; + log_msg.result = pmeinfo->maat_result; + log_msg.result_num = pmeinfo->maat_result_num; + kni_send_log(&log_msg); kni_log_debug(RLOG_LV_FATAL,(char*)"REPLACE",a_packet,(char*)"config id:%d,original:%s,replace:%s",pmeinfo->cfg_id,replace_info.find,replace_info.replace); if(*(char*)a_packet == 0x45) diff --git a/kni_replace.h b/kni_replace.h index 0c5f63b..bfbdfea 100644 --- a/kni_replace.h +++ b/kni_replace.h @@ -15,8 +15,7 @@ struct kni_replace_info char kni_replace_scan(); -char kni_process_replace(unsigned char dir,int thread_seq,const void* a_packet,struct kni_pme_info* pmeinfo); - +char kni_process_replace(unsigned char dir,int thread_seq,const struct streaminfo* pstream,const void* a_packet,struct kni_pme_info* pmeinfo); #endif diff --git a/kni_sendlog.cpp b/kni_sendlog.cpp new file mode 100644 index 0000000..4c052ef --- /dev/null +++ b/kni_sendlog.cpp @@ -0,0 +1,202 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cJSON.h" +#include "kni_entry.h" +#include "kni_sendlog.h" + +struct kni_logger* g_kni_sendlog; + +static unsigned int get_ip_by_eth_name(const char *ifname) +{ + int sockfd; + struct ifreq ifr; + unsigned int ip; + + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (-1 == sockfd) + { + goto error; + } + + strcpy(ifr.ifr_name,ifname); + if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) + { + goto error; + } + + ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr; + close(sockfd); + return ip; + +error: + close(sockfd); + return INADDR_NONE; +} + + + +static rd_kafka_t * create_kafka_handle(const char* brokerlist) +{ + char kafka_errstr[1024]; + rd_kafka_t *handle=NULL; + rd_kafka_conf_t *rdkafka_conf = NULL; + + rdkafka_conf = rd_kafka_conf_new(); + rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr)); + + //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. + handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); + rdkafka_conf=NULL; + if (handle==NULL) + { + return NULL; + } + if (rd_kafka_brokers_add(handle, brokerlist) == 0) + { + rd_kafka_destroy(handle); + return NULL; + } + return handle; +} + +struct kni_logger* kni_sendlog_init() +{ + int ret=-1; + char nic_name[64]={0}; + + g_kni_sendlog=ALLOC(struct kni_logger,1); + + MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"kni log is inititating from %s section %s.", KNI_CONF_FILENAME, KNI_SENDLOG_MODE); + + MESA_load_profile_int_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE, "send_log_switch",&(g_kni_switch_info.send_log_switch),0); + if(g_kni_switch_info.send_log_switch == 0) + { + goto error_out; + } + + MESA_load_profile_string_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE, "NIC_NAME",nic_name,sizeof(nic_name),"eth0"); + g_kni_sendlog->local_ip_nr=get_ip_by_eth_name(nic_name); + if(g_kni_sendlog->local_ip_nr==INADDR_NONE) + { + MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"%s get NIC_NAME: %s error.", __FUNCTION__, nic_name); + goto error_out; + } + inet_ntop(AF_INET,&(g_kni_sendlog->local_ip_nr),g_kni_sendlog->local_ip_str,sizeof(g_kni_sendlog->local_ip_str)); + + MESA_load_profile_int_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE, "ENTRANCE_ID",&(g_kni_sendlog->entry_id),0); + + ret=MESA_load_profile_string_def(KNI_CONF_FILENAME, KNI_SENDLOG_MODE,"KAFKA_BROKERLIST", g_kni_sendlog->brokerlist, sizeof(g_kni_sendlog->brokerlist), NULL); + if(ret<0) + { + MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"kni log init failed, no brokerlist in profile %s section %s.", KNI_CONF_FILENAME, KNI_SENDLOG_MODE); + goto error_out; + } + g_kni_sendlog->kafka_handle=create_kafka_handle(g_kni_sendlog->brokerlist); + if(g_kni_sendlog->kafka_handle==NULL) + { + MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"kni log init failed. Cannot create lafka handle with brokerlist: %s.", g_kni_sendlog->brokerlist); + goto error_out; + } + g_kni_sendlog->topic_name="PXY-KNI-LOG"; + g_kni_sendlog->kafka_topic = rd_kafka_topic_new(g_kni_sendlog->kafka_handle,g_kni_sendlog->topic_name, NULL); + return g_kni_sendlog; + +error_out: + free(g_kni_sendlog); + return NULL; +} + +int kni_send_log(const struct kni_log* log_msg) +{ + if(g_kni_switch_info.send_log_switch == 0) + { + return 0; + } + + const struct layer_addr* addr=&(log_msg->stream->addr); + const char* tmp_val=NULL; + cJSON *common_obj=NULL, *per_hit_obj=NULL; + char* log_payload=NULL; + int kafka_status=0; + int send_cnt=0; + time_t cur_time; + char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; + char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; + + + common_obj=cJSON_CreateObject(); + cur_time = time(NULL); + + cJSON_AddNumberToObject(common_obj, "found_time", cur_time); + cJSON_AddNumberToObject(common_obj, "recv_time", cur_time); + + switch(addr->addrtype) + { + case ADDR_TYPE_IPV4: + cJSON_AddNumberToObject(common_obj, "addr_type", 4); + inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str)); + inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str)); + cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str); + cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str); + cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->tuple4_v4->source)); + cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->tuple4_v4->dest)); + cJSON_AddStringToObject(common_obj, "trans_proto", "IPv4_TCP"); + break; + case ADDR_TYPE_IPV6: + cJSON_AddNumberToObject(common_obj, "addr_type", 6); + inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str)); + inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str)); + cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str); + cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str); + cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->tuple4_v6->source)); + cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->tuple4_v6->dest)); + cJSON_AddStringToObject(common_obj, "trans_proto", "IPv6_TCP"); + break; + default: + break; + } + cJSON_AddNumberToObject(common_obj, "direction", 0); + cJSON_AddNumberToObject(common_obj, "stream_dir", 3); //1:c2s, 2:s2c, 3:double + cJSON_AddStringToObject(common_obj, "cap_ip", g_kni_sendlog->local_ip_str); + cJSON_AddNumberToObject(common_obj, "entrance_id", g_kni_sendlog->entry_id); + cJSON_AddNumberToObject(common_obj, "device_id", 0); + cJSON_AddStringToObject(common_obj, "user_region", "null"); + + for(size_t i=0; iresult_num; i++) + { + if(log_msg->result[i].do_log==0) + { + continue; + } + per_hit_obj=cJSON_Duplicate(common_obj, 1); + cJSON_AddNumberToObject(per_hit_obj, "cfg_id", log_msg->result[i].config_id); + cJSON_AddNumberToObject(per_hit_obj, "service", log_msg->result[i].service_id); + log_payload = cJSON_Print(per_hit_obj); + + fprintf(stderr, "%s\n", log_payload); + kafka_status = rd_kafka_produce(g_kni_sendlog->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, + log_payload, strlen(log_payload), NULL, 0, NULL); + free(log_payload); + cJSON_Delete(per_hit_obj); + if(kafka_status<0) + { + MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); + } + send_cnt++; + } + + cJSON_Delete(common_obj); + return send_cnt; +} diff --git a/kni_sendlog.h b/kni_sendlog.h new file mode 100644 index 0000000..d105c63 --- /dev/null +++ b/kni_sendlog.h @@ -0,0 +1,34 @@ +#include +#include +#include "kni_entry.h" + + +struct kni_log +{ + const struct streaminfo *stream; + const Maat_rule_t*result; + size_t result_num; +}; + +struct kni_logger +{ + char local_ip_str[INET6_ADDRSTRLEN]; + int entry_id; + + unsigned int local_ip_nr; + rd_kafka_t *kafka_handle; + rd_kafka_topic_t* kafka_topic; + char brokerlist[KNI_CONF_MAXLEN]; + const char* topic_name; + + unsigned long long send_cnt; + char local_log_path[KNI_CONF_MAXLEN]; +}; + + +struct kni_logger* kni_sendlog_init(const char* profile, const char* section, void* local_logger); +//return 0 if SUCCESS, otherwise return -1 +int kni_send_log(const struct kni_log* log_msg); + + +