2018-12-11 23:16:41 +08:00
# include <MESA/MESA_handle_logger.h>
# include <MESA/MESA_prof_load.h>
# include <assert.h>
# include <arpa/inet.h>
# include <time.h>
# include <stdio.h>
# include <unistd.h>
# include <sys/ioctl.h>
# include <net/if.h>
# include <pthread.h>
# include <errno.h>
# include "cJSON.h"
# include "kni_entry.h"
# include "kni_sendlog.h"
// Module-wide sender state: allocated and assigned by kni_sendlog_init(),
// then read by kni_send_log() (capture IP, entrance id, Kafka topic).
struct kni_logger * g_kni_sendlog ;
/*
 * Query the IPv4 address bound to a network interface via SIOCGIFADDR.
 *
 * ifname : NUL-terminated interface name. It must fit into ifr_name
 *          (shorter than IFNAMSIZ bytes); NULL or over-long names are
 *          rejected instead of being copied past the end of the buffer.
 *
 * Returns the raw sin_addr.s_addr value reported for the interface, or
 * INADDR_NONE when the name is unusable, the socket cannot be opened or
 * the ioctl fails.
 */
static unsigned int get_ip_by_eth_name(const char *ifname)
{
    struct ifreq ifr = {0};
    unsigned int ip = INADDR_NONE;
    int sockfd;
    int name_len;

    if (ifname == NULL)
    {
        return INADDR_NONE;
    }

    // Bounded copy: the caller reads the name from configuration and its
    // buffer is larger than ifr_name, so truncation must be treated as error.
    name_len = snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifname);
    if (name_len < 0 || (size_t)name_len >= sizeof(ifr.ifr_name))
    {
        return INADDR_NONE;
    }

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd == -1)
    {
        // Nothing was opened, so there is nothing to close.
        return INADDR_NONE;
    }

    if (ioctl(sockfd, SIOCGIFADDR, &ifr) >= 0)
    {
        ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr;
    }

    close(sockfd);
    return ip;
}
/*
 * Create a librdkafka producer and register the given brokers with it.
 *
 * brokerlist : broker list string handed to rd_kafka_brokers_add().
 *
 * Returns the producer handle, or NULL when the producer cannot be created
 * or rd_kafka_brokers_add() reports that no broker was added.
 *
 * NOTE(review): the property names/values below are kept exactly as found,
 * including the surrounding blanks - confirm they match what the linked
 * librdkafka expects. "MG" is not one of stock librdkafka's
 * security.protocol values; presumably a patched build - confirm.
 */
static rd_kafka_t *create_kafka_handle(const char *brokerlist)
{
    static const char *const producer_settings[][2] = {
        { " queue.buffering.max.messages ", " 1000000 " },
        { " topic.metadata.refresh.interval.ms ", " 600000 " },
        { " security.protocol ", " MG " },
    };
    const size_t setting_cnt = sizeof(producer_settings) / sizeof(producer_settings[0]);
    char errbuf[1024];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_t *producer;
    size_t idx;

    // The result of each rd_kafka_conf_set() call is not inspected.
    for (idx = 0; idx < setting_cnt; idx++)
    {
        rd_kafka_conf_set(conf, producer_settings[idx][0], producer_settings[idx][1],
                          errbuf, sizeof(errbuf));
    }

    // Per the original author's note, rd_kafka_new() frees the conf object,
    // so it must not be used or destroyed here afterwards.
    producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf, sizeof(errbuf));
    conf = NULL;
    if (producer == NULL)
    {
        return NULL;
    }

    if (rd_kafka_brokers_add(producer, brokerlist) != 0)
    {
        return producer;
    }

    // No broker could be added: the handle is useless, release it.
    rd_kafka_destroy(producer);
    return NULL;
}
struct kni_logger * kni_sendlog_init ( )
{
int ret = - 1 ;
char nic_name [ 64 ] = { 0 } ;
g_kni_sendlog = ALLOC ( struct kni_logger , 1 ) ;
2018-12-15 22:08:47 +08:00
MESA_handle_runtime_log ( g_kni_comminfo . logger , RLOG_LV_FATAL , KNI_MODULE_SENDLOG , " kni log is inititating from %s section %s. " , KNI_CONF_FILENAME , KNI_SENDLOG_MODE ) ;
2018-12-11 23:16:41 +08:00
MESA_load_profile_int_def ( KNI_CONF_FILENAME , KNI_SENDLOG_MODE , " send_log_switch " , & ( g_kni_switch_info . send_log_switch ) , 0 ) ;
if ( g_kni_switch_info . send_log_switch = = 0 )
{
goto error_out ;
}
MESA_load_profile_string_def ( KNI_CONF_FILENAME , KNI_SENDLOG_MODE , " NIC_NAME " , nic_name , sizeof ( nic_name ) , " eth0 " ) ;
g_kni_sendlog - > local_ip_nr = get_ip_by_eth_name ( nic_name ) ;
if ( g_kni_sendlog - > local_ip_nr = = INADDR_NONE )
{
2018-12-15 22:08:47 +08:00
MESA_handle_runtime_log ( g_kni_comminfo . logger , RLOG_LV_FATAL , KNI_MODULE_SENDLOG , " %s get NIC_NAME: %s error. " , __FUNCTION__ , nic_name ) ;
2018-12-11 23:16:41 +08:00
goto error_out ;
}
inet_ntop ( AF_INET , & ( g_kni_sendlog - > local_ip_nr ) , g_kni_sendlog - > local_ip_str , sizeof ( g_kni_sendlog - > local_ip_str ) ) ;
MESA_load_profile_int_def ( KNI_CONF_FILENAME , KNI_SENDLOG_MODE , " ENTRANCE_ID " , & ( g_kni_sendlog - > entry_id ) , 0 ) ;
ret = MESA_load_profile_string_def ( KNI_CONF_FILENAME , KNI_SENDLOG_MODE , " KAFKA_BROKERLIST " , g_kni_sendlog - > brokerlist , sizeof ( g_kni_sendlog - > brokerlist ) , NULL ) ;
if ( ret < 0 )
{
2018-12-15 22:08:47 +08:00
MESA_handle_runtime_log ( g_kni_comminfo . logger , RLOG_LV_FATAL , KNI_MODULE_SENDLOG , " kni log init failed, no brokerlist in profile %s section %s. " , KNI_CONF_FILENAME , KNI_SENDLOG_MODE ) ;
2018-12-11 23:16:41 +08:00
goto error_out ;
}
g_kni_sendlog - > kafka_handle = create_kafka_handle ( g_kni_sendlog - > brokerlist ) ;
if ( g_kni_sendlog - > kafka_handle = = NULL )
{
2018-12-15 22:08:47 +08:00
MESA_handle_runtime_log ( g_kni_comminfo . logger , RLOG_LV_FATAL , KNI_MODULE_SENDLOG , " kni log init failed. Cannot create lafka handle with brokerlist: %s. " , g_kni_sendlog - > brokerlist ) ;
2018-12-11 23:16:41 +08:00
goto error_out ;
}
g_kni_sendlog - > topic_name = " PXY-KNI-LOG " ;
g_kni_sendlog - > kafka_topic = rd_kafka_topic_new ( g_kni_sendlog - > kafka_handle , g_kni_sendlog - > topic_name , NULL ) ;
return g_kni_sendlog ;
error_out :
free ( g_kni_sendlog ) ;
return NULL ;
}
2018-12-17 08:13:46 +08:00
int kni_send_log ( const struct kni_log * log_msg , char * user_region , char * content )
2018-12-11 23:16:41 +08:00
{
2018-12-15 22:08:47 +08:00
if ( g_kni_switch_info . send_log_switch = = 0 )
2018-12-11 23:16:41 +08:00
{
return 0 ;
}
2018-12-15 22:08:47 +08:00
if ( log_msg - > stream = = NULL )
{
MESA_handle_runtime_log ( g_kni_comminfo . logger , RLOG_LV_FATAL , KNI_MODULE_SENDLOG , " log_msg->stream is NULL,not send log " ) ;
return - 1 ;
}
2018-12-11 23:16:41 +08:00
const struct layer_addr * addr = & ( log_msg - > stream - > addr ) ;
2018-12-15 22:08:47 +08:00
// const char* tmp_val=NULL;
2018-12-11 23:16:41 +08:00
cJSON * common_obj = NULL , * per_hit_obj = NULL ;
char * log_payload = NULL ;
int kafka_status = 0 ;
int send_cnt = 0 ;
time_t cur_time ;
char src_ip_str [ MAX ( INET6_ADDRSTRLEN , INET_ADDRSTRLEN ) ] = { 0 } ;
char dst_ip_str [ MAX ( INET6_ADDRSTRLEN , INET_ADDRSTRLEN ) ] = { 0 } ;
common_obj = cJSON_CreateObject ( ) ;
cur_time = time ( NULL ) ;
cJSON_AddNumberToObject ( common_obj , " found_time " , cur_time ) ;
cJSON_AddNumberToObject ( common_obj , " recv_time " , cur_time ) ;
switch ( addr - > addrtype )
{
case ADDR_TYPE_IPV4 :
cJSON_AddNumberToObject ( common_obj , " addr_type " , 4 ) ;
inet_ntop ( AF_INET , & addr - > tuple4_v4 - > saddr , src_ip_str , sizeof ( src_ip_str ) ) ;
inet_ntop ( AF_INET , & addr - > tuple4_v4 - > daddr , dst_ip_str , sizeof ( dst_ip_str ) ) ;
cJSON_AddStringToObject ( common_obj , " s_ip " , src_ip_str ) ;
cJSON_AddStringToObject ( common_obj , " d_ip " , dst_ip_str ) ;
cJSON_AddNumberToObject ( common_obj , " s_port " , ntohs ( addr - > tuple4_v4 - > source ) ) ;
cJSON_AddNumberToObject ( common_obj , " d_port " , ntohs ( addr - > tuple4_v4 - > dest ) ) ;
cJSON_AddStringToObject ( common_obj , " trans_proto " , " IPv4_TCP " ) ;
break ;
case ADDR_TYPE_IPV6 :
cJSON_AddNumberToObject ( common_obj , " addr_type " , 6 ) ;
inet_ntop ( AF_INET6 , & addr - > tuple4_v6 - > saddr , src_ip_str , sizeof ( src_ip_str ) ) ;
inet_ntop ( AF_INET6 , & addr - > tuple4_v6 - > daddr , dst_ip_str , sizeof ( dst_ip_str ) ) ;
cJSON_AddStringToObject ( common_obj , " s_ip " , src_ip_str ) ;
cJSON_AddStringToObject ( common_obj , " d_ip " , dst_ip_str ) ;
cJSON_AddNumberToObject ( common_obj , " s_port " , ntohs ( addr - > tuple4_v6 - > source ) ) ;
cJSON_AddNumberToObject ( common_obj , " d_port " , ntohs ( addr - > tuple4_v6 - > dest ) ) ;
cJSON_AddStringToObject ( common_obj , " trans_proto " , " IPv6_TCP " ) ;
break ;
default :
break ;
}
cJSON_AddNumberToObject ( common_obj , " direction " , 0 ) ;
cJSON_AddNumberToObject ( common_obj , " stream_dir " , 3 ) ; //1:c2s, 2:s2c, 3:double
cJSON_AddStringToObject ( common_obj , " cap_ip " , g_kni_sendlog - > local_ip_str ) ;
cJSON_AddNumberToObject ( common_obj , " entrance_id " , g_kni_sendlog - > entry_id ) ;
cJSON_AddNumberToObject ( common_obj , " device_id " , 0 ) ;
2018-12-17 08:13:46 +08:00
if ( user_region ! = NULL )
{
cJSON_AddStringToObject ( common_obj , " user_region " , user_region ) ;
cJSON_AddStringToObject ( common_obj , " version " , content ) ;
}
else
{
cJSON_AddStringToObject ( common_obj , " user_region " , " null " ) ;
}
2018-12-11 23:16:41 +08:00
for ( size_t i = 0 ; i < log_msg - > result_num ; i + + )
{
if ( log_msg - > result [ i ] . do_log = = 0 )
{
continue ;
}
per_hit_obj = cJSON_Duplicate ( common_obj , 1 ) ;
cJSON_AddNumberToObject ( per_hit_obj , " cfg_id " , log_msg - > result [ i ] . config_id ) ;
cJSON_AddNumberToObject ( per_hit_obj , " service " , log_msg - > result [ i ] . service_id ) ;
log_payload = cJSON_Print ( per_hit_obj ) ;
fprintf ( stderr , " %s \n " , log_payload ) ;
kafka_status = rd_kafka_produce ( g_kni_sendlog - > kafka_topic , RD_KAFKA_PARTITION_UA , RD_KAFKA_MSG_F_COPY ,
log_payload , strlen ( log_payload ) , NULL , 0 , NULL ) ;
free ( log_payload ) ;
cJSON_Delete ( per_hit_obj ) ;
if ( kafka_status < 0 )
{
2018-12-15 22:08:47 +08:00
MESA_handle_runtime_log ( g_kni_comminfo . logger , RLOG_LV_FATAL , KNI_MODULE_SENDLOG , " Kafka produce failed: %s " , rd_kafka_err2name ( rd_kafka_last_error ( ) ) ) ;
2018-12-11 23:16:41 +08:00
}
send_cnt + + ;
}
cJSON_Delete ( common_obj ) ;
return send_cnt ;
}