From 1ed41041ac3ae582380c29f136be30aba9f08b32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=BB=81=E6=9D=B0?= Date: Mon, 12 Aug 2019 16:02:23 +0800 Subject: [PATCH] =?UTF-8?q?Update=20lirenjie=5Fvxlan=5Fsapp.c=201=E3=80=81?= =?UTF-8?q?init=5Fkafka=E4=B8=AD=E4=BF=AE=E6=94=B9kafka=E7=94=9F=E4=BA=A7?= =?UTF-8?q?=E8=80=85=E4=B8=BA=E5=BC=82=E6=AD=A5=E5=8F=91=E9=80=81=EF=BC=88?= =?UTF-8?q?producer.type=E8=AE=BE=E4=B8=BAkafka.producer.AyncProducer?= =?UTF-8?q?=EF=BC=89=202=E3=80=81push=5Fdata=5Fto=5Fkafka=E4=B8=ADrd=5Fkaf?= =?UTF-8?q?ka=5Fproduce=E4=B8=AD=E7=9A=84partition=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E6=94=B9=E4=B8=BARD=5FKAFKA=5FPARTITION=5FUA=203=E3=80=81?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BA=8E=E7=9A=84=E5=86=99=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lirenjie_vxlan_sapp.c | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c index 2c90941..166f7fe 100644 --- a/lirenjie_vxlan_sapp.c +++ b/lirenjie_vxlan_sapp.c @@ -127,6 +127,7 @@ static int init_kafka(int partition_, char *brokers_, char *topic_) /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "request.required.acks", "1", errstr, sizeof(errstr)); @@ -177,8 +178,8 @@ static int push_data_to_kafka(char *buffer, int buf_len) { return 0; } - ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); - //ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + // ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); if (ret == -1) { /*fprintf(stderr, @@ -186,7 +187,7 @@ static int push_data_to_kafka(char *buffer, int buf_len) "partition %i: %s\n", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error()));*/ - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Failed to produce to topic %s partition %i: %s", + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"%% Failed to produce to topic %s partition %i: %s", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error())); /* Poll to handle delivery reports */ @@ -196,8 +197,8 @@ static int push_data_to_kafka(char *buffer, int buf_len) /*fprintf(stderr, "%% Sent %zd bytes to topic " "%s partition %i\n", buf_len, rd_kafka_topic_name(rkt), partition);*/ - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i", - buf_len, rd_kafka_topic_name(rkt), partition); + // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i", + // buf_len, rd_kafka_topic_name(rkt), partition); //rd_kafka_poll(kafka_producer, 0); return PUSH_DATA_SUCCESS; } @@ -591,7 +592,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off, //ipv6 stat is NULL, flow_type); - MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; case ADDR_TYPE_IPV6: @@ -607,7 +608,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length, tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type); - MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; case ADDR_TYPE_ARP: @@ -622,7 +623,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, flow_type); - MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; default: @@ -917,7 +918,7 @@ int CHAR_INIT() /* kafka初始化 */ if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) { - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"kafka init failed!!!"); return -1; } @@ -933,7 +934,7 @@ int CHAR_INIT() void LRJ_APP_DESTROY() { MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n"); - printf("TEST_APP_DESTORY in...\n"); + // printf("TEST_APP_DESTORY in...\n"); kafka_destroy(); if (runtime_log_handler == NULL) { @@ -942,6 +943,6 @@ void LRJ_APP_DESTROY() } MESA_destroy_runtime_log_handle(runtime_log_handler); MESA_destroy_runtime_log_handle(kafka_log_handler); - printf("TEST_APP_DESTORY out...\n"); + // printf("TEST_APP_DESTORY out...\n"); }