Update lirenjie_vxlan_sapp.c

1、init_kafka中修改kafka生产者为异步发送(producer.type设为kafka.producer.AyncProducer)
2、push_data_to_kafka中rd_kafka_produce中的partition参数改为RD_KAFKA_PARTITION_UA
3、删除多于的写日志操作
This commit is contained in:
李仁杰
2019-08-12 16:02:23 +08:00
parent 2d76f1df35
commit 1ed41041ac

View File

@@ -127,6 +127,7 @@ static int init_kafka(int partition_, char *brokers_, char *topic_)
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "request.required.acks", "1", errstr, sizeof(errstr));
@@ -177,8 +178,8 @@ static int push_data_to_kafka(char *buffer, int buf_len)
{
return 0;
}
ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
//ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
// ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
if (ret == -1)
{
/*fprintf(stderr,
@@ -186,7 +187,7 @@ static int push_data_to_kafka(char *buffer, int buf_len)
"partition %i: %s\n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_last_error()));*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Failed to produce to topic %s partition %i: %s",
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"%% Failed to produce to topic %s partition %i: %s",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_last_error()));
/* Poll to handle delivery reports */
@@ -196,8 +197,8 @@ static int push_data_to_kafka(char *buffer, int buf_len)
/*fprintf(stderr, "%% Sent %zd bytes to topic "
"%s partition %i\n",
buf_len, rd_kafka_topic_name(rkt), partition);*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i",
buf_len, rd_kafka_topic_name(rkt), partition);
// MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i",
// buf_len, rd_kafka_topic_name(rkt), partition);
//rd_kafka_poll(kafka_producer, 0);
return PUSH_DATA_SUCCESS;
}
@@ -591,7 +592,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,
//ipv6 stat is NULL,
flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
push_data_to_kafka(info,strlen(info));
break;
case ADDR_TYPE_IPV6:
@@ -607,7 +608,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length,
tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
push_data_to_kafka(info,strlen(info));
break;
case ADDR_TYPE_ARP:
@@ -622,7 +623,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
push_data_to_kafka(info,strlen(info));
break;
default:
@@ -917,7 +918,7 @@ int CHAR_INIT()
/* kafka初始化 */
if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
{
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!");
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"kafka init failed!!!");
return -1;
}
@@ -933,7 +934,7 @@ int CHAR_INIT()
void LRJ_APP_DESTROY()
{
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n");
printf("TEST_APP_DESTORY in...\n");
// printf("TEST_APP_DESTORY in...\n");
kafka_destroy();
if (runtime_log_handler == NULL)
{
@@ -942,6 +943,6 @@ void LRJ_APP_DESTROY()
}
MESA_destroy_runtime_log_handle(runtime_log_handler);
MESA_destroy_runtime_log_handle(kafka_log_handler);
printf("TEST_APP_DESTORY out...\n");
// printf("TEST_APP_DESTORY out...\n");
}