diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c index 2c90941..166f7fe 100644 --- a/lirenjie_vxlan_sapp.c +++ b/lirenjie_vxlan_sapp.c @@ -127,6 +127,7 @@ static int init_kafka(int partition_, char *brokers_, char *topic_) /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "request.required.acks", "1", errstr, sizeof(errstr)); @@ -177,8 +178,8 @@ static int push_data_to_kafka(char *buffer, int buf_len) { return 0; } - ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); - //ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + // ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); if (ret == -1) { /*fprintf(stderr, @@ -186,7 +187,7 @@ static int push_data_to_kafka(char *buffer, int buf_len) "partition %i: %s\n", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error()));*/ - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Failed to produce to topic %s partition %i: %s", + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"%% Failed to produce to topic %s partition %i: %s", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error())); /* Poll to handle delivery reports */ @@ -196,8 +197,8 @@ static int push_data_to_kafka(char *buffer, int buf_len) /*fprintf(stderr, "%% Sent %zd bytes to topic " "%s partition %i\n", buf_len, rd_kafka_topic_name(rkt), partition);*/ - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i", - buf_len, rd_kafka_topic_name(rkt), partition); + // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i", + // buf_len, rd_kafka_topic_name(rkt), partition); //rd_kafka_poll(kafka_producer, 0); return PUSH_DATA_SUCCESS; } @@ -591,7 +592,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off, //ipv6 stat is NULL, flow_type); - MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; case ADDR_TYPE_IPV6: @@ -607,7 +608,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length, tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type); - MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; case ADDR_TYPE_ARP: @@ -622,7 +623,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, flow_type); - MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; default: @@ -917,7 +918,7 @@ int CHAR_INIT() /* kafka初始化 */ if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) { - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"kafka init failed!!!"); return -1; } @@ -933,7 +934,7 @@ int CHAR_INIT() void LRJ_APP_DESTROY() { MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n"); - printf("TEST_APP_DESTORY in...\n"); + // printf("TEST_APP_DESTORY in...\n"); kafka_destroy(); if (runtime_log_handler == NULL) { @@ -942,6 +943,6 @@ void LRJ_APP_DESTROY() } MESA_destroy_runtime_log_handle(runtime_log_handler); MESA_destroy_runtime_log_handle(kafka_log_handler); - printf("TEST_APP_DESTORY out...\n"); + // printf("TEST_APP_DESTORY out...\n"); }