From e43089b1ea08759107b1667aa512629191fd91db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=8E=BA=E5=BA=B7?= Date: Thu, 6 Aug 2020 16:43:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0pom=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dependency-reduced-pom.xml | 2 +- pom.xml | 80 ++++++++++++--- .../java/cn/ac/iie/flume/bean/NewConn.java | 2 - .../cn/ac/iie/flume/interceptor/Csv2Json.java | 93 ++++++++++-------- .../ac/iie/flume/interceptor/File2Json.java | 98 +++++++++++++++++++ .../java/cn/ac/iie/flume/source/MySource.java | 42 ++++++++ src/test/java/EventTest.java | 21 ++++ 7 files changed, 281 insertions(+), 57 deletions(-) create mode 100644 src/main/java/cn/ac/iie/flume/interceptor/File2Json.java create mode 100644 src/main/java/cn/ac/iie/flume/source/MySource.java create mode 100644 src/test/java/EventTest.java diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index a7d4f5b..dc74c1b 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -32,7 +32,7 @@ - cn.ac.iie.flume.interceptor.Json2Csv + cn.ac.iie.flume.interceptor.File2Json META-INF/spring.handlers diff --git a/pom.xml b/pom.xml index 7f41270..46ff823 100644 --- a/pom.xml +++ b/pom.xml @@ -7,6 +7,72 @@ cn.ac.iie.flume flume-interceptor 1.0-SNAPSHOT + jar + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + cn.ac.iie.flume.interceptor.File2Json + + + META-INF/spring.handlers + + + META-INF/spring.schemas + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + + false + + + properties + + log4j.properties + + false + + + + + @@ -40,18 +106,4 @@ - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - UTF-8 - - - - - \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/flume/bean/NewConn.java b/src/main/java/cn/ac/iie/flume/bean/NewConn.java index 5408f51..cc80932 100644 --- a/src/main/java/cn/ac/iie/flume/bean/NewConn.java +++ b/src/main/java/cn/ac/iie/flume/bean/NewConn.java @@ -133,8 +133,6 @@ public class NewConn { private String voip_called_number; private String streaming_media_url; private String streaming_media_protocol; - - public long getCommon_log_id() { return common_log_id; } diff --git a/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java b/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java index 48f66ee..eb11cac 100644 --- a/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java +++ b/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class Csv2Json implements Interceptor { @@ -34,52 +35,64 @@ public class Csv2Json implements Interceptor { //TODO 处理单个event的代码 //取出一条csv格式的数据 - body = event.getBody(); - json = new String(body); - String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(","); + try{ + body = event.getBody(); - newConn = new NewConn(); + Map headers = event.getHeaders(); - newConn.setCommon_log_id(Long.parseLong("0")); - newConn.setCommon_policy_id(Integer.parseInt(strings[1])); - newConn.setCommon_start_time(Integer.parseInt(strings[2])); - newConn.setCommon_recv_time(Integer.parseInt(strings[3])); - newConn.setCommon_l4_protocol(strings[4]); - newConn.setCommon_address_type(Integer.parseInt(strings[5])); - newConn.setCommon_client_ip(strings[6]); - newConn.setCommon_server_ip(strings[7]); - newConn.setCommon_client_port(Integer.parseInt(strings[8])); - newConn.setCommon_server_port(Integer.parseInt(strings[9])); - newConn.setCommon_service(Integer.parseInt(strings[10])); - newConn.setCommon_entrance_id(Integer.parseInt(strings[11])); - newConn.setCommon_device_id(strings[12]); - newConn.setCommon_link_id(Integer.parseInt(strings[13])); - newConn.setCommon_encapsulation(Integer.parseInt(strings[14])); - newConn.setCommon_direction(Integer.parseInt(strings[15])); - newConn.setCommon_stream_dir(Integer.parseInt(strings[18])); - newConn.setCommon_sled_ip(strings[19]); - newConn.setCommon_address_list(strings[20]); - newConn.setCommon_server_location(strings[21]); - newConn.setCommon_client_location(strings[22]); - newConn.setCommon_server_asn(strings[23]); - newConn.setCommon_client_asn(strings[24]); - newConn.setCommon_subscriber_id(strings[26]); - newConn.setCommon_user_region(strings[27]); - newConn.setCommon_c2s_pkt_num(Integer.parseInt(strings[30])); - newConn.setCommon_s2c_pkt_num(Integer.parseInt(strings[31])); - newConn.setCommon_c2s_byte_num(Integer.parseInt(strings[32])); - newConn.setCommon_s2c_byte_num(Integer.parseInt(strings[33])); - newConn.setCommon_protocol_label(strings[34]); - newConn.setCommon_app_label(strings[35]); - newConn.setCommon_action(Integer.parseInt(strings[39])); + json = new String(body); + + String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(","); + + newConn = new NewConn(); + + newConn.setCommon_log_id(Long.parseLong("0")); + newConn.setCommon_policy_id(Integer.parseInt(strings[1])); + newConn.setCommon_start_time(Integer.parseInt(strings[2])); + newConn.setCommon_recv_time(Integer.parseInt(strings[3])); + newConn.setCommon_l4_protocol(strings[4]); + newConn.setCommon_address_type(Integer.parseInt(strings[5])); + newConn.setCommon_client_ip(strings[6]); + newConn.setCommon_server_ip(strings[7]); + newConn.setCommon_client_port(Integer.parseInt(strings[8])); + newConn.setCommon_server_port(Integer.parseInt(strings[9])); + newConn.setCommon_service(Integer.parseInt(strings[10])); + newConn.setCommon_entrance_id(Integer.parseInt(strings[11])); + newConn.setCommon_device_id(strings[12]); + newConn.setCommon_link_id(Integer.parseInt(strings[13])); + newConn.setCommon_encapsulation(Integer.parseInt(strings[14])); + newConn.setCommon_direction(Integer.parseInt(strings[15])); + newConn.setCommon_stream_dir(Integer.parseInt(strings[18])); + newConn.setCommon_sled_ip(strings[19]); + newConn.setCommon_address_list(strings[20]); + newConn.setCommon_server_location(strings[21]); + newConn.setCommon_client_location(strings[22]); + newConn.setCommon_server_asn(strings[23]); + newConn.setCommon_client_asn(strings[24]); + newConn.setCommon_subscriber_id(strings[26]); + newConn.setCommon_user_region(strings[27]); + newConn.setCommon_c2s_pkt_num(Integer.parseInt(strings[30])); + newConn.setCommon_s2c_pkt_num(Integer.parseInt(strings[31])); + newConn.setCommon_c2s_byte_num(Integer.parseInt(strings[32])); + newConn.setCommon_s2c_byte_num(Integer.parseInt(strings[33])); + newConn.setCommon_protocol_label(strings[34]); + newConn.setCommon_app_label(strings[35]); + newConn.setCommon_action(Integer.parseInt(strings[39])); - json = JSONObject.toJSONString(newConn); + json = JSONObject.toJSONString(newConn); + + body = json.getBytes(); + event.setBody(body); + return event; + } + catch (Exception e){ + logger.error("消息转换异常========"); + e.printStackTrace(); + return null; + } - body = json.getBytes(); - event.setBody(body); - return event; } //TODO 处理多个event的函数 diff --git a/src/main/java/cn/ac/iie/flume/interceptor/File2Json.java b/src/main/java/cn/ac/iie/flume/interceptor/File2Json.java new file mode 100644 index 0000000..6f5fec8 --- /dev/null +++ b/src/main/java/cn/ac/iie/flume/interceptor/File2Json.java @@ -0,0 +1,98 @@ +package cn.ac.iie.flume.interceptor; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class File2Json implements Interceptor { + + //打印日志,便于测试方法的执行顺序 + private static final Logger logger = LoggerFactory.getLogger(File2Json.class); + + private static byte[] body; + + private static String json; + + + + @Override + public void initialize() { + logger.info("----------自定义拦截器的initialize方法执行"); + } + + @Override + public Event intercept(Event event) { + logger.info("----------intercept(Event event)方法执行,处理单个event"); + + //TODO 处理单个event的代码 + //取出一条csv格式的数据 + + try{ + body = event.getBody(); + + Map headers = event.getHeaders(); + + + logger.info( "" + headers.keySet().size()); + for (String key : headers.keySet()) { + + logger.info("[ key: " + key +"," + "value: " + headers.get(key) + "]"); + } + + event.setBody(body); + return event; + } + catch (Exception e){ + logger.error("消息转换异常========"); + e.printStackTrace(); + return null; + } + + } + + //TODO 处理多个event的函数 + @Override + public List intercept(List events) { + + List results = new ArrayList<>(); + Event event; + for (Event e : events) { + + event = intercept(e); + if (event != null) { + results.add(event); + } + } + return results; + } + + @Override + public void close() { + logger.info("----------自定义拦截器close方法执行"); + } + + public static class Builder implements Interceptor.Builder { + + @Override + public Interceptor build() { + logger.info("----------build方法执行"); + return new File2Json(); + } + + @Override + public void configure(Context context) { + logger.info("----------configure方法执行"); + } + } + + + public static void main(String[] args) { + + } +} diff --git a/src/main/java/cn/ac/iie/flume/source/MySource.java b/src/main/java/cn/ac/iie/flume/source/MySource.java new file mode 100644 index 0000000..2ac7561 --- /dev/null +++ b/src/main/java/cn/ac/iie/flume/source/MySource.java @@ -0,0 +1,42 @@ +package cn.ac.iie.flume.source; + +import cn.ac.iie.flume.bean.NewConn; +import org.apache.flume.Context; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.PollableSource; +import org.apache.flume.conf.Configurable; +import org.apache.flume.source.AbstractSource; + +/** + * @ClassNameMySource + * @Author lixkvip@126.com + * @Date2020/7/28 18:29 + * @Version V1.0 + **/ +public class MySource extends AbstractSource implements Configurable, PollableSource { + + + @Override + public Status process() throws EventDeliveryException { + + NewConn newConn = new NewConn(); + + + return null; + } + + @Override + public long getBackOffSleepIncrement() { + return 0; + } + + @Override + public long getMaxBackOffSleepInterval() { + return 0; + } + + @Override + public void configure(Context context) { + + } +} diff --git a/src/test/java/EventTest.java b/src/test/java/EventTest.java new file mode 100644 index 0000000..6d4e065 --- /dev/null +++ b/src/test/java/EventTest.java @@ -0,0 +1,21 @@ +/** + * @ClassNameEventTest + * @Author lixkvip@126.com + * @Date2020/7/29 15:38 + * @Version V1.0 + **/ +public class EventTest { + + public static void main(String[] args) { + + String json = "\"0e1178ab-986f-4bbb-91f9-94b1f70ce3a2\",0,1546624800,1546624800,\"IPv4_UDP\",0,\"37.150.25.154\",\"46.20.187.219\",32265,44985,160,2,0,0,0,0,\"\",\"\",1,\"10.3.4.10\",\"48975-14441-46.20.187.219-37.150.25.154\",\"\",\"\",\"42322\",\"9198\",\"\",\"\",\"\",\"\",\"PROTO_ID=0;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;\",3,0,60,0,0,0,0,0,0,0"; + + String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(","); + + System.out.println(strings[29]); + System.out.println(strings[28]); + System.out.println(strings[30]); + + + } +}